You are viewing a plain text version of this content; the canonical (HTML) version is available in the Apache mailing-list archive.
Posted to commits@beam.apache.org by ib...@apache.org on 2021/10/11 21:24:48 UTC
[beam] branch master updated: [BEAM-12875] Register file systems in
SparkExecutableStageFunction
This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 36cc7b4 [BEAM-12875] Register file systems in SparkExecutableStageFunction
new 38305d9 Merge pull request #15502 from meowcakes/register_filesystems_for_artifact_retrieval_service
36cc7b4 is described below
commit 36cc7b42bd77eb1a87b3a469e5aeb58e534524db
Author: Rogan Morrow <ro...@gmail.com>
AuthorDate: Wed Oct 6 16:18:04 2021 +0800
[BEAM-12875] Register file systems in SparkExecutableStageFunction
---
.../translation/SparkBatchPortablePipelineTranslator.java | 2 ++
.../spark/translation/SparkExecutableStageFunction.java | 11 +++++++++++
.../translation/SparkStreamingPortablePipelineTranslator.java | 1 +
.../spark/translation/SparkExecutableStageFunctionTest.java | 5 +++++
4 files changed, 19 insertions(+)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java
index f0d666c..62b39d7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java
@@ -250,6 +250,7 @@ public class SparkBatchPortablePipelineTranslator
groupByKeyPair(inputDataset, keyCoder, wvCoder);
SparkExecutableStageFunction<KV, SideInputT> function =
new SparkExecutableStageFunction<>(
+ context.getSerializableOptions(),
stagePayload,
context.jobInfo,
outputExtractionMap,
@@ -262,6 +263,7 @@ public class SparkBatchPortablePipelineTranslator
JavaRDD<WindowedValue<InputT>> inputRdd2 = ((BoundedDataset<InputT>) inputDataset).getRDD();
SparkExecutableStageFunction<InputT, SideInputT> function2 =
new SparkExecutableStageFunction<>(
+ context.getSerializableOptions(),
stagePayload,
context.jobInfo,
outputExtractionMap,
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
index 24296bf..a78b082 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
@@ -34,6 +34,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey.TypeCase;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
@@ -51,11 +52,13 @@ import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.runners.fnexecution.translation.BatchSideInputHandlerFactory;
import org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator;
import org.apache.beam.runners.spark.util.ByteArray;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
@@ -86,6 +89,8 @@ class SparkExecutableStageFunction<InputT, SideInputT>
private static final Logger LOG = LoggerFactory.getLogger(SparkExecutableStageFunction.class);
+ // Pipeline options for initializing the FileSystems
+ private final SerializablePipelineOptions pipelineOptions;
private final RunnerApi.ExecutableStagePayload stagePayload;
private final Map<String, Integer> outputMap;
private final SparkExecutableStageContextFactory contextFactory;
@@ -100,6 +105,7 @@ class SparkExecutableStageFunction<InputT, SideInputT>
private transient Object currentTimerKey;
SparkExecutableStageFunction(
+ SerializablePipelineOptions pipelineOptions,
RunnerApi.ExecutableStagePayload stagePayload,
JobInfo jobInfo,
Map<String, Integer> outputMap,
@@ -107,6 +113,7 @@ class SparkExecutableStageFunction<InputT, SideInputT>
Map<String, Tuple2<Broadcast<List<byte[]>>, WindowedValueCoder<SideInputT>>> sideInputs,
MetricsContainerStepMapAccumulator metricsAccumulator,
Coder windowCoder) {
+ this.pipelineOptions = pipelineOptions;
this.stagePayload = stagePayload;
this.jobInfo = jobInfo;
this.outputMap = outputMap;
@@ -123,6 +130,10 @@ class SparkExecutableStageFunction<InputT, SideInputT>
@Override
public Iterator<RawUnionValue> call(Iterator<WindowedValue<InputT>> inputs) throws Exception {
+ SparkPipelineOptions options = pipelineOptions.get().as(SparkPipelineOptions.class);
+ // Register standard file systems.
+ FileSystems.setDefaultPipelineOptions(options);
+
// Do not call processElements if there are no inputs
// Otherwise, this may cause validation errors (e.g. ParDoTest)
if (!inputs.hasNext()) {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
index d993459..7652c89 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
@@ -252,6 +252,7 @@ public class SparkStreamingPortablePipelineTranslator
SparkExecutableStageFunction<InputT, SideInputT> function =
new SparkExecutableStageFunction<>(
+ context.getSerializableOptions(),
stagePayload,
context.jobInfo,
outputMap,
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
index 24d69a5..2e74eb0 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
@@ -37,6 +37,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
@@ -53,6 +54,7 @@ import org.apache.beam.runners.fnexecution.control.TimerReceiverFactory;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
@@ -76,6 +78,8 @@ public class SparkExecutableStageFunctionTest {
@Mock private MetricsContainerStepMap stepMap;
@Mock private MetricsContainerImpl container;
+ private final SerializablePipelineOptions pipelineOptions =
+ new SerializablePipelineOptions(PipelineOptionsFactory.create());
private final String inputId = "input-id";
private final ExecutableStagePayload stagePayload =
ExecutableStagePayload.newBuilder()
@@ -258,6 +262,7 @@ public class SparkExecutableStageFunctionTest {
private <InputT, SideInputT> SparkExecutableStageFunction<InputT, SideInputT> getFunction(
Map<String, Integer> outputMap) {
return new SparkExecutableStageFunction<>(
+ pipelineOptions,
stagePayload,
null,
outputMap,