Posted to commits@beam.apache.org by ib...@apache.org on 2020/08/22 16:59:49 UTC
[beam] branch master updated: [BEAM-7587] Spark portable streaming (#12157)
This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 10361a3 [BEAM-7587] Spark portable streaming (#12157)
10361a3 is described below
commit 10361a3e138532cffd086c98016235c4cd2abcf8
Author: annaqin418 <an...@google.com>
AuthorDate: Sat Aug 22 12:59:05 2020 -0400
[BEAM-7587] Spark portable streaming (#12157)
[BEAM-7587] Implement Spark portable streaming.
---
..._PortableValidatesRunner_Spark_Streaming.groovy | 43 +++
.../translation/PipelineTranslatorUtils.java | 29 ++
runners/spark/job-server/build.gradle | 110 +++++--
.../beam/runners/spark/SparkPipelineResult.java | 18 +-
.../beam/runners/spark/SparkPipelineRunner.java | 109 +++++--
.../SparkPortableStreamingPipelineOptions.java | 36 +++
.../beam/runners/spark/SparkRunnerRegistrar.java | 4 +-
.../SparkBatchPortablePipelineTranslator.java | 43 +--
.../translation/SparkExecutableStageFunction.java | 7 +
.../SparkPortablePipelineTranslator.java | 38 +++
.../SparkStreamingPortablePipelineTranslator.java | 360 +++++++++++++++++++++
.../SparkStreamingTranslationContext.java | 50 +++
.../spark/translation/SparkTranslationContext.java | 4 +
.../streaming/StreamingTransformTranslator.java | 2 +-
.../translation/streaming/UnboundedDataset.java | 4 +-
.../runners/spark/SparkRunnerRegistrarTest.java | 5 +-
.../SparkExecutableStageFunctionTest.java | 22 +-
17 files changed, 803 insertions(+), 81 deletions(-)
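For orientation, a minimal driver sketch (not part of this commit; the endpoint and flags shown are assumptions): with this change, the Spark job server picks the streaming translator whenever --streaming is set or the pipeline contains unbounded PCollections, and --streamingTimeoutMs bounds how long the streaming context runs.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.PortablePipelineOptions;

    public class StreamingDriverSketch {
      public static void main(String[] args) {
        // Hypothetical endpoint; assumes a Spark job server is already running there.
        PortablePipelineOptions options =
            PipelineOptionsFactory.fromArgs(
                    "--runner=PortableRunner",
                    "--jobEndpoint=localhost:8099",
                    "--streaming",
                    "--streamingTimeoutMs=30000")
                .as(PortablePipelineOptions.class);
        Pipeline pipeline = Pipeline.create(options);
        // ... attach an unbounded source and transforms here ...
        pipeline.run().waitUntilFinish();
      }
    }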
diff --git a/.test-infra/jenkins/job_PostCommit_Java_PortableValidatesRunner_Spark_Streaming.groovy b/.test-infra/jenkins/job_PostCommit_Java_PortableValidatesRunner_Spark_Streaming.groovy
new file mode 100644
index 0000000..bfc5063
--- /dev/null
+++ b/.test-infra/jenkins/job_PostCommit_Java_PortableValidatesRunner_Spark_Streaming.groovy
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as commonJobProperties
+import PostcommitJobBuilder
+
+// This job runs the suite of Java ValidatesRunner tests against the Spark runner in streaming mode.
+PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_PVR_Spark_Streaming',
+ 'Run Java Spark PortableValidatesRunner Streaming', 'Java Spark PortableValidatesRunner Streaming Tests', this) {
+ description('Runs the Java PortableValidatesRunner suite on the Spark runner in streaming mode.')
+
+ // Set common parameters.
+ commonJobProperties.setTopLevelMainJobProperties(delegate)
+
+ // Publish all test results to Jenkins
+ publishers {
+ archiveJunit('**/build/test-results/**/*.xml')
+ }
+
+ // Gradle goals for this job.
+ steps {
+ gradle {
+ rootBuildScriptDir(commonJobProperties.checkoutDir)
+ tasks(':runners:spark:job-server:validatesPortableRunnerStreaming')
+ commonJobProperties.setGradleSwitches(delegate)
+ }
+ }
+ }
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
index 1fa4ef0..4705290 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
@@ -47,6 +47,7 @@ import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBu
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableBiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.joda.time.Instant;
@@ -163,4 +164,32 @@ public final class PipelineTranslatorUtils {
String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
}
}
+
+ public static <T> WindowedValue.WindowedValueCoder<T> getWindowedValueCoder(
+ String pCollectionId, RunnerApi.Components components) {
+ RunnerApi.PCollection pCollection = components.getPcollectionsOrThrow(pCollectionId);
+ PipelineNode.PCollectionNode pCollectionNode =
+ PipelineNode.pCollection(pCollectionId, pCollection);
+ WindowedValue.WindowedValueCoder<T> coder;
+ try {
+ coder =
+ (WindowedValue.WindowedValueCoder)
+ WireCoders.instantiateRunnerWireCoder(pCollectionNode, components);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return coder;
+ }
+
+ public static String getInputId(PipelineNode.PTransformNode transformNode) {
+ return Iterables.getOnlyElement(transformNode.getTransform().getInputsMap().values());
+ }
+
+ public static String getOutputId(PipelineNode.PTransformNode transformNode) {
+ return Iterables.getOnlyElement(transformNode.getTransform().getOutputsMap().values());
+ }
+
+ public static String getExecutableStageIntermediateId(PipelineNode.PTransformNode transformNode) {
+ return transformNode.getId();
+ }
}
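These helpers were hoisted out of SparkBatchPortablePipelineTranslator (its private copies are deleted in a hunk below) so the batch and streaming translators can share them. A minimal call-site sketch, assuming a PTransformNode with exactly one input:

    // Sketch only: resolve a transform's input PCollection id and its wire coder.
    String inputId = PipelineTranslatorUtils.getInputId(transformNode);
    WindowedValue.WindowedValueCoder<String> coder =
        PipelineTranslatorUtils.getWindowedValueCoder(inputId, pipeline.getComponents());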
diff --git a/runners/spark/job-server/build.gradle b/runners/spark/job-server/build.gradle
index bd05d0f..cfde392 100644
--- a/runners/spark/job-server/build.gradle
+++ b/runners/spark/job-server/build.gradle
@@ -82,20 +82,16 @@ runShadow {
jvmArgs += ["-Dorg.slf4j.simpleLogger.defaultLogLevel=${project.property('logLevel')}"]
}
-def portableValidatesRunnerTask(String name) {
- createPortableValidatesRunnerTask(
- name: "validatesPortableRunner${name}",
- jobServerDriver: "org.apache.beam.runners.spark.SparkJobServerDriver",
- jobServerConfig: "--job-host=localhost,--job-port=0,--artifact-port=0,--expansion-port=0",
- testClasspathConfiguration: configurations.validatesPortableRunner,
- numParallelTests: 4,
- environment: BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED,
- systemProperties: [
- "beam.spark.test.reuseSparkContext": "false",
- "spark.ui.enabled": "false",
- "spark.ui.showConsoleProgress": "false",
- ],
- testCategories: {
+def portableValidatesRunnerTask(String name, Boolean streaming) {
+ def pipelineOptions = []
+ def testCategories
+ def testFilter
+
+ if (streaming) {
+ pipelineOptions += "--streaming"
+ pipelineOptions += "--streamingTimeoutMs=30000"
+
+ testCategories = {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
@@ -111,24 +107,96 @@ def portableValidatesRunnerTask(String name) {
excludeCategories 'org.apache.beam.sdk.testing.UsesKeyInParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
- //SplitableDoFnTests
+ // TODO (BEAM-7222) SplittableDoFnTests
excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
- },
- testFilter: {
- // TODO(BEAM-10094)
+ // Currently unsupported in portable streaming:
+ // TODO (BEAM-10712)
+ excludeCategories 'org.apache.beam.sdk.testing.UsesSideInputs'
+ // TODO (BEAM-10754)
+ excludeCategories 'org.apache.beam.sdk.testing.UsesStatefulParDo'
+ // TODO (BEAM-10755)
+ excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
+ }
+
+ testFilter = {
+ // TODO (BEAM-10094)
excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2'
- },
- )
+ // TODO (BEAM-10784) Currently unsupported in portable streaming:
+ // // Timeout error
+ excludeTestsMatching 'org.apache.beam.sdk.testing.PAssertTest.testWindowedContainsInAnyOrder'
+ excludeTestsMatching 'org.apache.beam.sdk.testing.PAssertTest.testWindowedSerializablePredicate'
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowTest.testNoWindowFnDoesNotReassignWindows'
+ // // Assertion error: empty iterable output
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testFixedWindowsCombine'
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombine'
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleAfterFixedWindowsAndGroupByKey'
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleAfterSessionsAndGroupByKey'
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleAfterSlidingWindowsAndGroupByKey'
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.join.CoGroupByKeyTest.testCoGroupByKeyWithWindowing'
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest'
+ // // Assertion error: incorrect output
+ excludeTestsMatching 'CombineTest$BasicTests.testHotKeyCombining'
+ }
+ }
+ else {
+ testCategories = {
+ includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+ excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
+ excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesKeyInParDo'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
+ // TODO (BEAM-7222) SplittableDoFnTests
+ excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
+ }
+ testFilter = {
+ // TODO (BEAM-10094)
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2'
+ }
+ }
+
+ createPortableValidatesRunnerTask(
+ name: "validatesPortableRunner${name}",
+ jobServerDriver: "org.apache.beam.runners.spark.SparkJobServerDriver",
+ jobServerConfig: "--job-host=localhost,--job-port=0,--artifact-port=0,--expansion-port=0",
+ testClasspathConfiguration: configurations.validatesPortableRunner,
+ numParallelTests: 4,
+ pipelineOpts: pipelineOptions,
+ environment: BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED,
+ systemProperties: [
+ "beam.spark.test.reuseSparkContext": "false",
+ "spark.ui.enabled": "false",
+ "spark.ui.showConsoleProgress": "false",
+ ],
+ testCategories: testCategories,
+ testFilter: testFilter,
+ )
}
-project.ext.validatesPortableRunnerBatch = portableValidatesRunnerTask("Batch")
+project.ext.validatesPortableRunnerBatch = portableValidatesRunnerTask("Batch", false)
+project.ext.validatesPortableRunnerStreaming = portableValidatesRunnerTask("Streaming", true)
task validatesPortableRunner() {
dependsOn validatesPortableRunnerBatch
+ dependsOn validatesPortableRunnerStreaming
}
def jobPort = BeamModulePlugin.startingExpansionPortNumber.getAndDecrement()
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
index 44d2524..5c0656d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
@@ -37,8 +37,6 @@ import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/** Represents a Spark pipeline execution result. */
public abstract class SparkPipelineResult implements PipelineResult {
@@ -148,8 +146,6 @@ public abstract class SparkPipelineResult implements PipelineResult {
static class PortableBatchMode extends BatchMode implements PortablePipelineResult {
- private static final Logger LOG = LoggerFactory.getLogger(BatchMode.class);
-
PortableBatchMode(Future<?> pipelineExecution, JavaSparkContext javaSparkContext) {
super(pipelineExecution, javaSparkContext);
}
@@ -213,6 +209,20 @@ public abstract class SparkPipelineResult implements PipelineResult {
}
}
+ static class PortableStreamingMode extends StreamingMode implements PortablePipelineResult {
+
+ PortableStreamingMode(Future<?> pipelineExecution, JavaStreamingContext javaStreamingContext) {
+ super(pipelineExecution, javaStreamingContext);
+ }
+
+ @Override
+ public JobApi.MetricResults portableMetrics() {
+ return JobApi.MetricResults.newBuilder()
+ .addAllAttempted(MetricsAccumulator.getInstance().value().getMonitoringInfos())
+ .build();
+ }
+ }
+
private void offerNewState(State newState) {
State oldState = this.state;
this.state = newState;
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index 304fcc1..f33cb47 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.spark;
import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.hasUnboundedPCollections;
import static org.apache.beam.runners.spark.SparkPipelineOptions.prepareFilesToStage;
import java.util.UUID;
@@ -42,7 +43,11 @@ import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.translation.SparkBatchPortablePipelineTranslator;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
+import org.apache.beam.runners.spark.translation.SparkPortablePipelineTranslator;
+import org.apache.beam.runners.spark.translation.SparkStreamingPortablePipelineTranslator;
+import org.apache.beam.runners.spark.translation.SparkStreamingTranslationContext;
import org.apache.beam.runners.spark.translation.SparkTranslationContext;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.MetricsOptions;
@@ -52,6 +57,9 @@ import org.apache.beam.sdk.options.PortablePipelineOptions.RetrievalServiceType;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaStreamingListener;
+import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
@@ -71,7 +79,13 @@ public class SparkPipelineRunner implements PortablePipelineRunner {
@Override
public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) {
- SparkBatchPortablePipelineTranslator translator = new SparkBatchPortablePipelineTranslator();
+ SparkPortablePipelineTranslator translator;
+ boolean isStreaming = pipelineOptions.isStreaming() || hasUnboundedPCollections(pipeline);
+ if (isStreaming) {
+ translator = new SparkStreamingPortablePipelineTranslator();
+ } else {
+ translator = new SparkBatchPortablePipelineTranslator();
+ }
// Expand any splittable DoFns within the graph to enable sizing and splitting of bundles.
Pipeline pipelineWithSdfExpanded =
@@ -93,6 +107,7 @@ public class SparkPipelineRunner implements PortablePipelineRunner {
? trimmedPipeline
: GreedyPipelineFuser.fuse(trimmedPipeline).toPipeline();
+ // File staging.
if (pipelineOptions.getFilesToStage() == null) {
pipelineOptions.setFilesToStage(
detectClassPathResourcesToStage(
@@ -106,30 +121,88 @@ public class SparkPipelineRunner implements PortablePipelineRunner {
pipelineOptions.getFilesToStage().size());
LOG.debug("Staging files: {}", pipelineOptions.getFilesToStage());
+ PortablePipelineResult result;
final JavaSparkContext jsc = SparkContextFactory.getSparkContext(pipelineOptions);
+
LOG.info(String.format("Running job %s on Spark master %s", jobInfo.jobId(), jsc.master()));
- AggregatorsAccumulator.init(pipelineOptions, jsc);
+ // Initialize accumulators.
+ AggregatorsAccumulator.init(pipelineOptions, jsc);
MetricsEnvironment.setMetricsSupported(true);
MetricsAccumulator.init(pipelineOptions, jsc);
final SparkTranslationContext context =
- new SparkTranslationContext(jsc, pipelineOptions, jobInfo);
+ translator.createTranslationContext(jsc, pipelineOptions, jobInfo);
final ExecutorService executorService = Executors.newSingleThreadExecutor();
- final Future<?> submissionFuture =
- executorService.submit(
- () -> {
- translator.translate(fusedPipeline, context);
- LOG.info(
- String.format(
- "Job %s: Pipeline translated successfully. Computing outputs",
- jobInfo.jobId()));
- context.computeOutputs();
- LOG.info(String.format("Job %s finished.", jobInfo.jobId()));
- });
-
- PortablePipelineResult result =
- new SparkPipelineResult.PortableBatchMode(submissionFuture, jsc);
+
+ if (isStreaming) {
+ final JavaStreamingContext jssc =
+ ((SparkStreamingTranslationContext) context).getStreamingContext();
+
+ jssc.addStreamingListener(
+ new JavaStreamingListenerWrapper(
+ new AggregatorsAccumulator.AccumulatorCheckpointingSparkListener()));
+ jssc.addStreamingListener(
+ new JavaStreamingListenerWrapper(
+ new MetricsAccumulator.AccumulatorCheckpointingSparkListener()));
+
+ // Register user-defined listeners.
+ for (JavaStreamingListener listener :
+ pipelineOptions.as(SparkContextOptions.class).getListeners()) {
+ LOG.info("Registered listener {}." + listener.getClass().getSimpleName());
+ jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener));
+ }
+
+ // Register Watermarks listener to broadcast the advanced WMs.
+ jssc.addStreamingListener(
+ new JavaStreamingListenerWrapper(
+ new GlobalWatermarkHolder.WatermarkAdvancingStreamingListener()));
+
+ jssc.checkpoint(pipelineOptions.getCheckpointDir());
+
+ // Obtain timeout from options.
+ Long timeout =
+ pipelineOptions.as(SparkPortableStreamingPipelineOptions.class).getStreamingTimeoutMs();
+
+ final Future<?> submissionFuture =
+ executorService.submit(
+ () -> {
+ translator.translate(fusedPipeline, context);
+ LOG.info(
+ String.format(
+ "Job %s: Pipeline translated successfully. Computing outputs",
+ jobInfo.jobId()));
+ context.computeOutputs();
+
+ jssc.start();
+ try {
+ jssc.awaitTerminationOrTimeout(timeout);
+ } catch (InterruptedException e) {
+ LOG.warn("Streaming context interrupted, shutting down.", e);
+ }
+ jssc.stop();
+ LOG.info(String.format("Job %s finished.", jobInfo.jobId()));
+ });
+ result = new SparkPipelineResult.PortableStreamingMode(submissionFuture, jssc);
+ } else {
+ final Future<?> submissionFuture =
+ executorService.submit(
+ () -> {
+ translator.translate(fusedPipeline, context);
+ LOG.info(
+ String.format(
+ "Job %s: Pipeline translated successfully. Computing outputs",
+ jobInfo.jobId()));
+ context.computeOutputs();
+ LOG.info(String.format("Job %s finished.", jobInfo.jobId()));
+ });
+ result = new SparkPipelineResult.PortableBatchMode(submissionFuture, jsc);
+ }
+ executorService.shutdown();
+ result.waitUntilFinish();
+
MetricsPusher metricsPusher =
new MetricsPusher(
MetricsAccumulator.getInstance().value(),
@@ -137,8 +210,6 @@ public class SparkPipelineRunner implements PortablePipelineRunner {
result);
metricsPusher.start();
- result.waitUntilFinish();
- executorService.shutdown();
return result;
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPortableStreamingPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPortableStreamingPipelineOptions.java
new file mode 100644
index 0000000..ef0e3fa
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPortableStreamingPipelineOptions.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+
+/** Pipeline options specific to the Spark portable runner running a streaming job. */
+@Experimental
+public interface SparkPortableStreamingPipelineOptions
+ extends SparkPipelineOptions, PortablePipelineOptions {
+ @Description(
+ "Timeout for Spark portable streaming, in milliseconds."
+ + "Default (-1L) is infinity, i.e. no timeout.")
+ @Default.Long(-1L)
+ Long getStreamingTimeoutMs();
+
+ void setStreamingTimeoutMs(Long value);
+}
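A sketch of how the new option is consumed, mirroring the call site added to SparkPipelineRunner above:

    // Sketch: the runner reads the timeout before starting the streaming context.
    Long timeoutMs =
        options.as(SparkPortableStreamingPipelineOptions.class).getStreamingTimeoutMs();
    // The default -1L means no timeout; the streaming ValidatesRunner suite passes 30000.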
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
index 1919014..79f2a4e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
@@ -52,7 +52,9 @@ public final class SparkRunnerRegistrar {
@Override
public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
return ImmutableList.of(
- SparkPipelineOptions.class, SparkStructuredStreamingPipelineOptions.class);
+ SparkPipelineOptions.class,
+ SparkStructuredStreamingPipelineOptions.class,
+ SparkPortableStreamingPipelineOptions.class);
}
}
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java
index 47e11cc..3de05e3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java
@@ -18,6 +18,10 @@
package org.apache.beam.runners.spark.translation;
import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getExecutableStageIntermediateId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getInputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getOutputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowedValueCoder;
import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.instantiateCoder;
@@ -32,16 +36,14 @@ import java.util.Set;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
-import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.construction.NativeTransforms;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
-import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
-import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.util.ByteArray;
@@ -57,11 +59,11 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.storage.StorageLevel;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -70,7 +72,8 @@ import org.slf4j.LoggerFactory;
import scala.Tuple2;
/** Translates a bounded portable pipeline into a Spark job. */
-public class SparkBatchPortablePipelineTranslator {
+public class SparkBatchPortablePipelineTranslator
+ implements SparkPortablePipelineTranslator<SparkTranslationContext> {
private static final Logger LOG =
LoggerFactory.getLogger(SparkBatchPortablePipelineTranslator.class);
@@ -84,6 +87,7 @@ public class SparkBatchPortablePipelineTranslator {
PTransformNode transformNode, RunnerApi.Pipeline pipeline, SparkTranslationContext context);
}
+ @Override
public Set<String> knownUrns() {
return urnToTransformTranslator.keySet();
}
@@ -108,6 +112,7 @@ public class SparkBatchPortablePipelineTranslator {
}
/** Translates pipeline from Beam into the Spark context. */
+ @Override
public void translate(final RunnerApi.Pipeline pipeline, SparkTranslationContext context) {
QueryablePipeline p =
QueryablePipeline.forTransforms(
@@ -393,30 +398,10 @@ public class SparkBatchPortablePipelineTranslator {
: new HashPartitioner(context.getSparkContext().defaultParallelism());
}
- private static String getInputId(PTransformNode transformNode) {
- return Iterables.getOnlyElement(transformNode.getTransform().getInputsMap().values());
- }
-
- private static String getOutputId(PTransformNode transformNode) {
- return Iterables.getOnlyElement(transformNode.getTransform().getOutputsMap().values());
- }
-
- private static <T> WindowedValueCoder<T> getWindowedValueCoder(
- String pCollectionId, RunnerApi.Components components) {
- PCollection pCollection = components.getPcollectionsOrThrow(pCollectionId);
- PCollectionNode pCollectionNode = PipelineNode.pCollection(pCollectionId, pCollection);
- WindowedValueCoder<T> coder;
- try {
- coder =
- (WindowedValueCoder) WireCoders.instantiateRunnerWireCoder(pCollectionNode, components);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return coder;
- }
-
- private static String getExecutableStageIntermediateId(PTransformNode transformNode) {
- return transformNode.getId();
+ @Override
+ public SparkTranslationContext createTranslationContext(
+ JavaSparkContext jsc, SparkPipelineOptions options, JobInfo jobInfo) {
+ return new SparkTranslationContext(jsc, options, jobInfo);
}
/** Predicate to determine whether a URN is a Spark native transform. */
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
index 233e095..d28b9bd 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.spark.translation;
import java.io.IOException;
import java.io.Serializable;
+import java.util.Collections;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.List;
@@ -118,6 +119,12 @@ class SparkExecutableStageFunction<InputT, SideInputT>
@Override
public Iterator<RawUnionValue> call(Iterator<WindowedValue<InputT>> inputs) throws Exception {
+ // Do not invoke the SDK harness if there are no inputs; processing an empty
+ // bundle can cause spurious validation errors (e.g. in ParDoTest).
+ if (!inputs.hasNext()) {
+ return Collections.emptyIterator();
+ }
+
try (ExecutableStageContext stageContext = contextFactory.get(jobInfo)) {
ExecutableStage executableStage = ExecutableStage.fromPayload(stagePayload);
try (StageBundleFactory stageBundleFactory =
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPortablePipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPortablePipelineTranslator.java
new file mode 100644
index 0000000..3dc885a
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPortablePipelineTranslator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import java.util.Set;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Interface for portable Spark translators. This allows for a uniform invocation pattern for
+ * pipeline translation between streaming and batch runners.
+ */
+public interface SparkPortablePipelineTranslator<T extends SparkTranslationContext> {
+
+ Set<String> knownUrns();
+
+ /** Translates the given pipeline. */
+ void translate(RunnerApi.Pipeline pipeline, T context);
+
+ T createTranslationContext(JavaSparkContext jsc, SparkPipelineOptions options, JobInfo jobInfo);
+}
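SparkPipelineRunner (diffed above) chooses an implementation by pipeline boundedness; the dispatch reduces to roughly:

    // Sketch of the dispatch in SparkPipelineRunner.run.
    SparkPortablePipelineTranslator translator =
        isStreaming
            ? new SparkStreamingPortablePipelineTranslator()
            : new SparkBatchPortablePipelineTranslator();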
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
new file mode 100644
index 0000000..48312ad
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getExecutableStageIntermediateId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getInputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getOutputId;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowedValueCoder;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.SparkCompat;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/** Translates an unbounded portable pipeline into a Spark job. */
+public class SparkStreamingPortablePipelineTranslator
+ implements SparkPortablePipelineTranslator<SparkStreamingTranslationContext> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SparkStreamingPortablePipelineTranslator.class);
+
+ private final ImmutableMap<String, PTransformTranslator> urnToTransformTranslator;
+
+ interface PTransformTranslator {
+
+ /** Translates transformNode from Beam into the Spark context. */
+ void translate(
+ PTransformNode transformNode,
+ RunnerApi.Pipeline pipeline,
+ SparkStreamingTranslationContext context);
+ }
+
+ @Override
+ public Set<String> knownUrns() {
+ return urnToTransformTranslator.keySet();
+ }
+
+ public SparkStreamingPortablePipelineTranslator() {
+ ImmutableMap.Builder<String, PTransformTranslator> translatorMap = ImmutableMap.builder();
+ translatorMap.put(
+ PTransformTranslation.IMPULSE_TRANSFORM_URN,
+ SparkStreamingPortablePipelineTranslator::translateImpulse);
+ translatorMap.put(
+ PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+ SparkStreamingPortablePipelineTranslator::translateGroupByKey);
+ translatorMap.put(
+ ExecutableStage.URN, SparkStreamingPortablePipelineTranslator::translateExecutableStage);
+ translatorMap.put(
+ PTransformTranslation.FLATTEN_TRANSFORM_URN,
+ SparkStreamingPortablePipelineTranslator::translateFlatten);
+ translatorMap.put(
+ PTransformTranslation.RESHUFFLE_URN,
+ SparkStreamingPortablePipelineTranslator::translateReshuffle);
+ this.urnToTransformTranslator = translatorMap.build();
+ }
+
+ /** Translates pipeline from Beam into the Spark context. */
+ @Override
+ public void translate(
+ final RunnerApi.Pipeline pipeline, SparkStreamingTranslationContext context) {
+ QueryablePipeline p =
+ QueryablePipeline.forTransforms(
+ pipeline.getRootTransformIdsList(), pipeline.getComponents());
+ for (PipelineNode.PTransformNode transformNode : p.getTopologicallyOrderedTransforms()) {
+ urnToTransformTranslator
+ .getOrDefault(
+ transformNode.getTransform().getSpec().getUrn(),
+ SparkStreamingPortablePipelineTranslator::urnNotFound)
+ .translate(transformNode, pipeline, context);
+ }
+ }
+
+ private static void urnNotFound(
+ PTransformNode transformNode,
+ RunnerApi.Pipeline pipeline,
+ SparkStreamingTranslationContext context) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Transform %s has unknown URN %s",
+ transformNode.getId(), transformNode.getTransform().getSpec().getUrn()));
+ }
+
+ private static void translateImpulse(
+ PTransformNode transformNode,
+ RunnerApi.Pipeline pipeline,
+ SparkStreamingTranslationContext context) {
+
+ Iterable<WindowedValue<byte[]>> windowedValues =
+ Collections.singletonList(
+ WindowedValue.of(
+ new byte[0],
+ BoundedWindow.TIMESTAMP_MIN_VALUE,
+ GlobalWindow.INSTANCE,
+ PaneInfo.NO_FIRING));
+
+ WindowedValue.FullWindowedValueCoder<byte[]> windowCoder =
+ WindowedValue.FullWindowedValueCoder.of(ByteArrayCoder.of(), GlobalWindow.Coder.INSTANCE);
+ JavaRDD<WindowedValue<byte[]>> emptyByteArrayRDD =
+ context
+ .getSparkContext()
+ .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+ .map(CoderHelpers.fromByteFunction(windowCoder));
+
+ Queue<JavaRDD<WindowedValue<byte[]>>> rddQueue = new LinkedBlockingQueue<>();
+ rddQueue.offer(emptyByteArrayRDD);
+ JavaInputDStream<WindowedValue<byte[]>> emptyByteArrayStream =
+ context.getStreamingContext().queueStream(rddQueue, true /* oneAtATime */);
+
+ UnboundedDataset<byte[]> output =
+ new UnboundedDataset<>(
+ emptyByteArrayStream,
+ Collections.singletonList(emptyByteArrayStream.inputDStream().id()));
+
+ // Add watermark to holder and advance to infinity to ensure future watermarks can be updated
+ GlobalWatermarkHolder.SparkWatermarks sparkWatermark =
+ new GlobalWatermarkHolder.SparkWatermarks(
+ GlobalWindow.INSTANCE.maxTimestamp(),
+ BoundedWindow.TIMESTAMP_MAX_VALUE,
+ context.getFirstTimestamp());
+ GlobalWatermarkHolder.add(output.getStreamSources().get(0), sparkWatermark);
+
+ context.pushDataset(getOutputId(transformNode), output);
+ }
+
+ private static <K, V> void translateGroupByKey(
+ PTransformNode transformNode,
+ RunnerApi.Pipeline pipeline,
+ SparkStreamingTranslationContext context) {
+ RunnerApi.Components components = pipeline.getComponents();
+ String inputId = getInputId(transformNode);
+ UnboundedDataset<KV<K, V>> inputDataset =
+ (UnboundedDataset<KV<K, V>>) context.popDataset(inputId);
+ List<Integer> streamSources = inputDataset.getStreamSources();
+ WindowedValue.WindowedValueCoder<KV<K, V>> inputCoder =
+ getWindowedValueCoder(inputId, components);
+ KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) inputCoder.getValueCoder();
+ WindowingStrategy windowingStrategy = getWindowingStrategy(inputId, components);
+ WindowFn<Object, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+ WindowedValue.WindowedValueCoder<V> wvCoder =
+ WindowedValue.FullWindowedValueCoder.of(
+ inputKvCoder.getValueCoder(), windowFn.windowCoder());
+
+ JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =
+ SparkGroupAlsoByWindowViaWindowSet.groupByKeyAndWindow(
+ inputDataset.getDStream(),
+ inputKvCoder.getKeyCoder(),
+ wvCoder,
+ windowingStrategy,
+ context.getSerializableOptions(),
+ streamSources,
+ transformNode.getId());
+
+ context.pushDataset(
+ getOutputId(transformNode), new UnboundedDataset<>(outStream, streamSources));
+ }
+
+ private static <InputT, OutputT, SideInputT> void translateExecutableStage(
+ PTransformNode transformNode,
+ RunnerApi.Pipeline pipeline,
+ SparkStreamingTranslationContext context) {
+ RunnerApi.ExecutableStagePayload stagePayload;
+ try {
+ stagePayload =
+ RunnerApi.ExecutableStagePayload.parseFrom(
+ transformNode.getTransform().getSpec().getPayload());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ String inputPCollectionId = stagePayload.getInput();
+ UnboundedDataset<InputT> inputDataset =
+ (UnboundedDataset<InputT>) context.popDataset(inputPCollectionId);
+ List<Integer> streamSources = inputDataset.getStreamSources();
+ JavaDStream<WindowedValue<InputT>> inputDStream = inputDataset.getDStream();
+ Map<String, String> outputs = transformNode.getTransform().getOutputsMap();
+ BiMap<String, Integer> outputMap = createOutputMap(outputs.values());
+
+ RunnerApi.Components components = pipeline.getComponents();
+ Coder windowCoder =
+ getWindowingStrategy(inputPCollectionId, components).getWindowFn().windowCoder();
+
+ // TODO (BEAM-10712): handle side inputs.
+ if (stagePayload.getSideInputsCount() > 0) {
+ throw new UnsupportedOperationException(
+ "Side inputs to executable stage are currently unsupported.");
+ }
+ ImmutableMap<
+ String, Tuple2<Broadcast<List<byte[]>>, WindowedValue.WindowedValueCoder<SideInputT>>>
+ broadcastVariables = ImmutableMap.of();
+
+ SparkExecutableStageFunction<InputT, SideInputT> function =
+ new SparkExecutableStageFunction<>(
+ stagePayload,
+ context.jobInfo,
+ outputMap,
+ SparkExecutableStageContextFactory.getInstance(),
+ broadcastVariables,
+ MetricsAccumulator.getInstance(),
+ windowCoder);
+ JavaDStream<RawUnionValue> staged = inputDStream.mapPartitions(function);
+
+ String intermediateId = getExecutableStageIntermediateId(transformNode);
+ context.pushDataset(
+ intermediateId,
+ new Dataset() {
+ @Override
+ public void cache(String storageLevel, Coder<?> coder) {
+ StorageLevel level = StorageLevel.fromString(storageLevel);
+ staged.persist(level);
+ }
+
+ @Override
+ public void action() {
+ // Empty function to force computation of RDD.
+ staged.foreachRDD(TranslationUtils.emptyVoidFunction());
+ }
+
+ @Override
+ public void setName(String name) {
+ // ignore
+ }
+ });
+ // Pop dataset to mark DStream as used
+ context.popDataset(intermediateId);
+
+ for (String outputId : outputs.values()) {
+ JavaDStream<WindowedValue<OutputT>> outStream =
+ staged.flatMap(new SparkExecutableStageExtractionFunction<>(outputMap.get(outputId)));
+ context.pushDataset(outputId, new UnboundedDataset<>(outStream, streamSources));
+ }
+
+ // Add sink to ensure stage is executed
+ if (outputs.isEmpty()) {
+ JavaDStream<WindowedValue<OutputT>> outStream =
+ staged.flatMap((rawUnionValue) -> Collections.emptyIterator());
+ context.pushDataset(
+ String.format("EmptyOutputSink_%d", context.nextSinkId()),
+ new UnboundedDataset<>(outStream, streamSources));
+ }
+ }
+
+ private static <T> void translateFlatten(
+ PTransformNode transformNode,
+ RunnerApi.Pipeline pipeline,
+ SparkStreamingTranslationContext context) {
+ Map<String, String> inputsMap = transformNode.getTransform().getInputsMap();
+ JavaDStream<WindowedValue<T>> unifiedStreams;
+ List<Integer> streamSources = new ArrayList<>();
+
+ if (inputsMap.isEmpty()) {
+ Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
+ q.offer(context.getSparkContext().emptyRDD());
+ unifiedStreams = context.getStreamingContext().queueStream(q);
+ } else {
+ List<JavaDStream<WindowedValue<T>>> dStreams = new ArrayList<>();
+ for (String inputId : inputsMap.values()) {
+ Dataset dataset = context.popDataset(inputId);
+ if (dataset instanceof UnboundedDataset) {
+ UnboundedDataset<T> unboundedDataset = (UnboundedDataset<T>) dataset;
+ streamSources.addAll(unboundedDataset.getStreamSources());
+ dStreams.add(unboundedDataset.getDStream());
+ } else {
+ // create a single RDD stream.
+ Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
+ q.offer(((BoundedDataset) dataset).getRDD());
+ // TODO (BEAM-10789): this is not recoverable from checkpoint!
+ JavaDStream<WindowedValue<T>> dStream = context.getStreamingContext().queueStream(q);
+ dStreams.add(dStream);
+ }
+ }
+ // Unify streams into a single stream.
+ unifiedStreams = SparkCompat.joinStreams(context.getStreamingContext(), dStreams);
+ }
+
+ context.pushDataset(
+ getOutputId(transformNode), new UnboundedDataset<>(unifiedStreams, streamSources));
+ }
+
+ private static <T> void translateReshuffle(
+ PTransformNode transformNode,
+ RunnerApi.Pipeline pipeline,
+ SparkStreamingTranslationContext context) {
+ String inputId = getInputId(transformNode);
+ UnboundedDataset<T> inputDataset = (UnboundedDataset<T>) context.popDataset(inputId);
+ List<Integer> streamSources = inputDataset.getStreamSources();
+ JavaDStream<WindowedValue<T>> dStream = inputDataset.getDStream();
+ WindowedValue.WindowedValueCoder<T> coder =
+ getWindowedValueCoder(inputId, pipeline.getComponents());
+
+ JavaDStream<WindowedValue<T>> reshuffledStream =
+ dStream.transform(rdd -> GroupCombineFunctions.reshuffle(rdd, coder));
+
+ context.pushDataset(
+ getOutputId(transformNode), new UnboundedDataset<>(reshuffledStream, streamSources));
+ }
+
+ @Override
+ public SparkStreamingTranslationContext createTranslationContext(
+ JavaSparkContext jsc, SparkPipelineOptions options, JobInfo jobInfo) {
+ return new SparkStreamingTranslationContext(jsc, options, jobInfo);
+ }
+}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingTranslationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingTranslationContext.java
new file mode 100644
index 0000000..7e20b21
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingTranslationContext.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.joda.time.Instant;
+
+/**
+ * Translation context used to lazily store Spark datasets during streaming portable pipeline
+ * translation and compute them after translation.
+ */
+public class SparkStreamingTranslationContext extends SparkTranslationContext {
+ private final JavaStreamingContext streamingContext;
+ private final Instant firstTimestamp;
+
+ public SparkStreamingTranslationContext(
+ JavaSparkContext jsc, SparkPipelineOptions options, JobInfo jobInfo) {
+ super(jsc, options, jobInfo);
+ Duration batchDuration = new Duration(options.getBatchIntervalMillis());
+ this.streamingContext = new JavaStreamingContext(jsc, batchDuration);
+ this.firstTimestamp = new Instant();
+ }
+
+ public JavaStreamingContext getStreamingContext() {
+ return streamingContext;
+ }
+
+ public Instant getFirstTimestamp() {
+ return firstTimestamp;
+ }
+}
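In practice this context is created via SparkStreamingPortablePipelineTranslator.createTranslationContext rather than constructed directly; a minimal sketch, assuming jsc, options, and jobInfo are in scope:

    // Sketch: the runner retrieves the streaming context to start and stop the job.
    SparkStreamingTranslationContext context =
        new SparkStreamingTranslationContext(jsc, options, jobInfo);
    JavaStreamingContext jssc = context.getStreamingContext();
    // SparkPipelineRunner starts jssc, then awaits termination or the configured timeout.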
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkTranslationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkTranslationContext.java
index d7cc5e9..a96a784 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkTranslationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkTranslationContext.java
@@ -55,6 +55,10 @@ public class SparkTranslationContext {
return jsc;
}
+ public SerializablePipelineOptions getSerializableOptions() {
+ return serializablePipelineOptions;
+ }
+
/** Add output of transform to context. */
public void pushDataset(String pCollectionId, Dataset dataset) {
dataset.setName(pCollectionId);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 6ce3644..9b45f05 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -237,7 +237,7 @@ public final class StreamingTransformTranslator {
// create a single RDD stream.
Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
q.offer(((BoundedDataset) dataset).getRDD());
- // TODO: this is not recoverable from checkpoint!
+ // TODO (BEAM-10789): this is not recoverable from checkpoint!
JavaDStream<WindowedValue<T>> dStream = context.getStreamingContext().queueStream(q);
dStreams.add(dStream);
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
index f59ee2b..ec7e00d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
@@ -48,11 +48,11 @@ public class UnboundedDataset<T> implements Dataset {
this.streamSources.addAll(streamSources);
}
- JavaDStream<WindowedValue<T>> getDStream() {
+ public JavaDStream<WindowedValue<T>> getDStream() {
return dStream;
}
- List<Integer> getStreamSources() {
+ public List<Integer> getStreamSources() {
return streamSources;
}
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
index 6b273d0..652bb8f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
@@ -37,7 +37,10 @@ public class SparkRunnerRegistrarTest {
@Test
public void testOptions() {
assertEquals(
- ImmutableList.of(SparkPipelineOptions.class, SparkStructuredStreamingPipelineOptions.class),
+ ImmutableList.of(
+ SparkPipelineOptions.class,
+ SparkStructuredStreamingPipelineOptions.class,
+ SparkPortableStreamingPipelineOptions.class),
new SparkRunnerRegistrar.Options().getPipelineOptions());
}
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
index 061f5c0..39e28be 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
@@ -24,11 +24,14 @@ import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
@@ -101,7 +104,9 @@ public class SparkExecutableStageFunctionTest {
public void sdkErrorsSurfaceOnClose() throws Exception {
SparkExecutableStageFunction<Integer, ?> function = getFunction(Collections.emptyMap());
doThrow(new Exception()).when(remoteBundle).close();
- function.call(Collections.emptyIterator());
+ List<WindowedValue<Integer>> inputs = new ArrayList<>();
+ inputs.add(WindowedValue.valueInGlobalWindow(0));
+ function.call(inputs.iterator());
}
@Test
@@ -205,7 +210,9 @@ public class SparkExecutableStageFunctionTest {
when(stageContext.getStageBundleFactory(any())).thenReturn(stageBundleFactory);
SparkExecutableStageFunction<Integer, ?> function = getFunction(outputTagMap);
- Iterator<RawUnionValue> iterator = function.call(Collections.emptyIterator());
+ List<WindowedValue<Integer>> inputs = new ArrayList<>();
+ inputs.add(WindowedValue.valueInGlobalWindow(0));
+ Iterator<RawUnionValue> iterator = function.call(inputs.iterator());
Iterable<RawUnionValue> iterable = () -> iterator;
assertThat(
@@ -217,13 +224,22 @@ public class SparkExecutableStageFunctionTest {
@Test
public void testStageBundleClosed() throws Exception {
SparkExecutableStageFunction<Integer, ?> function = getFunction(Collections.emptyMap());
- function.call(Collections.emptyIterator());
+ List<WindowedValue<Integer>> inputs = new ArrayList<>();
+ inputs.add(WindowedValue.valueInGlobalWindow(0));
+ function.call(inputs.iterator());
verify(stageBundleFactory).getBundle(any(), any(), any(), any());
verify(stageBundleFactory).getProcessBundleDescriptor();
verify(stageBundleFactory).close();
verifyNoMoreInteractions(stageBundleFactory);
}
+ @Test
+ public void testNoCallOnEmptyInputIterator() throws Exception {
+ SparkExecutableStageFunction<Integer, ?> function = getFunction(Collections.emptyMap());
+ function.call(Collections.emptyIterator());
+ verifyZeroInteractions(stageBundleFactory);
+ }
+
private <InputT, SideInputT> SparkExecutableStageFunction<InputT, SideInputT> getFunction(
Map<String, Integer> outputMap) {
return new SparkExecutableStageFunction<>(