You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2021/10/11 21:24:48 UTC

[beam] branch master updated: [BEAM-12875] Register file systems in SparkExecutableStageFunction

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 36cc7b4  [BEAM-12875] Register file systems in SparkExecutableStageFunction
     new 38305d9  Merge pull request #15502 from meowcakes/register_filesystems_for_artifact_retrieval_service
36cc7b4 is described below

commit 36cc7b42bd77eb1a87b3a469e5aeb58e534524db
Author: Rogan Morrow <ro...@gmail.com>
AuthorDate: Wed Oct 6 16:18:04 2021 +0800

    [BEAM-12875] Register file systems in SparkExecutableStageFunction
---
 .../translation/SparkBatchPortablePipelineTranslator.java     |  2 ++
 .../spark/translation/SparkExecutableStageFunction.java       | 11 +++++++++++
 .../translation/SparkStreamingPortablePipelineTranslator.java |  1 +
 .../spark/translation/SparkExecutableStageFunctionTest.java   |  5 +++++
 4 files changed, 19 insertions(+)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java
index f0d666c..62b39d7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java
@@ -250,6 +250,7 @@ public class SparkBatchPortablePipelineTranslator
           groupByKeyPair(inputDataset, keyCoder, wvCoder);
       SparkExecutableStageFunction<KV, SideInputT> function =
           new SparkExecutableStageFunction<>(
+              context.getSerializableOptions(),
               stagePayload,
               context.jobInfo,
               outputExtractionMap,
@@ -262,6 +263,7 @@ public class SparkBatchPortablePipelineTranslator
       JavaRDD<WindowedValue<InputT>> inputRdd2 = ((BoundedDataset<InputT>) inputDataset).getRDD();
       SparkExecutableStageFunction<InputT, SideInputT> function2 =
           new SparkExecutableStageFunction<>(
+              context.getSerializableOptions(),
               stagePayload,
               context.jobInfo,
               outputExtractionMap,
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
index 24296bf..a78b082 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
@@ -34,6 +34,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey.TypeCase;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.InMemoryTimerInternals;
 import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.construction.Timer;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
@@ -51,11 +52,13 @@ import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
 import org.apache.beam.runners.fnexecution.translation.BatchSideInputHandlerFactory;
 import org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator;
 import org.apache.beam.runners.spark.util.ByteArray;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -86,6 +89,8 @@ class SparkExecutableStageFunction<InputT, SideInputT>
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkExecutableStageFunction.class);
 
+  // Pipeline options for initializing the FileSystems
+  private final SerializablePipelineOptions pipelineOptions;
   private final RunnerApi.ExecutableStagePayload stagePayload;
   private final Map<String, Integer> outputMap;
   private final SparkExecutableStageContextFactory contextFactory;
@@ -100,6 +105,7 @@ class SparkExecutableStageFunction<InputT, SideInputT>
   private transient Object currentTimerKey;
 
   SparkExecutableStageFunction(
+      SerializablePipelineOptions pipelineOptions,
       RunnerApi.ExecutableStagePayload stagePayload,
       JobInfo jobInfo,
       Map<String, Integer> outputMap,
@@ -107,6 +113,7 @@ class SparkExecutableStageFunction<InputT, SideInputT>
       Map<String, Tuple2<Broadcast<List<byte[]>>, WindowedValueCoder<SideInputT>>> sideInputs,
       MetricsContainerStepMapAccumulator metricsAccumulator,
       Coder windowCoder) {
+    this.pipelineOptions = pipelineOptions;
     this.stagePayload = stagePayload;
     this.jobInfo = jobInfo;
     this.outputMap = outputMap;
@@ -123,6 +130,10 @@ class SparkExecutableStageFunction<InputT, SideInputT>
 
   @Override
   public Iterator<RawUnionValue> call(Iterator<WindowedValue<InputT>> inputs) throws Exception {
+    SparkPipelineOptions options = pipelineOptions.get().as(SparkPipelineOptions.class);
+    // Register standard file systems.
+    FileSystems.setDefaultPipelineOptions(options);
+
     // Do not call processElements if there are no inputs
     // Otherwise, this may cause validation errors (e.g. ParDoTest)
     if (!inputs.hasNext()) {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
index d993459..7652c89 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java
@@ -252,6 +252,7 @@ public class SparkStreamingPortablePipelineTranslator
 
     SparkExecutableStageFunction<InputT, SideInputT> function =
         new SparkExecutableStageFunction<>(
+            context.getSerializableOptions(),
             stagePayload,
             context.jobInfo,
             outputMap,
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
index 24d69a5..2e74eb0 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
@@ -37,6 +37,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.construction.Timer;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
@@ -53,6 +54,7 @@ import org.apache.beam.runners.fnexecution.control.TimerReceiverFactory;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
 import org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
@@ -76,6 +78,8 @@ public class SparkExecutableStageFunctionTest {
   @Mock private MetricsContainerStepMap stepMap;
   @Mock private MetricsContainerImpl container;
 
+  private final SerializablePipelineOptions pipelineOptions =
+      new SerializablePipelineOptions(PipelineOptionsFactory.create());
   private final String inputId = "input-id";
   private final ExecutableStagePayload stagePayload =
       ExecutableStagePayload.newBuilder()
@@ -258,6 +262,7 @@ public class SparkExecutableStageFunctionTest {
   private <InputT, SideInputT> SparkExecutableStageFunction<InputT, SideInputT> getFunction(
       Map<String, Integer> outputMap) {
     return new SparkExecutableStageFunction<>(
+        pipelineOptions,
         stagePayload,
         null,
         outputMap,