Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/02 18:17:09 UTC

[1/4] beam git commit: Introduces SerializablePipelineOptions in core-construction

Repository: beam
Updated Branches:
  refs/heads/master 0a358c780 -> 339976c9f


http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 1385e07..1263618 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -31,6 +31,7 @@ import org.apache.beam.runners.core.ReduceFnRunner;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.UnsupportedSideInputReader;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.construction.TriggerTranslation;
 import org.apache.beam.runners.core.metrics.CounterCell;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
@@ -38,7 +39,6 @@ import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.runners.spark.translation.TranslationUtils;
 import org.apache.beam.runners.spark.translation.WindowingHelpers;
 import org.apache.beam.runners.spark.util.ByteArray;
@@ -108,11 +108,11 @@ public class SparkGroupAlsoByWindowViaWindowSet {
           final Coder<K> keyCoder,
           final Coder<WindowedValue<InputT>> wvCoder,
           final WindowingStrategy<?, W> windowingStrategy,
-          final SparkRuntimeContext runtimeContext,
+          final SerializablePipelineOptions options,
           final List<Integer> sourceIds) {
 
     final long batchDurationMillis =
-        runtimeContext.getPipelineOptions().as(SparkPipelineOptions.class).getBatchIntervalMillis();
+        options.get().as(SparkPipelineOptions.class).getBatchIntervalMillis();
     final IterableCoder<WindowedValue<InputT>> itrWvCoder = IterableCoder.of(wvCoder);
     final Coder<InputT> iCoder = ((FullWindowedValueCoder<InputT>) wvCoder).getValueCoder();
     final Coder<? extends BoundedWindow> wCoder =
@@ -123,7 +123,7 @@ public class SparkGroupAlsoByWindowViaWindowSet {
         TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
 
     long checkpointDurationMillis =
-        runtimeContext.getPipelineOptions().as(SparkPipelineOptions.class)
+        options.get().as(SparkPipelineOptions.class)
             .getCheckpointDurationMillis();
 
     // we have to switch to Scala API to avoid Optional in the Java API, see: SPARK-4819.
@@ -268,7 +268,7 @@ public class SparkGroupAlsoByWindowViaWindowSet {
                               outputHolder,
                               new UnsupportedSideInputReader("GroupAlsoByWindow"),
                               reduceFn,
-                              runtimeContext.getPipelineOptions());
+                              options.get());
 
                       outputHolder.clear(); // clear before potential use.
                       if (!seq.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index 549bd30..1b54478 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -27,12 +27,12 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.io.EmptyCheckpointMark;
 import org.apache.beam.runners.spark.io.MicrobatchSource;
 import org.apache.beam.runners.spark.io.SparkUnboundedSource.Metadata;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -91,7 +91,7 @@ public class StateSpecFunctions {
    *
    * <p>See also <a href="https://issues.apache.org/jira/browse/SPARK-4819">SPARK-4819</a>.</p>
    *
-   * @param runtimeContext    A serializable {@link SparkRuntimeContext}.
+   * @param options           A serializable {@link SerializablePipelineOptions}.
    * @param <T>               The type of the input stream elements.
    * @param <CheckpointMarkT> The type of the {@link UnboundedSource.CheckpointMark}.
    * @return The appropriate {@link org.apache.spark.streaming.StateSpec} function.
@@ -99,7 +99,7 @@ public class StateSpecFunctions {
   public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
   scala.Function3<Source<T>, scala.Option<CheckpointMarkT>, State<Tuple2<byte[], Instant>>,
       Tuple2<Iterable<byte[]>, Metadata>> mapSourceFunction(
-           final SparkRuntimeContext runtimeContext, final String stepName) {
+      final SerializablePipelineOptions options, final String stepName) {
 
     return new SerializableFunction3<Source<T>, Option<CheckpointMarkT>,
         State<Tuple2<byte[], Instant>>, Tuple2<Iterable<byte[]>, Metadata>>() {
@@ -151,7 +151,7 @@ public class StateSpecFunctions {
         try {
           microbatchReader =
               (MicrobatchSource.Reader)
-                  microbatchSource.getOrCreateReader(runtimeContext.getPipelineOptions(),
+                  microbatchSource.getOrCreateReader(options.get(),
                                                      checkpointMark);
         } catch (IOException e) {
           throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 23e430a..463e507 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -26,6 +26,7 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
@@ -50,7 +51,6 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
 public class EvaluationContext {
   private final JavaSparkContext jsc;
   private JavaStreamingContext jssc;
-  private final SparkRuntimeContext runtime;
   private final Pipeline pipeline;
   private final Map<PValue, Dataset> datasets = new LinkedHashMap<>();
   private final Map<PValue, Dataset> pcollections = new LinkedHashMap<>();
@@ -60,12 +60,13 @@ public class EvaluationContext {
   private final SparkPCollectionView pviews = new SparkPCollectionView();
   private final Map<PCollection, Long> cacheCandidates = new HashMap<>();
   private final PipelineOptions options;
+  private final SerializablePipelineOptions serializableOptions;
 
   public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline, PipelineOptions options) {
     this.jsc = jsc;
     this.pipeline = pipeline;
     this.options = options;
-    this.runtime = new SparkRuntimeContext(options);
+    this.serializableOptions = new SerializablePipelineOptions(options);
   }
 
   public EvaluationContext(
@@ -90,8 +91,8 @@ public class EvaluationContext {
     return options;
   }
 
-  public SparkRuntimeContext getRuntimeContext() {
-    return runtime;
+  public SerializablePipelineOptions getSerializableOptions() {
+    return serializableOptions;
   }
 
   public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
@@ -254,7 +255,7 @@ public class EvaluationContext {
   }
 
   private String storageLevel() {
-    return runtime.getPipelineOptions().as(SparkPipelineOptions.class).getStorageLevel();
+    return serializableOptions.get().as(SparkPipelineOptions.class).getStorageLevel();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 23d5b32..7299583 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -34,8 +34,8 @@ import org.apache.beam.runners.core.InMemoryTimerInternals;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
-import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.runners.spark.util.SparkSideInputReader;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -59,11 +59,10 @@ import scala.Tuple2;
 public class MultiDoFnFunction<InputT, OutputT>
     implements PairFlatMapFunction<Iterator<WindowedValue<InputT>>, TupleTag<?>, WindowedValue<?>> {
 
-  private final Accumulator<NamedAggregators> aggAccum;
   private final Accumulator<MetricsContainerStepMap> metricsAccum;
   private final String stepName;
   private final DoFn<InputT, OutputT> doFn;
-  private final SparkRuntimeContext runtimeContext;
+  private final SerializablePipelineOptions options;
   private final TupleTag<OutputT> mainOutputTag;
   private final List<TupleTag<?>> additionalOutputTags;
   private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
@@ -71,10 +70,9 @@ public class MultiDoFnFunction<InputT, OutputT>
   private final boolean stateful;
 
   /**
-   * @param aggAccum       The Spark {@link Accumulator} that backs the Beam Aggregators.
    * @param metricsAccum       The Spark {@link Accumulator} that backs the Beam metrics.
    * @param doFn              The {@link DoFn} to be wrapped.
-   * @param runtimeContext    The {@link SparkRuntimeContext}.
+   * @param options    The {@link SerializablePipelineOptions}.
    * @param mainOutputTag     The main output {@link TupleTag}.
    * @param additionalOutputTags Additional {@link TupleTag output tags}.
    * @param sideInputs        Side inputs used in this {@link DoFn}.
@@ -82,21 +80,19 @@ public class MultiDoFnFunction<InputT, OutputT>
    * @param stateful          Stateful {@link DoFn}.
    */
   public MultiDoFnFunction(
-      Accumulator<NamedAggregators> aggAccum,
       Accumulator<MetricsContainerStepMap> metricsAccum,
       String stepName,
       DoFn<InputT, OutputT> doFn,
-      SparkRuntimeContext runtimeContext,
+      SerializablePipelineOptions options,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> additionalOutputTags,
       Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
       WindowingStrategy<?, ?> windowingStrategy,
       boolean stateful) {
-    this.aggAccum = aggAccum;
     this.metricsAccum = metricsAccum;
     this.stepName = stepName;
     this.doFn = doFn;
-    this.runtimeContext = runtimeContext;
+    this.options = options;
     this.mainOutputTag = mainOutputTag;
     this.additionalOutputTags = additionalOutputTags;
     this.sideInputs = sideInputs;
@@ -140,7 +136,7 @@ public class MultiDoFnFunction<InputT, OutputT>
 
     final DoFnRunner<InputT, OutputT> doFnRunner =
         DoFnRunners.simpleRunner(
-            runtimeContext.getPipelineOptions(),
+            options.get(),
             doFn,
             new SparkSideInputReader(sideInputs),
             outputManager,

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
index 315f7fb..d8d71ff 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
@@ -30,6 +30,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.runners.spark.util.SparkSideInputReader;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -48,16 +49,16 @@ import org.apache.beam.sdk.values.WindowingStrategy;
  * {@link org.apache.beam.sdk.transforms.Combine.CombineFn}.
  */
 public class SparkAbstractCombineFn implements Serializable {
-  protected final SparkRuntimeContext runtimeContext;
+  protected final SerializablePipelineOptions options;
   protected final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
   protected final WindowingStrategy<?, BoundedWindow> windowingStrategy;
 
 
   public SparkAbstractCombineFn(
-      SparkRuntimeContext runtimeContext,
+      SerializablePipelineOptions options,
       Map<TupleTag<?>,  KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
       WindowingStrategy<?, ?> windowingStrategy) {
-    this.runtimeContext = runtimeContext;
+    this.options = options;
     this.sideInputs = sideInputs;
     this.windowingStrategy = (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
   }
@@ -71,7 +72,7 @@ public class SparkAbstractCombineFn implements Serializable {
   private transient SparkCombineContext combineContext;
   protected SparkCombineContext ctxtForInput(WindowedValue<?> input) {
     if (combineContext == null) {
-      combineContext = new SparkCombineContext(runtimeContext.getPipelineOptions(),
+      combineContext = new SparkCombineContext(options.get(),
           new SparkSideInputReader(sideInputs));
     }
     return combineContext.forInput(input);

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
index d0e9038..81416a3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -49,10 +50,10 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract
 
   public SparkGlobalCombineFn(
       CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
-      SparkRuntimeContext runtimeContext,
+      SerializablePipelineOptions options,
       Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
       WindowingStrategy<?, ?> windowingStrategy) {
-    super(runtimeContext, sideInputs, windowingStrategy);
+    super(options, sideInputs, windowingStrategy);
     this.combineFn = combineFn;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
index d2a3424..fcf438c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
@@ -30,6 +30,7 @@ import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.UnsupportedSideInputReader;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.construction.TriggerTranslation;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
@@ -55,18 +56,18 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde
   private final WindowingStrategy<?, W> windowingStrategy;
   private final StateInternalsFactory<K> stateInternalsFactory;
   private final SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn;
-  private final SparkRuntimeContext runtimeContext;
+  private final SerializablePipelineOptions options;
 
   public SparkGroupAlsoByWindowViaOutputBufferFn(
       WindowingStrategy<?, W> windowingStrategy,
       StateInternalsFactory<K> stateInternalsFactory,
       SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn,
-      SparkRuntimeContext runtimeContext,
+      SerializablePipelineOptions options,
       Accumulator<NamedAggregators> accumulator) {
     this.windowingStrategy = windowingStrategy;
     this.stateInternalsFactory = stateInternalsFactory;
     this.reduceFn = reduceFn;
-    this.runtimeContext = runtimeContext;
+    this.options = options;
   }
 
   @Override
@@ -98,7 +99,7 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde
             outputter,
             new UnsupportedSideInputReader("GroupAlsoByWindow"),
             reduceFn,
-            runtimeContext.getPipelineOptions());
+            options.get());
 
     // Process the grouped values.
     reduceFnRunner.processElements(values);

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
index 7ac8e7d..55392e9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -49,10 +50,10 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra
 
   public SparkKeyedCombineFn(
       CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
-      SparkRuntimeContext runtimeContext,
+      SerializablePipelineOptions options,
       Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
       WindowingStrategy<?, ?> windowingStrategy) {
-    super(runtimeContext, sideInputs, windowingStrategy);
+    super(options, sideInputs, windowingStrategy);
     this.combineFn = combineFn;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
deleted file mode 100644
index 6361bb2..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Function;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.common.ReflectHelpers;
-
-/**
- * The SparkRuntimeContext allows us to define useful features on the client side before our
- * data flow program is launched.
- */
-public class SparkRuntimeContext implements Serializable {
-  private final Supplier<PipelineOptions> optionsSupplier;
-  private transient CoderRegistry coderRegistry;
-
-  SparkRuntimeContext(PipelineOptions options) {
-    String serializedPipelineOptions = serializePipelineOptions(options);
-    this.optionsSupplier =
-        Suppliers.memoize(
-            Suppliers.compose(
-                new DeserializeOptions(),
-                Suppliers.ofInstance(serializedPipelineOptions)));
-  }
-
-  /**
-   * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing
-   * for user specified configuration injection into the ObjectMapper. This supports user custom
-   * types on {@link PipelineOptions}.
-   */
-  private static ObjectMapper createMapper() {
-    return new ObjectMapper().registerModules(
-        ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
-  }
-
-  private String serializePipelineOptions(PipelineOptions pipelineOptions) {
-    try {
-      return createMapper().writeValueAsString(pipelineOptions);
-    } catch (JsonProcessingException e) {
-      throw new IllegalStateException("Failed to serialize the pipeline options.", e);
-    }
-  }
-
-  public PipelineOptions getPipelineOptions() {
-    return optionsSupplier.get();
-  }
-
-  public CoderRegistry getCoderRegistry() {
-    if (coderRegistry == null) {
-      coderRegistry = CoderRegistry.createDefault();
-    }
-    return coderRegistry;
-  }
-
-  private static class DeserializeOptions
-      implements Function<String, PipelineOptions>, Serializable {
-    @Override
-    public PipelineOptions apply(String options) {
-      try {
-        return createMapper().readValue(options, PipelineOptions.class);
-      } catch (IOException e) {
-        throw new IllegalStateException("Failed to deserialize the pipeline options.", e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index ac5e0cd..e060e1d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -146,7 +146,7 @@ public final class TransformTranslator {
                 windowingStrategy,
                 new TranslationUtils.InMemoryStateInternalsFactory<K>(),
                 SystemReduceFn.<K, V, W>buffering(coder.getValueCoder()),
-                context.getRuntimeContext(),
+                context.getSerializableOptions(),
                 accum));
 
         context.putDataset(transform, new BoundedDataset<>(groupedAlsoByWindow));
@@ -171,7 +171,7 @@ public final class TransformTranslator {
                   (CombineWithContext.CombineFnWithContext<InputT, ?, OutputT>)
                       CombineFnUtil.toFnWithContext(transform.getFn());
               final SparkKeyedCombineFn<K, InputT, ?, OutputT> sparkCombineFn =
-                  new SparkKeyedCombineFn<>(combineFn, context.getRuntimeContext(),
+                  new SparkKeyedCombineFn<>(combineFn, context.getSerializableOptions(),
                       TranslationUtils.getSideInputs(transform.getSideInputs(), context),
                           context.getInput(transform).getWindowingStrategy());
 
@@ -222,18 +222,18 @@ public final class TransformTranslator {
             final WindowedValue.FullWindowedValueCoder<OutputT> wvoCoder =
                 WindowedValue.FullWindowedValueCoder.of(oCoder,
                     windowingStrategy.getWindowFn().windowCoder());
-            final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
             final boolean hasDefault = transform.isInsertDefault();
 
             final SparkGlobalCombineFn<InputT, AccumT, OutputT> sparkCombineFn =
                 new SparkGlobalCombineFn<>(
                     combineFn,
-                    runtimeContext,
+                    context.getSerializableOptions(),
                     TranslationUtils.getSideInputs(transform.getSideInputs(), context),
                     windowingStrategy);
             final Coder<AccumT> aCoder;
             try {
-              aCoder = combineFn.getAccumulatorCoder(runtimeContext.getCoderRegistry(), iCoder);
+              aCoder = combineFn.getAccumulatorCoder(
+                  context.getPipeline().getCoderRegistry(), iCoder);
             } catch (CannotProvideCoderException e) {
               throw new IllegalStateException("Could not determine coder for accumulator", e);
             }
@@ -295,16 +295,16 @@ public final class TransformTranslator {
             (CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT>)
                 CombineFnUtil.toFnWithContext(transform.getFn());
         final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-        final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
         final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
             TranslationUtils.getSideInputs(transform.getSideInputs(), context);
         final SparkKeyedCombineFn<K, InputT, AccumT, OutputT> sparkCombineFn =
-            new SparkKeyedCombineFn<>(combineFn, runtimeContext, sideInputs, windowingStrategy);
+            new SparkKeyedCombineFn<>(
+                combineFn, context.getSerializableOptions(), sideInputs, windowingStrategy);
         final Coder<AccumT> vaCoder;
         try {
           vaCoder =
               combineFn.getAccumulatorCoder(
-                  runtimeContext.getCoderRegistry(), inputCoder.getValueCoder());
+                  context.getPipeline().getCoderRegistry(), inputCoder.getValueCoder());
         } catch (CannotProvideCoderException e) {
           throw new IllegalStateException("Could not determine coder for accumulator", e);
         }
@@ -360,7 +360,6 @@ public final class TransformTranslator {
             ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
         WindowingStrategy<?, ?> windowingStrategy =
             context.getInput(transform).getWindowingStrategy();
-        Accumulator<NamedAggregators> aggAccum = AggregatorsAccumulator.getInstance();
         Accumulator<MetricsContainerStepMap> metricsAccum = MetricsAccumulator.getInstance();
 
         JavaPairRDD<TupleTag<?>, WindowedValue<?>> all;
@@ -370,11 +369,10 @@ public final class TransformTranslator {
             || signature.timerDeclarations().size() > 0;
 
         MultiDoFnFunction<InputT, OutputT> multiDoFnFunction = new MultiDoFnFunction<>(
-            aggAccum,
             metricsAccum,
             stepName,
             doFn,
-            context.getRuntimeContext(),
+            context.getSerializableOptions(),
             transform.getMainOutputTag(),
             transform.getAdditionalOutputTags().getAll(),
             TranslationUtils.getSideInputs(transform.getSideInputs(), context),
@@ -452,10 +450,11 @@ public final class TransformTranslator {
       public void evaluate(Read.Bounded<T> transform, EvaluationContext context) {
         String stepName = context.getCurrentTransform().getFullName();
         final JavaSparkContext jsc = context.getSparkContext();
-        final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
         // create an RDD from a BoundedSource.
-        JavaRDD<WindowedValue<T>> input = new SourceRDD.Bounded<>(
-            jsc.sc(), transform.getSource(), runtimeContext, stepName).toJavaRDD();
+        JavaRDD<WindowedValue<T>> input =
+            new SourceRDD.Bounded<>(
+                    jsc.sc(), transform.getSource(), context.getSerializableOptions(), stepName)
+                .toJavaRDD();
         // cache to avoid re-evaluation of the source by Spark's lazy DAG evaluation.
         context.putDataset(transform, new BoundedDataset<>(input.cache()));
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index cd5bb3e..38d6119 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -32,9 +32,8 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 import javax.annotation.Nonnull;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
-import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
-import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.io.ConsoleIO;
 import org.apache.beam.runners.spark.io.CreateStream;
@@ -50,7 +49,6 @@ import org.apache.beam.runners.spark.translation.SparkAssignWindowFn;
 import org.apache.beam.runners.spark.translation.SparkKeyedCombineFn;
 import org.apache.beam.runners.spark.translation.SparkPCollectionView;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TranslationUtils;
 import org.apache.beam.runners.spark.translation.WindowingHelpers;
@@ -125,7 +123,7 @@ public final class StreamingTransformTranslator {
             transform,
             SparkUnboundedSource.read(
                 context.getStreamingContext(),
-                context.getRuntimeContext(),
+                context.getSerializableOptions(),
                 transform.getSource(),
                 stepName));
       }
@@ -273,7 +271,6 @@ public final class StreamingTransformTranslator {
         JavaDStream<WindowedValue<KV<K, V>>> dStream = inputDataset.getDStream();
         @SuppressWarnings("unchecked")
         final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder();
-        final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
         @SuppressWarnings("unchecked")
         final WindowingStrategy<?, W> windowingStrategy =
             (WindowingStrategy<?, W>) context.getInput(transform).getWindowingStrategy();
@@ -303,7 +300,7 @@ public final class StreamingTransformTranslator {
                 coder.getKeyCoder(),
                 wvCoder,
                 windowingStrategy,
-                runtimeContext,
+                context.getSerializableOptions(),
                 streamSources);
 
         context.putDataset(transform, new UnboundedDataset<>(outStream, streamSources));
@@ -336,7 +333,7 @@ public final class StreamingTransformTranslator {
             ((UnboundedDataset<KV<K, Iterable<InputT>>>) context.borrowDataset(transform));
         JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> dStream = unboundedDataset.getDStream();
 
-        final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
+        final SerializablePipelineOptions options = context.getSerializableOptions();
         final SparkPCollectionView pviews = context.getPViews();
 
         JavaDStream<WindowedValue<KV<K, OutputT>>> outStream = dStream.transform(
@@ -347,7 +344,7 @@ public final class StreamingTransformTranslator {
                     call(JavaRDD<WindowedValue<KV<K, Iterable<InputT>>>> rdd)
                         throws Exception {
                         SparkKeyedCombineFn<K, InputT, ?, OutputT> combineFnWithContext =
-                            new SparkKeyedCombineFn<>(fn, runtimeContext,
+                            new SparkKeyedCombineFn<>(fn, options,
                                 TranslationUtils.getSideInputs(transform.getSideInputs(),
                                 new JavaSparkContext(rdd.context()), pviews),
                                 windowingStrategy);
@@ -374,7 +371,7 @@ public final class StreamingTransformTranslator {
         final DoFn<InputT, OutputT> doFn = transform.getFn();
         rejectSplittable(doFn);
         rejectStateAndTimers(doFn);
-        final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
+        final SerializablePipelineOptions options = context.getSerializableOptions();
         final SparkPCollectionView pviews = context.getPViews();
         final WindowingStrategy<?, ?> windowingStrategy =
             context.getInput(transform).getWindowingStrategy();
@@ -393,8 +390,6 @@ public final class StreamingTransformTranslator {
                   @Override
                   public JavaPairRDD<TupleTag<?>, WindowedValue<?>> call(
                       JavaRDD<WindowedValue<InputT>> rdd) throws Exception {
-                    final Accumulator<NamedAggregators> aggAccum =
-                        AggregatorsAccumulator.getInstance();
                     final Accumulator<MetricsContainerStepMap> metricsAccum =
                         MetricsAccumulator.getInstance();
                     final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>>
@@ -405,11 +400,10 @@ public final class StreamingTransformTranslator {
                             pviews);
                     return rdd.mapPartitionsToPair(
                         new MultiDoFnFunction<>(
-                            aggAccum,
                             metricsAccum,
                             stepName,
                             doFn,
-                            runtimeContext,
+                            options,
                             transform.getMainOutputTag(),
                             transform.getAdditionalOutputTags().getAll(),
                             sideInputs,

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java
deleted file mode 100644
index 456056a..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import static org.junit.Assert.assertEquals;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.auto.service.AutoService;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.CrashingRunner;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link SparkRuntimeContext}.
- */
-@RunWith(JUnit4.class)
-public class SparkRuntimeContextTest {
-  /** PipelineOptions used to test auto registration of Jackson modules. */
-  public interface JacksonIncompatibleOptions extends PipelineOptions {
-    JacksonIncompatible getJacksonIncompatible();
-    void setJacksonIncompatible(JacksonIncompatible value);
-  }
-
-  /** A Jackson {@link Module} to test auto-registration of modules. */
-  @AutoService(Module.class)
-  public static class RegisteredTestModule extends SimpleModule {
-    public RegisteredTestModule() {
-      super("RegisteredTestModule");
-      setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class);
-    }
-  }
-
-  /** A class which Jackson does not know how to serialize/deserialize. */
-  public static class JacksonIncompatible {
-    private final String value;
-    public JacksonIncompatible(String value) {
-      this.value = value;
-    }
-  }
-
-  /** A Jackson mixin used to add annotations to other classes. */
-  @JsonDeserialize(using = JacksonIncompatibleDeserializer.class)
-  @JsonSerialize(using = JacksonIncompatibleSerializer.class)
-  public static final class JacksonIncompatibleMixin {}
-
-  /** A Jackson deserializer for {@link JacksonIncompatible}. */
-  public static class JacksonIncompatibleDeserializer extends
-      JsonDeserializer<JacksonIncompatible> {
-
-    @Override
-    public JacksonIncompatible deserialize(JsonParser jsonParser,
-        DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
-      return new JacksonIncompatible(jsonParser.readValueAs(String.class));
-    }
-  }
-
-  /** A Jackson serializer for {@link JacksonIncompatible}. */
-  public static class JacksonIncompatibleSerializer extends JsonSerializer<JacksonIncompatible> {
-
-    @Override
-    public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator,
-        SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
-      jsonGenerator.writeString(jacksonIncompatible.value);
-    }
-  }
-
-  @Test
-  public void testSerializingPipelineOptionsWithCustomUserType() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.fromArgs("--jacksonIncompatible=\"testValue\"")
-        .as(JacksonIncompatibleOptions.class);
-    options.setRunner(CrashingRunner.class);
-    Pipeline p = Pipeline.create(options);
-    SparkRuntimeContext context = new SparkRuntimeContext(options);
-
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    try (ObjectOutputStream outputStream = new ObjectOutputStream(baos)) {
-      outputStream.writeObject(context);
-    }
-    try (ObjectInputStream inputStream =
-        new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
-      SparkRuntimeContext copy = (SparkRuntimeContext) inputStream.readObject();
-      assertEquals("testValue",
-          copy.getPipelineOptions().as(JacksonIncompatibleOptions.class)
-              .getJacksonIncompatible().value);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index 9a4d25a..5cc0b3f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -176,7 +176,12 @@ import org.joda.time.format.DateTimeFormatter;
  *
  * <h2>Serialization Of PipelineOptions</h2>
  *
- * {@link PipelineRunner}s require support for options to be serialized. Each property
+ * {@link PipelineOptions} is intentionally <i>not</i> marked {@link java.io.Serializable}, in order
+ * to discourage pipeline authors from capturing {@link PipelineOptions} at pipeline construction
+ * time, because a pipeline may be saved as a template and run with a different set of options
+ * than the ones it was constructed with. See {@link Pipeline#run(PipelineOptions)}.
+ *
+ * <p>However, {@link PipelineRunner}s require support for options to be serialized. Each property
  * within {@link PipelineOptions} must be able to be serialized using Jackson's
  * {@link ObjectMapper} or the getter method for the property annotated with
  * {@link JsonIgnore @JsonIgnore}.
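
To make the new guidance concrete, here is an illustrative example
(not part of this commit; MyOptions is a hypothetical options
interface): a DoFn should read options at execution time from the
runner, rather than capturing a PipelineOptions instance while the
pipeline is being constructed.

    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.transforms.DoFn;

    /** Hypothetical options type, for illustration only. */
    interface MyOptions extends PipelineOptions {
      String getTableName();
      void setTableName(String value);
    }

    class ReadOptionsAtRuntimeFn extends DoFn<String, String> {
      // Deliberately no PipelineOptions field: the interface is not
      // Serializable, and a pipeline saved as a template may later run
      // with different options than it was constructed with.
      @ProcessElement
      public void processElement(ProcessContext c) {
        // Ask the runner for the options it is actually executing with.
        MyOptions opts = c.getPipelineOptions().as(MyOptions.class);
        c.output(opts.getTableName() + ":" + c.element());
      }
    }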

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
index bc479a2..2fffffa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProviders.java
@@ -19,11 +19,9 @@ package org.apache.beam.sdk.options;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.IOException;
 import java.util.Map;
-import org.apache.beam.sdk.util.common.ReflectHelpers;
 
 /**
  * Utilities for working with the {@link ValueProvider} interface.
@@ -37,11 +35,9 @@ public class ValueProviders {
    */
   public static String updateSerializedOptions(
       String serializedOptions, Map<String, String> runtimeValues) {
-    ObjectMapper mapper = new ObjectMapper().registerModules(
-        ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
     ObjectNode root, options;
     try {
-      root = mapper.readValue(serializedOptions, ObjectNode.class);
+      root = PipelineOptionsFactory.MAPPER.readValue(serializedOptions, ObjectNode.class);
       options = (ObjectNode) root.get("options");
       checkNotNull(options, "Unable to locate 'options' in %s", serializedOptions);
     } catch (IOException e) {
@@ -53,7 +49,7 @@ public class ValueProviders {
       options.put(entry.getKey(), entry.getValue());
     }
     try {
-      return mapper.writeValueAsString(root);
+      return PipelineOptionsFactory.MAPPER.writeValueAsString(root);
     } catch (IOException e) {
       throw new RuntimeException("Unable to parse re-serialize options", e);
     }


[2/4] beam git commit: Introduces SerializablePipelineOptions in core-construction

Posted by jk...@apache.org.
Introduces SerializablePipelineOptions in core-construction

Removes the analogous classes from the Apex, Spark, and Flink runners,
along with their tests.

The analogous class in Spark was SparkRuntimeContext, which
also contained a CoderRegistry; the CoderRegistry was used
only by a class that was itself unused, so I removed that
unused class as well.

This also allows removing a number of Jackson dependencies
from the Spark runner.
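
The pattern being centralized here is the one visible in the removed
SparkRuntimeContext above: serialize the options to a JSON string with
Jackson (registering any Jackson modules found on the classpath, so
custom option types survive the round trip), ship the string, and
deserialize lazily on first access on the worker. A minimal sketch of
that idea, assuming the same Jackson approach; this is illustrative,
not the exact code added to core-construction:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.io.Serializable;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.util.common.ReflectHelpers;

    /** Sketch of a Serializable holder that round-trips options through JSON. */
    public class SerializableOptionsSketch implements Serializable {
      // Register classpath Jackson modules so custom PipelineOptions types work.
      private static ObjectMapper mapper() {
        return new ObjectMapper()
            .registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
      }

      private final String json;                          // form shipped to workers
      private transient volatile PipelineOptions options; // cached after first get()

      public SerializableOptionsSketch(PipelineOptions options) {
        try {
          this.json = mapper().writeValueAsString(options);
        } catch (Exception e) {
          throw new IllegalStateException("Failed to serialize the pipeline options.", e);
        }
        this.options = options;
      }

      /** Deserializes on first access after transport, then caches. */
      public PipelineOptions get() {
        if (options == null) {
          try {
            options = mapper().readValue(json, PipelineOptions.class);
          } catch (Exception e) {
            throw new IllegalStateException("Failed to deserialize the pipeline options.", e);
          }
        }
        return options;
      }
    }

Call sites then recover a typed view with get().as(SomeOptions.class),
e.g. options.get().as(SparkPipelineOptions.class) in the Spark hunks
above.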


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7db051ae
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7db051ae
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7db051ae

Branch: refs/heads/master
Commit: 7db051aeae2b8e6b2dbfcc1da31410ec118299f6
Parents: ff4b36c
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Jul 28 12:48:41 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Aug 2 11:04:50 2017 -0700

----------------------------------------------------------------------
 runners/apex/pom.xml                            |   8 -
 .../operators/ApexGroupByKeyOperator.java       |   6 +-
 .../operators/ApexParDoOperator.java            |   6 +-
 .../ApexReadUnboundedInputOperator.java         |   6 +-
 .../utils/SerializablePipelineOptions.java      |  78 ---------
 .../translation/utils/PipelineOptionsTest.java  | 150 -----------------
 runners/core-construction-java/pom.xml          |  15 ++
 .../SerializablePipelineOptions.java            |  74 +++++++++
 .../SerializablePipelineOptionsTest.java        |  89 ++++++++++
 runners/flink/pom.xml                           |  10 --
 .../functions/FlinkDoFnFunction.java            |  10 +-
 .../FlinkMergingNonShuffleReduceFunction.java   |   8 +-
 .../functions/FlinkPartialReduceFunction.java   |   8 +-
 .../functions/FlinkReduceFunction.java          |   8 +-
 .../functions/FlinkStatefulDoFnFunction.java    |  10 +-
 .../utils/SerializedPipelineOptions.java        |  77 ---------
 .../translation/wrappers/SourceInputFormat.java |  10 +-
 .../wrappers/streaming/DoFnOperator.java        |  10 +-
 .../streaming/SplittableDoFnOperator.java       |   2 +-
 .../streaming/io/BoundedSourceWrapper.java      |  10 +-
 .../streaming/io/UnboundedSourceWrapper.java    |  12 +-
 .../beam/runners/flink/PipelineOptionsTest.java | 165 +------------------
 runners/spark/pom.xml                           |  12 --
 .../spark/aggregators/NamedAggregators.java     |  93 -----------
 .../beam/runners/spark/io/SourceDStream.java    |  20 +--
 .../apache/beam/runners/spark/io/SourceRDD.java |  22 +--
 .../runners/spark/io/SparkUnboundedSource.java  |   6 +-
 .../SparkGroupAlsoByWindowViaWindowSet.java     |  10 +-
 .../spark/stateful/StateSpecFunctions.java      |   8 +-
 .../spark/translation/EvaluationContext.java    |  11 +-
 .../spark/translation/MultiDoFnFunction.java    |  16 +-
 .../translation/SparkAbstractCombineFn.java     |   9 +-
 .../spark/translation/SparkGlobalCombineFn.java |   5 +-
 ...SparkGroupAlsoByWindowViaOutputBufferFn.java |   9 +-
 .../spark/translation/SparkKeyedCombineFn.java  |   5 +-
 .../spark/translation/SparkRuntimeContext.java  |  90 ----------
 .../spark/translation/TransformTranslator.java  |  27 ++-
 .../streaming/StreamingTransformTranslator.java |  20 +--
 .../translation/SparkRuntimeContextTest.java    | 122 --------------
 .../beam/sdk/options/PipelineOptions.java       |   7 +-
 .../apache/beam/sdk/options/ValueProviders.java |   8 +-
 41 files changed, 327 insertions(+), 945 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index fd5aafb..96aac8b 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -63,14 +63,6 @@
       <version>${apex.malhar.version}</version>
     </dependency>
     <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-core</artifactId>
-    </dependency>
-    <dependency>
-       <groupId>com.fasterxml.jackson.core</groupId>
-       <artifactId>jackson-databind</artifactId>
-    </dependency>
-    <dependency>
       <groupId>org.apache.apex</groupId>
       <artifactId>apex-engine</artifactId>
       <version>${apex.core.version}</version>

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 39f681f..5c0d72f 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -33,7 +33,6 @@ import java.util.Collections;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
-import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
 import org.apache.beam.runners.core.NullSideInputReader;
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.ReduceFnRunner;
@@ -41,6 +40,7 @@ import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.construction.TriggerTranslation;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
@@ -149,7 +149,9 @@ public class ApexGroupByKeyOperator<K, V> implements Operator,
 
   @Override
   public void setup(OperatorContext context) {
-    this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(serializedOptions.get(), this);
+    this.traceTuples =
+        ApexStreamTuple.Logging.isDebugEnabled(
+            serializedOptions.get().as(ApexPipelineOptions.class), this);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index c3cbab2..4dc807d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -40,7 +40,6 @@ import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.NoOpStepContext;
-import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.StateInternalsProxy;
 import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable;
 import org.apache.beam.runners.core.DoFnRunner;
@@ -64,6 +63,7 @@ import org.apache.beam.runners.core.StatefulDoFnRunner;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ListCoder;
@@ -386,7 +386,9 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
 
   @Override
   public void setup(OperatorContext context) {
-    this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this);
+    this.traceTuples =
+        ApexStreamTuple.Logging.isDebugEnabled(
+            pipelineOptions.get().as(ApexPipelineOptions.class), this);
     SideInputReader sideInputReader = NullSideInputReader.of(sideInputs);
     if (!sideInputs.isEmpty()) {
       sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals);

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
index 1549560..21fb9d2 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
@@ -30,8 +30,8 @@ import java.io.IOException;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple.DataTuple;
-import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.ValuesSource;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -119,7 +119,9 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT
 
   @Override
   public void setup(OperatorContext context) {
-    this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this);
+    this.traceTuples =
+        ApexStreamTuple.Logging.isDebugEnabled(
+            pipelineOptions.get().as(ApexPipelineOptions.class), this);
     try {
       reader = source.createReader(this.pipelineOptions.get(), null);
       available = reader.start();

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
deleted file mode 100644
index 46b04fc..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translation.utils;
-
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.sdk.io.FileSystems;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.common.ReflectHelpers;
-
-/**
- * A wrapper to enable serialization of {@link PipelineOptions}.
- */
-public class SerializablePipelineOptions implements Externalizable {
-
-  /* Used to ensure we initialize file systems exactly once, because it's a slow operation. */
-  private static final AtomicBoolean FILE_SYSTEMS_INTIIALIZED = new AtomicBoolean(false);
-
-  private transient ApexPipelineOptions pipelineOptions;
-
-  public SerializablePipelineOptions(ApexPipelineOptions pipelineOptions) {
-    this.pipelineOptions = pipelineOptions;
-  }
-
-  public SerializablePipelineOptions() {
-  }
-
-  public ApexPipelineOptions get() {
-    return this.pipelineOptions;
-  }
-
-  @Override
-  public void writeExternal(ObjectOutput out) throws IOException {
-    out.writeUTF(createMapper().writeValueAsString(pipelineOptions));
-  }
-
-  @Override
-  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-    String s = in.readUTF();
-    this.pipelineOptions = createMapper().readValue(s, PipelineOptions.class)
-        .as(ApexPipelineOptions.class);
-
-    if (FILE_SYSTEMS_INITIALIZED.compareAndSet(false, true)) {
-      FileSystems.setDefaultPipelineOptions(pipelineOptions);
-    }
-  }
-
-  /**
-   * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing
-   * for user specified configuration injection into the ObjectMapper. This supports user custom
-   * types on {@link PipelineOptions}.
-   */
-  private static ObjectMapper createMapper() {
-    return new ObjectMapper().registerModules(
-        ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
-  }
-}

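The deleted wrapper above was Externalizable and guarded FileSystems initialization with a once-only flag; the replacement in core-construction (added below) is plain java.io.Serializable and re-registers the options on every deserialization instead. One practical consequence for Apex, which checkpoints operators through Kryo: Kryo does not invoke Java's readObject/writeObject hooks by default, so fields of the new type stay bound to Kryo's JavaSerializer, just as the deleted PipelineOptionsTest did for its OptionsWrapper. A minimal sketch (OperatorState is hypothetical):

    import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
    import com.esotericsoftware.kryo.serializers.JavaSerializer;
    import org.apache.beam.runners.core.construction.SerializablePipelineOptions;

    public class OperatorState {
      // Route Kryo through Java serialization so that the JSON-backed
      // readObject() logic of SerializablePipelineOptions actually runs.
      @Bind(JavaSerializer.class)
      private SerializablePipelineOptions options;
    }
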
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java
deleted file mode 100644
index 118ff99..0000000
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translation.utils;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import com.datatorrent.common.util.FSStorageAgent;
-import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.auto.service.AutoService;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.junit.Test;
-
-/**
- * Tests the serialization of PipelineOptions.
- */
-public class PipelineOptionsTest {
-
-  /**
-   * Interface for testing.
-   */
-  public interface MyOptions extends ApexPipelineOptions {
-    @Description("Bla bla bla")
-    @Default.String("Hello")
-    String getTestOption();
-    void setTestOption(String value);
-  }
-
-  private static class OptionsWrapper {
-    private OptionsWrapper() {
-      this(null); // required for Kryo
-    }
-    private OptionsWrapper(ApexPipelineOptions options) {
-      this.options = new SerializablePipelineOptions(options);
-    }
-    @Bind(JavaSerializer.class)
-    private final SerializablePipelineOptions options;
-  }
-
-  @Test
-  public void testSerialization() {
-    OptionsWrapper wrapper = new OptionsWrapper(
-        PipelineOptionsFactory.fromArgs("--testOption=nothing").as(MyOptions.class));
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    FSStorageAgent.store(bos, wrapper);
-
-    ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
-    OptionsWrapper wrapperCopy = (OptionsWrapper) FSStorageAgent.retrieve(bis);
-    assertNotNull(wrapperCopy.options);
-    assertEquals("nothing", wrapperCopy.options.get().as(MyOptions.class).getTestOption());
-  }
-
-  @Test
-  public void testSerializationWithUserCustomType() {
-    OptionsWrapper wrapper = new OptionsWrapper(
-        PipelineOptionsFactory.fromArgs("--jacksonIncompatible=\"testValue\"")
-            .as(JacksonIncompatibleOptions.class));
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    FSStorageAgent.store(bos, wrapper);
-
-    ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
-    OptionsWrapper wrapperCopy = (OptionsWrapper) FSStorageAgent.retrieve(bis);
-    assertNotNull(wrapperCopy.options);
-    assertEquals("testValue",
-        wrapperCopy.options.get().as(JacksonIncompatibleOptions.class)
-            .getJacksonIncompatible().value);
-  }
-
-  /** PipelineOptions used to test auto registration of Jackson modules. */
-  public interface JacksonIncompatibleOptions extends ApexPipelineOptions {
-    JacksonIncompatible getJacksonIncompatible();
-    void setJacksonIncompatible(JacksonIncompatible value);
-  }
-
-  /** A Jackson {@link Module} to test auto-registration of modules. */
-  @AutoService(Module.class)
-  public static class RegisteredTestModule extends SimpleModule {
-    public RegisteredTestModule() {
-      super("RegisteredTestModule");
-      setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class);
-    }
-  }
-
-  /** A class which Jackson does not know how to serialize/deserialize. */
-  public static class JacksonIncompatible {
-    private final String value;
-    public JacksonIncompatible(String value) {
-      this.value = value;
-    }
-  }
-
-  /** A Jackson mixin used to add annotations to other classes. */
-  @JsonDeserialize(using = JacksonIncompatibleDeserializer.class)
-  @JsonSerialize(using = JacksonIncompatibleSerializer.class)
-  public static final class JacksonIncompatibleMixin {}
-
-  /** A Jackson deserializer for {@link JacksonIncompatible}. */
-  public static class JacksonIncompatibleDeserializer extends
-      JsonDeserializer<JacksonIncompatible> {
-
-    @Override
-    public JacksonIncompatible deserialize(JsonParser jsonParser,
-        DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
-      return new JacksonIncompatible(jsonParser.readValueAs(String.class));
-    }
-  }
-
-  /** A Jackson serializer for {@link JacksonIncompatible}. */
-  public static class JacksonIncompatibleSerializer extends JsonSerializer<JacksonIncompatible> {
-
-    @Override
-    public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator,
-        SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
-      jsonGenerator.writeString(jacksonIncompatible.value);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index b85b5f5..1a52914 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -65,6 +65,21 @@
     </dependency>
 
     <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SerializablePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SerializablePipelineOptions.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SerializablePipelineOptions.java
new file mode 100644
index 0000000..e697fb2
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SerializablePipelineOptions.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+
+/**
+ * Holds a {@link PipelineOptions} in JSON serialized form and calls {@link
+ * FileSystems#setDefaultPipelineOptions(PipelineOptions)} on construction or on deserialization.
+ */
+public class SerializablePipelineOptions implements Serializable {
+  private static final ObjectMapper MAPPER =
+      new ObjectMapper()
+          .registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+
+  private final String serializedPipelineOptions;
+  private transient PipelineOptions options;
+
+  public SerializablePipelineOptions(PipelineOptions options) {
+    this.serializedPipelineOptions = serializeToJson(options);
+    this.options = options;
+    FileSystems.setDefaultPipelineOptions(options);
+  }
+
+  public PipelineOptions get() {
+    return options;
+  }
+
+  private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException {
+    is.defaultReadObject();
+    this.options = deserializeFromJson(serializedPipelineOptions);
+    // TODO https://issues.apache.org/jira/browse/BEAM-2712: remove this call.
+    FileSystems.setDefaultPipelineOptions(options);
+  }
+
+  private static String serializeToJson(PipelineOptions options) {
+    try {
+      return MAPPER.writeValueAsString(options);
+    } catch (JsonProcessingException e) {
+      throw new IllegalArgumentException("Failed to serialize PipelineOptions", e);
+    }
+  }
+
+  private static PipelineOptions deserializeFromJson(String options) {
+    try {
+      return MAPPER.readValue(options, PipelineOptions.class);
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Failed to deserialize PipelineOptions", e);
+    }
+  }
+}

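The class above is the whole mechanism: options are frozen to JSON in the constructor, only the string crosses the wire, and readObject() rebuilds a live PipelineOptions proxy on the receiving side. A minimal round-trip sketch (the --jobName flag is just an arbitrary standard option):

    import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.util.SerializableUtils;

    public class RoundTripExample {
      public static void main(String[] args) {
        PipelineOptions options =
            PipelineOptionsFactory.fromArgs("--jobName=demo").create();

        // Serializes to JSON eagerly and registers the options as the
        // default FileSystems configuration.
        SerializablePipelineOptions holder = new SerializablePipelineOptions(options);

        // Java serialization ships only the JSON string; the transient
        // options field is re-created in readObject().
        SerializablePipelineOptions copy = SerializableUtils.clone(holder);
        System.out.println(copy.get().getJobName());  // prints "demo"
      }
    }
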
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SerializablePipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SerializablePipelineOptionsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SerializablePipelineOptionsTest.java
new file mode 100644
index 0000000..cd470b2
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SerializablePipelineOptionsTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.junit.Assert.assertEquals;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link SerializablePipelineOptions}. */
+@RunWith(JUnit4.class)
+public class SerializablePipelineOptionsTest {
+  /** Options for testing. */
+  public interface MyOptions extends PipelineOptions {
+    String getFoo();
+
+    void setFoo(String foo);
+
+    @JsonIgnore
+    @Default.String("not overridden")
+    String getIgnoredField();
+
+    void setIgnoredField(String value);
+  }
+
+  @Test
+  public void testSerializationAndDeserialization() throws Exception {
+    PipelineOptions options =
+        PipelineOptionsFactory.fromArgs("--foo=testValue", "--ignoredField=overridden")
+            .as(MyOptions.class);
+
+    SerializablePipelineOptions serializableOptions = new SerializablePipelineOptions(options);
+    assertEquals("testValue", serializableOptions.get().as(MyOptions.class).getFoo());
+    assertEquals("overridden", serializableOptions.get().as(MyOptions.class).getIgnoredField());
+
+    SerializablePipelineOptions copy = SerializableUtils.clone(serializableOptions);
+    assertEquals("testValue", copy.get().as(MyOptions.class).getFoo());
+    assertEquals("not overridden", copy.get().as(MyOptions.class).getIgnoredField());
+  }
+
+  @Test
+  public void testIndependence() throws Exception {
+    SerializablePipelineOptions first =
+        new SerializablePipelineOptions(
+            PipelineOptionsFactory.fromArgs("--foo=first").as(MyOptions.class));
+    SerializablePipelineOptions firstCopy = SerializableUtils.clone(first);
+    SerializablePipelineOptions second =
+        new SerializablePipelineOptions(
+            PipelineOptionsFactory.fromArgs("--foo=second").as(MyOptions.class));
+    SerializablePipelineOptions secondCopy = SerializableUtils.clone(second);
+
+    assertEquals("first", first.get().as(MyOptions.class).getFoo());
+    assertEquals("first", firstCopy.get().as(MyOptions.class).getFoo());
+    assertEquals("second", second.get().as(MyOptions.class).getFoo());
+    assertEquals("second", secondCopy.get().as(MyOptions.class).getFoo());
+
+    first.get().as(MyOptions.class).setFoo("new first");
+    firstCopy.get().as(MyOptions.class).setFoo("new firstCopy");
+    second.get().as(MyOptions.class).setFoo("new second");
+    secondCopy.get().as(MyOptions.class).setFoo("new secondCopy");
+
+    assertEquals("new first", first.get().as(MyOptions.class).getFoo());
+    assertEquals("new firstCopy", firstCopy.get().as(MyOptions.class).getFoo());
+    assertEquals("new second", second.get().as(MyOptions.class).getFoo());
+    assertEquals("new secondCopy", secondCopy.get().as(MyOptions.class).getFoo());
+  }
+}

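The ignoredField assertions above pin down an intentional asymmetry: a @JsonIgnore property is visible on the live instance but falls back to its @Default after a round trip, because only the JSON form crosses the serialization boundary. The Flink testIgnoredFieldSerialization test removed further below relied on the same behavior for the non-serializable state backend option.
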
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index c063a2d..06746fd 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -256,16 +256,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-core</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>

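With SerializedPipelineOptions gone, the Flink runner no longer touches Jackson directly, so its pom can drop these two dependencies; the jackson-core, jackson-databind, and jackson-annotations entries move to the core-construction pom above, and Flink picks them up transitively through its existing dependency on beam-runners-core-construction-java.
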
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index d8ed622..3048168 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -22,9 +22,9 @@ import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
@@ -50,7 +50,7 @@ import org.apache.flink.util.Collector;
 public class FlinkDoFnFunction<InputT, OutputT>
     extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<OutputT>> {
 
-  private final SerializedPipelineOptions serializedOptions;
+  private final SerializablePipelineOptions serializedOptions;
 
   private final DoFn<InputT, OutputT> doFn;
   private final String stepName;
@@ -75,7 +75,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
     this.doFn = doFn;
     this.stepName = stepName;
     this.sideInputs = sideInputs;
-    this.serializedOptions = new SerializedPipelineOptions(options);
+    this.serializedOptions = new SerializablePipelineOptions(options);
     this.windowingStrategy = windowingStrategy;
     this.outputMap = outputMap;
     this.mainOutputTag = mainOutputTag;
@@ -101,7 +101,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
     List<TupleTag<?>> additionalOutputTags = Lists.newArrayList(outputMap.keySet());
 
     DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner(
-        serializedOptions.getPipelineOptions(), doFn,
+        serializedOptions.get(), doFn,
         new FlinkSideInputReader(sideInputs, runtimeContext),
         outputManager,
         mainOutputTag,
@@ -109,7 +109,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
         new FlinkNoOpStepContext(),
         windowingStrategy);
 
-    if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class))
+    if ((serializedOptions.get().as(FlinkPipelineOptions.class))
         .getEnableMetrics()) {
       doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext());
     }

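The same mechanical substitution repeats through the Flink translation functions that follow: the field type changes from SerializedPipelineOptions to SerializablePipelineOptions, construction is unchanged apart from the class name, and every getPipelineOptions() call becomes get(). In sketch form (FlinkExampleFunction is hypothetical):

    import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptions;

    public class FlinkExampleFunction {
      // before: private final SerializedPipelineOptions serializedOptions;
      private final SerializablePipelineOptions serializedOptions;

      FlinkExampleFunction(PipelineOptions options) {
        // before: new SerializedPipelineOptions(options)
        this.serializedOptions = new SerializablePipelineOptions(options);
      }

      void run() {
        // before: serializedOptions.getPipelineOptions()
        PipelineOptions options = serializedOptions.get();
        System.out.println(options.getJobName());
      }
    }
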
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index 13be913..c73dade 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.flink.translation.functions;
 
 import java.util.Map;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -47,7 +47,7 @@ public class FlinkMergingNonShuffleReduceFunction<
 
   private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
 
-  private final SerializedPipelineOptions serializedOptions;
+  private final SerializablePipelineOptions serializedOptions;
 
   public FlinkMergingNonShuffleReduceFunction(
       CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn,
@@ -60,7 +60,7 @@ public class FlinkMergingNonShuffleReduceFunction<
     this.windowingStrategy = windowingStrategy;
     this.sideInputs = sideInputs;
 
-    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+    this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
 
   }
 
@@ -69,7 +69,7 @@ public class FlinkMergingNonShuffleReduceFunction<
       Iterable<WindowedValue<KV<K, InputT>>> elements,
       Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
 
-    PipelineOptions options = serializedOptions.getPipelineOptions();
+    PipelineOptions options = serializedOptions.get();
 
     FlinkSideInputReader sideInputReader =
         new FlinkSideInputReader(sideInputs, getRuntimeContext());

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index db12a49..49e821c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.flink.translation.functions;
 
 import java.util.Map;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -46,7 +46,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
 
   protected final WindowingStrategy<Object, W> windowingStrategy;
 
-  protected final SerializedPipelineOptions serializedOptions;
+  protected final SerializablePipelineOptions serializedOptions;
 
   protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
 
@@ -59,7 +59,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
     this.combineFn = combineFn;
     this.windowingStrategy = windowingStrategy;
     this.sideInputs = sideInputs;
-    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+    this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
 
   }
 
@@ -68,7 +68,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
       Iterable<WindowedValue<KV<K, InputT>>> elements,
       Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception {
 
-    PipelineOptions options = serializedOptions.getPipelineOptions();
+    PipelineOptions options = serializedOptions.get();
 
     FlinkSideInputReader sideInputReader =
         new FlinkSideInputReader(sideInputs, getRuntimeContext());

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index 53d71d8..6645b3a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.flink.translation.functions;
 
 import java.util.Map;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -48,7 +48,7 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
 
   protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
 
-  protected final SerializedPipelineOptions serializedOptions;
+  protected final SerializablePipelineOptions serializedOptions;
 
   public FlinkReduceFunction(
       CombineFnBase.GlobalCombineFn<?, AccumT, OutputT> combineFn,
@@ -61,7 +61,7 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
     this.windowingStrategy = windowingStrategy;
     this.sideInputs = sideInputs;
 
-    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+    this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
 
   }
 
@@ -70,7 +70,7 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
       Iterable<WindowedValue<KV<K, AccumT>>> elements,
       Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
 
-    PipelineOptions options = serializedOptions.getPipelineOptions();
+    PipelineOptions options = serializedOptions.get();
 
     FlinkSideInputReader sideInputReader =
         new FlinkSideInputReader(sideInputs, getRuntimeContext());

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index 11d4fee..412269c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -31,9 +31,9 @@ import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -61,7 +61,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
   private String stepName;
   private final WindowingStrategy<?, ?> windowingStrategy;
   private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
-  private final SerializedPipelineOptions serializedOptions;
+  private final SerializablePipelineOptions serializedOptions;
   private final Map<TupleTag<?>, Integer> outputMap;
   private final TupleTag<OutputT> mainOutputTag;
   private transient DoFnInvoker doFnInvoker;
@@ -79,7 +79,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
     this.stepName = stepName;
     this.windowingStrategy = windowingStrategy;
     this.sideInputs = sideInputs;
-    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+    this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
     this.outputMap = outputMap;
     this.mainOutputTag = mainOutputTag;
   }
@@ -118,7 +118,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
     List<TupleTag<?>> additionalOutputTags = Lists.newArrayList(outputMap.keySet());
 
     DoFnRunner<KV<K, V>, OutputT> doFnRunner = DoFnRunners.simpleRunner(
-        serializedOptions.getPipelineOptions(), dofn,
+        serializedOptions.get(), dofn,
         new FlinkSideInputReader(sideInputs, runtimeContext),
         outputManager,
         mainOutputTag,
@@ -135,7 +135,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
         },
         windowingStrategy);
 
-    if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class))
+    if ((serializedOptions.get().as(FlinkPipelineOptions.class))
         .getEnableMetrics()) {
       doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
deleted file mode 100644
index 40b6dd6..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink.translation.utils;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.beam.sdk.io.FileSystems;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.common.ReflectHelpers;
-
-/**
- * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
- */
-public class SerializedPipelineOptions implements Serializable {
-
-  private final byte[] serializedOptions;
-
-  /** Lazily initialized copy of deserialized options. */
-  private transient PipelineOptions pipelineOptions;
-
-  public SerializedPipelineOptions(PipelineOptions options) {
-    checkNotNull(options, "PipelineOptions must not be null.");
-
-    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-      createMapper().writeValue(baos, options);
-      this.serializedOptions = baos.toByteArray();
-    } catch (Exception e) {
-      throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
-    }
-
-  }
-
-  public PipelineOptions getPipelineOptions() {
-    if (pipelineOptions == null) {
-      try {
-        pipelineOptions = createMapper().readValue(serializedOptions, PipelineOptions.class);
-
-        FileSystems.setDefaultPipelineOptions(pipelineOptions);
-      } catch (IOException e) {
-        throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
-      }
-    }
-
-    return pipelineOptions;
-  }
-
-  /**
-   * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing
-   * for user specified configuration injection into the ObjectMapper. This supports user custom
-   * types on {@link PipelineOptions}.
-   */
-  private static ObjectMapper createMapper() {
-    return new ObjectMapper().registerModules(
-        ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
-  }
-}

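Behavioral note on this deletion: the old Flink wrapper deserialized lazily on the first getPipelineOptions() call and rejected null options up front, whereas the shared replacement deserializes eagerly in readObject() and re-registers the FileSystems defaults on every deserialization (tracked for removal under BEAM-2712). That difference is presumably why the testCaching and testNonNull cases disappear from PipelineOptionsTest below.
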
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
index 27e6912..3f9d601 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
@@ -19,9 +19,9 @@ package org.apache.beam.runners.flink.translation.wrappers;
 
 import java.io.IOException;
 import java.util.List;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -50,7 +50,7 @@ public class SourceInputFormat<T>
   private final BoundedSource<T> initialSource;
 
   private transient PipelineOptions options;
-  private final SerializedPipelineOptions serializedOptions;
+  private final SerializablePipelineOptions serializedOptions;
 
   private transient BoundedSource.BoundedReader<T> reader;
   private boolean inputAvailable = false;
@@ -61,12 +61,12 @@ public class SourceInputFormat<T>
       String stepName, BoundedSource<T> initialSource, PipelineOptions options) {
     this.stepName = stepName;
     this.initialSource = initialSource;
-    this.serializedOptions = new SerializedPipelineOptions(options);
+    this.serializedOptions = new SerializablePipelineOptions(options);
   }
 
   @Override
   public void configure(Configuration configuration) {
-    options = serializedOptions.getPipelineOptions();
+    options = serializedOptions.get();
   }
 
   @Override
@@ -76,7 +76,7 @@ public class SourceInputFormat<T>
     readerInvoker =
         new ReaderInvocationUtil<>(
             stepName,
-            serializedOptions.getPipelineOptions(),
+            serializedOptions.get(),
             metricContainer);
 
     reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options);

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 7995ea8..62de423 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -47,10 +47,10 @@ import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.StatefulDoFnRunner;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
 import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkKeyGroupStateInternals;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkSplitStateInternals;
@@ -106,7 +106,7 @@ public class DoFnOperator<InputT, OutputT>
 
   protected DoFn<InputT, OutputT> doFn;
 
-  protected final SerializedPipelineOptions serializedOptions;
+  protected final SerializablePipelineOptions serializedOptions;
 
   protected final TupleTag<OutputT> mainOutputTag;
   protected final List<TupleTag<?>> additionalOutputTags;
@@ -174,7 +174,7 @@ public class DoFnOperator<InputT, OutputT>
     this.additionalOutputTags = additionalOutputTags;
     this.sideInputTagMapping = sideInputTagMapping;
     this.sideInputs = sideInputs;
-    this.serializedOptions = new SerializedPipelineOptions(options);
+    this.serializedOptions = new SerializablePipelineOptions(options);
     this.windowingStrategy = windowingStrategy;
     this.outputManagerFactory = outputManagerFactory;
 
@@ -256,7 +256,7 @@ public class DoFnOperator<InputT, OutputT>
     org.apache.beam.runners.core.StepContext stepContext = createStepContext();
 
     doFnRunner = DoFnRunners.simpleRunner(
-        serializedOptions.getPipelineOptions(),
+        serializedOptions.get(),
         doFn,
         sideInputReader,
         outputManager,
@@ -301,7 +301,7 @@ public class DoFnOperator<InputT, OutputT>
           stateCleaner);
     }
 
-    if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class))
+    if ((serializedOptions.get().as(FlinkPipelineOptions.class))
         .getEnableMetrics()) {
       doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 2f095d4..be758a6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -115,7 +115,7 @@ public class SplittableDoFnOperator<
     ((ProcessFn) doFn).setProcessElementInvoker(
         new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
             doFn,
-            serializedOptions.getPipelineOptions(),
+            serializedOptions.get(),
             new OutputWindowedValue<OutputT>() {
               @Override
               public void outputWindowedValue(

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
index 6d75688..5ddc46f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
@@ -20,9 +20,9 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -48,7 +48,7 @@ public class BoundedSourceWrapper<OutputT>
   /**
    * Keep the options so that we can initialize the readers.
    */
-  private final SerializedPipelineOptions serializedOptions;
+  private final SerializablePipelineOptions serializedOptions;
 
   /**
    * The split sources. We split them in the constructor to ensure that all parallel
@@ -74,7 +74,7 @@ public class BoundedSourceWrapper<OutputT>
       BoundedSource<OutputT> source,
       int parallelism) throws Exception {
     this.stepName = stepName;
-    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+    this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
 
     long desiredBundleSize = source.getEstimatedSizeBytes(pipelineOptions) / parallelism;
 
@@ -109,13 +109,13 @@ public class BoundedSourceWrapper<OutputT>
     ReaderInvocationUtil<OutputT, BoundedSource.BoundedReader<OutputT>> readerInvoker =
         new ReaderInvocationUtil<>(
             stepName,
-            serializedOptions.getPipelineOptions(),
+            serializedOptions.get(),
             metricContainer);
 
     readers = new ArrayList<>();
     // initialize readers from scratch
     for (BoundedSource<OutputT> source : localSources) {
-      readers.add(source.createReader(serializedOptions.getPipelineOptions()));
+      readers.add(source.createReader(serializedOptions.get()));
     }
 
    if (readers.size() == 1) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index e75072a..817dd74 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -22,10 +22,10 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
@@ -72,7 +72,7 @@ public class UnboundedSourceWrapper<
   /**
    * Keep the options so that we can initialize the localReaders.
    */
-  private final SerializedPipelineOptions serializedOptions;
+  private final SerializablePipelineOptions serializedOptions;
 
   /**
    * For snapshot and restore.
@@ -141,7 +141,7 @@ public class UnboundedSourceWrapper<
       UnboundedSource<OutputT, CheckpointMarkT> source,
       int parallelism) throws Exception {
     this.stepName = stepName;
-    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+    this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
 
     if (source.requiresDeduping()) {
       LOG.warn("Source {} requires deduping but Flink runner doesn't support this yet.", source);
@@ -189,7 +189,7 @@ public class UnboundedSourceWrapper<
           stateForCheckpoint.get()) {
         localSplitSources.add(restored.getKey());
         localReaders.add(restored.getKey().createReader(
-            serializedOptions.getPipelineOptions(), restored.getValue()));
+            serializedOptions.get(), restored.getValue()));
       }
     } else {
       // initialize localReaders and localSources from scratch
@@ -198,7 +198,7 @@ public class UnboundedSourceWrapper<
           UnboundedSource<OutputT, CheckpointMarkT> source =
               splitSources.get(i);
           UnboundedSource.UnboundedReader<OutputT> reader =
-              source.createReader(serializedOptions.getPipelineOptions(), null);
+              source.createReader(serializedOptions.get(), null);
           localSplitSources.add(source);
           localReaders.add(reader);
         }
@@ -221,7 +221,7 @@ public class UnboundedSourceWrapper<
     ReaderInvocationUtil<OutputT, UnboundedSource.UnboundedReader<OutputT>> readerInvoker =
         new ReaderInvocationUtil<>(
             stepName,
-            serializedOptions.getPipelineOptions(),
+            serializedOptions.get(),
             metricContainer);
 
     if (localReaders.size() == 0) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index d0281ec..eb06026 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -17,32 +17,8 @@
  */
 package org.apache.beam.runners.flink;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.auto.service.AutoService;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.util.Collections;
 import java.util.HashMap;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.Default;
@@ -60,12 +36,10 @@ import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.joda.time.Instant;
 import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 /**
@@ -73,9 +47,7 @@ import org.junit.Test;
  */
 public class PipelineOptionsTest {
 
-  /**
-   * Pipeline options.
-   */
+  /** Pipeline options. */
   public interface MyOptions extends FlinkPipelineOptions {
     @Description("Bla bla bla")
     @Default.String("Hello")
@@ -83,60 +55,12 @@ public class PipelineOptionsTest {
     void setTestOption(String value);
   }
 
-  private static MyOptions options;
-  private static SerializedPipelineOptions serializedOptions;
-
-  private static final String[] args = new String[]{"--testOption=nothing"};
-
-  @BeforeClass
-  public static void beforeTest() {
-    options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
-    serializedOptions = new SerializedPipelineOptions(options);
-  }
-
-  @Test
-  public void testDeserialization() {
-    MyOptions deserializedOptions = serializedOptions.getPipelineOptions().as(MyOptions.class);
-    assertEquals("nothing", deserializedOptions.getTestOption());
-  }
-
-  @Test
-  public void testIgnoredFieldSerialization() {
-    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
-    options.setStateBackend(new MemoryStateBackend());
-
-    FlinkPipelineOptions deserialized =
-        new SerializedPipelineOptions(options).getPipelineOptions().as(FlinkPipelineOptions.class);
-
-    assertNull(deserialized.getStateBackend());
-  }
-
-  @Test
-  public void testEnableMetrics() {
-    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
-    options.setEnableMetrics(false);
-    assertFalse(options.getEnableMetrics());
-  }
-
-  @Test
-  public void testCaching() {
-    PipelineOptions deserializedOptions =
-        serializedOptions.getPipelineOptions().as(PipelineOptions.class);
-
-    assertNotNull(deserializedOptions);
-    assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
-    assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
-    assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
-  }
-
-  @Test(expected = Exception.class)
-  public void testNonNull() {
-    new SerializedPipelineOptions(null);
-  }
+  private static MyOptions options =
+      PipelineOptionsFactory.fromArgs("--testOption=nothing").as(MyOptions.class);
 
   @Test(expected = Exception.class)
   public void parDoBaseClassPipelineOptionsNullTest() {
-    DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
+    new DoFnOperator<>(
         new TestDoFn(),
         "stepName",
         WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
@@ -196,18 +120,7 @@ public class PipelineOptionsTest {
 
   }
 
-  @Test
-  public void testExternalizedCheckpointsConfigs() {
-    String[] args = new String[] { "--externalizedCheckpointsEnabled=true",
-        "--retainExternalizedCheckpointsOnCancellation=false" };
-    final FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
-        .as(FlinkPipelineOptions.class);
-    assertEquals(options.isExternalizedCheckpointsEnabled(), true);
-    assertEquals(options.getRetainExternalizedCheckpointsOnCancellation(), false);
-  }
-
   private static class TestDoFn extends DoFn<String, String> {
-
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       Assert.assertNotNull(c.getPipelineOptions());
@@ -216,74 +129,4 @@ public class PipelineOptionsTest {
           c.getPipelineOptions().as(MyOptions.class).getTestOption());
     }
   }
-
-  /** PipelineOptions used to test auto registration of Jackson modules. */
-  public interface JacksonIncompatibleOptions extends PipelineOptions {
-    JacksonIncompatible getJacksonIncompatible();
-    void setJacksonIncompatible(JacksonIncompatible value);
-  }
-
-  /** A Jackson {@link Module} to test auto-registration of modules. */
-  @AutoService(Module.class)
-  public static class RegisteredTestModule extends SimpleModule {
-    public RegisteredTestModule() {
-      super("RegisteredTestModule");
-      setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class);
-    }
-  }
-
-  /** A class which Jackson does not know how to serialize/deserialize. */
-  public static class JacksonIncompatible {
-    private final String value;
-    public JacksonIncompatible(String value) {
-      this.value = value;
-    }
-  }
-
-  /** A Jackson mixin used to add annotations to other classes. */
-  @JsonDeserialize(using = JacksonIncompatibleDeserializer.class)
-  @JsonSerialize(using = JacksonIncompatibleSerializer.class)
-  public static final class JacksonIncompatibleMixin {}
-
-  /** A Jackson deserializer for {@link JacksonIncompatible}. */
-  public static class JacksonIncompatibleDeserializer extends
-      JsonDeserializer<JacksonIncompatible> {
-
-    @Override
-    public JacksonIncompatible deserialize(JsonParser jsonParser,
-        DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
-      return new JacksonIncompatible(jsonParser.readValueAs(String.class));
-    }
-  }
-
-  /** A Jackson serializer for {@link JacksonIncompatible}. */
-  public static class JacksonIncompatibleSerializer extends JsonSerializer<JacksonIncompatible> {
-
-    @Override
-    public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator,
-        SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
-      jsonGenerator.writeString(jacksonIncompatible.value);
-    }
-  }
-
-  @Test
-  public void testSerializingPipelineOptionsWithCustomUserType() throws Exception {
-    String expectedValue = "testValue";
-    PipelineOptions options = PipelineOptionsFactory
-        .fromArgs("--jacksonIncompatible=\"" + expectedValue + "\"")
-        .as(JacksonIncompatibleOptions.class);
-    SerializedPipelineOptions context = new SerializedPipelineOptions(options);
-
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    try (ObjectOutputStream outputStream = new ObjectOutputStream(baos)) {
-      outputStream.writeObject(context);
-    }
-    try (ObjectInputStream inputStream =
-        new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
-      SerializedPipelineOptions copy = (SerializedPipelineOptions) inputStream.readObject();
-      assertEquals(expectedValue,
-          copy.getPipelineOptions().as(JacksonIncompatibleOptions.class)
-              .getJacksonIncompatible().value);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index e823060..b2e7fe4 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -35,7 +35,6 @@
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     <kafka.version>0.9.0.1</kafka.version>
-    <jackson.version>2.4.4</jackson.version>
     <dropwizard.metrics.version>3.1.2</dropwizard.metrics.version>
   </properties>
 
@@ -184,18 +183,7 @@
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-core</artifactId>
-      <version>${jackson.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
-      <version>${jackson.version}</version>
-    </dependency>
-      <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-        <version>${jackson.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.avro</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
index 27f2ec8..a9f2c44 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
@@ -19,18 +19,11 @@
 package org.apache.beam.runners.spark.aggregators;
 
 import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.TreeMap;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Combine;
 
 /**
@@ -52,17 +45,6 @@ public class NamedAggregators implements Serializable {
   }
 
   /**
-   * Constructs a new named aggregators instance that contains a mapping from the specified
-   * `named` to the associated initial state.
-   *
-   * @param name  Name of aggregator.
-   * @param state Associated State.
-   */
-  public NamedAggregators(String name, State<?, ?, ?> state) {
-    this.mNamedAggregators.put(name, state);
-  }
-
-  /**
    * @param name      Name of aggregator to retrieve.
    * @param typeClass Type class to cast the value to.
    * @param <T>       Type to be returned.
@@ -152,79 +134,4 @@ public class NamedAggregators implements Serializable {
     Combine.CombineFn<InputT, InterT, OutputT> getCombineFn();
   }
 
-  /**
-   * @param <InputT> Input data type
-   * @param <InterT> Intermediate data type (useful for averages)
-   * @param <OutputT> Output data type
-   */
-  public static class CombineFunctionState<InputT, InterT, OutputT>
-      implements State<InputT, InterT, OutputT> {
-
-    private Combine.CombineFn<InputT, InterT, OutputT> combineFn;
-    private Coder<InputT> inCoder;
-    private SparkRuntimeContext ctxt;
-    private transient InterT state;
-
-    public CombineFunctionState(
-        Combine.CombineFn<InputT, InterT, OutputT> combineFn,
-        Coder<InputT> inCoder,
-        SparkRuntimeContext ctxt) {
-      this.combineFn = combineFn;
-      this.inCoder = inCoder;
-      this.ctxt = ctxt;
-      this.state = combineFn.createAccumulator();
-    }
-
-    @Override
-    public void update(InputT element) {
-      combineFn.addInput(state, element);
-    }
-
-    @Override
-    public State<InputT, InterT, OutputT> merge(State<InputT, InterT, OutputT> other) {
-      this.state = combineFn.mergeAccumulators(ImmutableList.of(current(), other.current()));
-      return this;
-    }
-
-    @Override
-    public InterT current() {
-      return state;
-    }
-
-    @Override
-    public OutputT render() {
-      return combineFn.extractOutput(state);
-    }
-
-    @Override
-    public Combine.CombineFn<InputT, InterT, OutputT> getCombineFn() {
-      return combineFn;
-    }
-
-    private void writeObject(ObjectOutputStream oos) throws IOException {
-      oos.writeObject(ctxt);
-      oos.writeObject(combineFn);
-      oos.writeObject(inCoder);
-      try {
-        combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder)
-            .encode(state, oos);
-      } catch (CannotProvideCoderException e) {
-        throw new IllegalStateException("Could not determine coder for accumulator", e);
-      }
-    }
-
-    @SuppressWarnings("unchecked")
-    private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
-      ctxt = (SparkRuntimeContext) ois.readObject();
-      combineFn = (Combine.CombineFn<InputT, InterT, OutputT>) ois.readObject();
-      inCoder = (Coder<InputT>) ois.readObject();
-      try {
-        state = combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder)
-            .decode(ois);
-      } catch (CannotProvideCoderException e) {
-        throw new IllegalStateException("Could not determine coder for accumulator", e);
-      }
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
index 20aca5f..b7000b4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
@@ -20,8 +20,8 @@ package org.apache.beam.runners.spark.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.spark.api.java.JavaSparkContext$;
@@ -58,7 +58,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
   private static final Logger LOG = LoggerFactory.getLogger(SourceDStream.class);
 
   private final UnboundedSource<T, CheckpointMarkT> unboundedSource;
-  private final SparkRuntimeContext runtimeContext;
+  private final SerializablePipelineOptions options;
   private final Duration boundReadDuration;
   // Reader cache interval to expire readers if they haven't been accessed in the last microbatch.
   // The reason we expire readers is that upon executor death/addition source split ownership can be
@@ -81,20 +81,20 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
   SourceDStream(
       StreamingContext ssc,
       UnboundedSource<T, CheckpointMarkT> unboundedSource,
-      SparkRuntimeContext runtimeContext,
+      SerializablePipelineOptions options,
       Long boundMaxRecords) {
     super(ssc, JavaSparkContext$.MODULE$.<scala.Tuple2<Source<T>, CheckpointMarkT>>fakeClassTag());
     this.unboundedSource = unboundedSource;
-    this.runtimeContext = runtimeContext;
+    this.options = options;
 
-    SparkPipelineOptions options = runtimeContext.getPipelineOptions().as(
+    SparkPipelineOptions sparkOptions = options.get().as(
         SparkPipelineOptions.class);
 
     // Reader cache expiration interval. 50% of batch interval is added to accommodate latency.
-    this.readerCacheInterval = 1.5 * options.getBatchIntervalMillis();
+    this.readerCacheInterval = 1.5 * sparkOptions.getBatchIntervalMillis();
 
-    this.boundReadDuration = boundReadDuration(options.getReadTimePercentage(),
-        options.getMinReadTimeMillis());
+    this.boundReadDuration = boundReadDuration(sparkOptions.getReadTimePercentage(),
+        sparkOptions.getMinReadTimeMillis());
     // set initial parallelism once.
     this.initialParallelism = ssc().sparkContext().defaultParallelism();
     checkArgument(this.initialParallelism > 0, "Number of partitions must be greater than zero.");
@@ -104,7 +104,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
     try {
       this.numPartitions =
           createMicrobatchSource()
-              .split(options)
+              .split(sparkOptions)
               .size();
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -116,7 +116,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
     RDD<scala.Tuple2<Source<T>, CheckpointMarkT>> rdd =
         new SourceRDD.Unbounded<>(
             ssc().sparkContext(),
-            runtimeContext,
+            options,
             createMicrobatchSource(),
             numPartitions);
     return scala.Option.apply(rdd);

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
index 01cc176..a225e0f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
@@ -28,9 +28,9 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -66,7 +66,7 @@ public class SourceRDD {
     private static final Logger LOG = LoggerFactory.getLogger(SourceRDD.Bounded.class);
 
     private final BoundedSource<T> source;
-    private final SparkRuntimeContext runtimeContext;
+    private final SerializablePipelineOptions options;
     private final int numPartitions;
     private final String stepName;
     private final Accumulator<MetricsContainerStepMap> metricsAccum;
@@ -79,11 +79,11 @@ public class SourceRDD {
     public Bounded(
         SparkContext sc,
         BoundedSource<T> source,
-        SparkRuntimeContext runtimeContext,
+        SerializablePipelineOptions options,
         String stepName) {
       super(sc, NIL, JavaSparkContext$.MODULE$.<WindowedValue<T>>fakeClassTag());
       this.source = source;
-      this.runtimeContext = runtimeContext;
+      this.options = options;
       // the input parallelism is determined by Spark's scheduler backend.
       // when running on YARN/SparkDeploy it's the result of max(totalCores, 2).
       // when running on Mesos it's 8.
@@ -103,14 +103,14 @@ public class SourceRDD {
       long desiredSizeBytes = DEFAULT_BUNDLE_SIZE;
       try {
         desiredSizeBytes = source.getEstimatedSizeBytes(
-            runtimeContext.getPipelineOptions()) / numPartitions;
+            options.get()) / numPartitions;
       } catch (Exception e) {
         LOG.warn("Failed to get estimated bundle size for source {}, using default bundle "
             + "size of {} bytes.", source, DEFAULT_BUNDLE_SIZE);
       }
       try {
         List<? extends Source<T>> partitionedSources = source.split(desiredSizeBytes,
-            runtimeContext.getPipelineOptions());
+            options.get());
         Partition[] partitions = new SourcePartition[partitionedSources.size()];
         for (int i = 0; i < partitionedSources.size(); i++) {
           partitions[i] = new SourcePartition<>(id(), i, partitionedSources.get(i));
@@ -125,7 +125,7 @@ public class SourceRDD {
     private BoundedSource.BoundedReader<T> createReader(SourcePartition<T> partition) {
       try {
         return ((BoundedSource<T>) partition.source).createReader(
-            runtimeContext.getPipelineOptions());
+            options.get());
       } catch (IOException e) {
         throw new RuntimeException("Failed to create reader from a BoundedSource.", e);
       }
@@ -293,7 +293,7 @@ public class SourceRDD {
         UnboundedSource.CheckpointMark> extends RDD<scala.Tuple2<Source<T>, CheckpointMarkT>> {
 
     private final MicrobatchSource<T, CheckpointMarkT> microbatchSource;
-    private final SparkRuntimeContext runtimeContext;
+    private final SerializablePipelineOptions options;
     private final Partitioner partitioner;
 
     // to satisfy Scala API.
@@ -302,12 +302,12 @@ public class SourceRDD {
             .asScalaBuffer(Collections.<Dependency<?>>emptyList()).toList();
 
     public Unbounded(SparkContext sc,
-        SparkRuntimeContext runtimeContext,
+        SerializablePipelineOptions options,
         MicrobatchSource<T, CheckpointMarkT> microbatchSource,
         int initialNumPartitions) {
       super(sc, NIL,
           JavaSparkContext$.MODULE$.<scala.Tuple2<Source<T>, CheckpointMarkT>>fakeClassTag());
-      this.runtimeContext = runtimeContext;
+      this.options = options;
       this.microbatchSource = microbatchSource;
       this.partitioner = new HashPartitioner(initialNumPartitions);
     }
@@ -316,7 +316,7 @@ public class SourceRDD {
     public Partition[] getPartitions() {
       try {
         final List<? extends Source<T>> partitionedSources =
-            microbatchSource.split(runtimeContext.getPipelineOptions());
+            microbatchSource.split(options.get());
         final Partition[] partitions = new CheckpointableSourcePartition[partitionedSources.size()];
         for (int i = 0; i < partitionedSources.size(); i++) {
           partitions[i] =

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
index 7106c73..b31aa9f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
@@ -22,12 +22,12 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.stateful.StateSpecFunctions;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
 import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
 import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
@@ -80,11 +80,11 @@ public class SparkUnboundedSource {
 
   public static <T, CheckpointMarkT extends CheckpointMark> UnboundedDataset<T> read(
       JavaStreamingContext jssc,
-      SparkRuntimeContext rc,
+      SerializablePipelineOptions rc,
       UnboundedSource<T, CheckpointMarkT> source,
       String stepName) {
 
-    SparkPipelineOptions options = rc.getPipelineOptions().as(SparkPipelineOptions.class);
+    SparkPipelineOptions options = rc.get().as(SparkPipelineOptions.class);
     Long maxRecordsPerBatch = options.getMaxRecordsPerBatch();
     SourceDStream<T, CheckpointMarkT> sourceDStream =
         new SourceDStream<>(jssc.ssc(), source, rc, maxRecordsPerBatch);


[4/4] beam git commit: This closes #3654: [BEAM-2670] Deduplicates serializable wrappers over PipelineOptions from Spark, Flink and Apex, and removes buggy SparkRuntimeContext

Posted by jk...@apache.org.
This closes #3654: [BEAM-2670] Deduplicates serializable wrappers over PipelineOptions from Spark, Flink and Apex, and removes buggy SparkRuntimeContext
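
For readers skimming the diffstat below: the consolidated wrapper lives in
runners/core-construction-java and is used throughout this patch as
options.get().as(SomePipelineOptions.class). What follows is a minimal,
hypothetical sketch of such a wrapper — JSON round-tripping via Jackson,
modeled on the per-runner wrappers this patch removes. Only the get()
accessor is confirmed by the call sites in the diffs; the class body, field
names, and module discovery via ReflectHelpers are assumptions.

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.common.ReflectHelpers;

public class SerializablePipelineOptionsSketch implements Serializable {
  // Options travel as a JSON string; the live object is rebuilt lazily per instance.
  private final String json;
  private transient volatile PipelineOptions options;

  public SerializablePipelineOptionsSketch(PipelineOptions options) {
    try {
      this.json = mapper().writeValueAsString(options);
    } catch (IOException e) {
      throw new IllegalStateException("Failed to serialize the pipeline options.", e);
    }
    this.options = options;
  }

  /** Deserializes at most once per instance, not once per JVM. */
  public PipelineOptions get() {
    if (options == null) {
      synchronized (this) {
        if (options == null) {
          try {
            options = mapper().readValue(json, PipelineOptions.class);
          } catch (IOException e) {
            throw new IllegalStateException("Failed to deserialize the pipeline options.", e);
          }
        }
      }
    }
    return options;
  }

  private static ObjectMapper mapper() {
    // Auto-registers Jackson modules found on the classpath (e.g. for custom
    // user types in options), as the removed SparkRuntimeContext did.
    return new ObjectMapper()
        .registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
  }
}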


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/339976c9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/339976c9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/339976c9

Branch: refs/heads/master
Commit: 339976c9fc00aa4ff951f2e2b114631c034be9c2
Parents: 0a358c7 7db051a
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Aug 2 11:05:08 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Aug 2 11:05:08 2017 -0700

----------------------------------------------------------------------
 runners/apex/pom.xml                            |   8 -
 .../operators/ApexGroupByKeyOperator.java       |   6 +-
 .../operators/ApexParDoOperator.java            |   6 +-
 .../ApexReadUnboundedInputOperator.java         |   6 +-
 .../utils/SerializablePipelineOptions.java      |  78 ---------
 .../translation/utils/PipelineOptionsTest.java  | 150 -----------------
 runners/core-construction-java/pom.xml          |  15 ++
 .../SerializablePipelineOptions.java            |  74 +++++++++
 .../SerializablePipelineOptionsTest.java        |  89 ++++++++++
 runners/flink/pom.xml                           |  10 --
 .../functions/FlinkDoFnFunction.java            |  10 +-
 .../FlinkMergingNonShuffleReduceFunction.java   |   8 +-
 .../functions/FlinkPartialReduceFunction.java   |   8 +-
 .../functions/FlinkReduceFunction.java          |   8 +-
 .../functions/FlinkStatefulDoFnFunction.java    |  10 +-
 .../utils/SerializedPipelineOptions.java        |  77 ---------
 .../translation/wrappers/SourceInputFormat.java |  10 +-
 .../wrappers/streaming/DoFnOperator.java        |  10 +-
 .../streaming/SplittableDoFnOperator.java       |   2 +-
 .../streaming/io/BoundedSourceWrapper.java      |  10 +-
 .../streaming/io/UnboundedSourceWrapper.java    |  12 +-
 .../beam/runners/flink/PipelineOptionsTest.java | 165 +------------------
 runners/spark/pom.xml                           |  12 --
 .../spark/aggregators/NamedAggregators.java     |  93 -----------
 .../beam/runners/spark/io/SourceDStream.java    |  20 +--
 .../apache/beam/runners/spark/io/SourceRDD.java |  22 +--
 .../runners/spark/io/SparkUnboundedSource.java  |   6 +-
 .../SparkGroupAlsoByWindowViaWindowSet.java     |  10 +-
 .../spark/stateful/StateSpecFunctions.java      |   8 +-
 .../spark/translation/EvaluationContext.java    |  11 +-
 .../spark/translation/MultiDoFnFunction.java    |  16 +-
 .../translation/SparkAbstractCombineFn.java     |   9 +-
 .../spark/translation/SparkGlobalCombineFn.java |   5 +-
 ...SparkGroupAlsoByWindowViaOutputBufferFn.java |   9 +-
 .../spark/translation/SparkKeyedCombineFn.java  |   5 +-
 .../spark/translation/SparkRuntimeContext.java  |  98 -----------
 .../spark/translation/TransformTranslator.java  |  27 ++-
 .../streaming/StreamingTransformTranslator.java |  20 +--
 .../translation/SparkRuntimeContextTest.java    | 122 --------------
 .../beam/sdk/options/PipelineOptions.java       |   7 +-
 .../apache/beam/sdk/options/ValueProviders.java |   8 +-
 41 files changed, 327 insertions(+), 953 deletions(-)
----------------------------------------------------------------------



[3/4] beam git commit: [BEAM-2670] Fixes SparkRuntimeContext.getPipelineOptions()

Posted by jk...@apache.org.
[BEAM-2670] Fixes SparkRuntimeContext.getPipelineOptions()

It used a global (static) variable to cache the deserialized options,
so when several SparkRuntimeContext instances were created with
different PipelineOptions, they would all return the options of
whichever instance happened to be asked first.
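
To make the failure mode concrete, here is a self-contained toy reduction of
the bug class (names invented for illustration; the real code is in the
SparkRuntimeContext diff below). A JVM-wide static cache means the first
instance to deserialize wins, and every other instance silently inherits its
value:

import java.util.concurrent.atomic.AtomicReference;

public class FirstAskedWins {
  // BUG: one cached value per JVM, shared by ALL instances.
  private static final AtomicReference<String> CACHE = new AtomicReference<>();

  private final String serializedOptions;

  FirstAskedWins(String serializedOptions) {
    this.serializedOptions = serializedOptions;
  }

  String get() {
    CACHE.compareAndSet(null, serializedOptions); // only the first caller's value sticks
    return CACHE.get();
  }

  public static void main(String[] args) {
    FirstAskedWins a = new FirstAskedWins("options-A");
    FirstAskedWins b = new FirstAskedWins("options-B");
    System.out.println(a.get()); // options-A
    System.out.println(b.get()); // options-A -- b's options are silently ignored
  }
}

The fix below replaces the static holder with a per-instance memoized supplier.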


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ff4b36c8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ff4b36c8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ff4b36c8

Branch: refs/heads/master
Commit: ff4b36c8ae1bd5e436ad63a32997273c8b4a97fe
Parents: 0a358c7
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Jul 27 13:05:23 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Aug 2 11:04:50 2017 -0700

----------------------------------------------------------------------
 .../spark/translation/EvaluationContext.java    |  2 +-
 .../spark/translation/SparkRuntimeContext.java  | 48 ++++++++------------
 .../translation/SparkRuntimeContextTest.java    |  2 +-
 3 files changed, 22 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ff4b36c8/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 0c6c4d1..23e430a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -65,7 +65,7 @@ public class EvaluationContext {
     this.jsc = jsc;
     this.pipeline = pipeline;
     this.options = options;
-    this.runtime = new SparkRuntimeContext(pipeline, options);
+    this.runtime = new SparkRuntimeContext(options);
   }
 
   public EvaluationContext(

http://git-wip-us.apache.org/repos/asf/beam/blob/ff4b36c8/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index f3fe99c..6361bb2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -21,11 +21,12 @@ package org.apache.beam.runners.spark.translation;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import java.io.IOException;
 import java.io.Serializable;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 
@@ -34,11 +35,16 @@ import org.apache.beam.sdk.util.common.ReflectHelpers;
  * data flow program is launched.
  */
 public class SparkRuntimeContext implements Serializable {
-  private final String serializedPipelineOptions;
+  private final Supplier<PipelineOptions> optionsSupplier;
   private transient CoderRegistry coderRegistry;
 
-  SparkRuntimeContext(Pipeline pipeline, PipelineOptions options) {
-    this.serializedPipelineOptions = serializePipelineOptions(options);
+  SparkRuntimeContext(PipelineOptions options) {
+    String serializedPipelineOptions = serializePipelineOptions(options);
+    this.optionsSupplier =
+        Suppliers.memoize(
+            Suppliers.compose(
+                new DeserializeOptions(),
+                Suppliers.ofInstance(serializedPipelineOptions)));
   }
 
   /**
@@ -59,16 +65,8 @@ public class SparkRuntimeContext implements Serializable {
     }
   }
 
-  private static PipelineOptions deserializePipelineOptions(String serializedPipelineOptions) {
-    try {
-      return createMapper().readValue(serializedPipelineOptions, PipelineOptions.class);
-    } catch (IOException e) {
-      throw new IllegalStateException("Failed to deserialize the pipeline options.", e);
-    }
-  }
-
   public PipelineOptions getPipelineOptions() {
-    return PipelineOptionsHolder.getOrInit(serializedPipelineOptions);
+    return optionsSupplier.get();
   }
 
   public CoderRegistry getCoderRegistry() {
@@ -78,21 +76,15 @@ public class SparkRuntimeContext implements Serializable {
     return coderRegistry;
   }
 
-  private static class PipelineOptionsHolder {
-    // on executors, this should deserialize once.
-    private static transient volatile PipelineOptions pipelineOptions = null;
-
-    static PipelineOptions getOrInit(String serializedPipelineOptions) {
-      if (pipelineOptions == null) {
-        synchronized (PipelineOptionsHolder.class) {
-          if (pipelineOptions == null) {
-            pipelineOptions = deserializePipelineOptions(serializedPipelineOptions);
-          }
-        }
-        // Register standard FileSystems.
-        FileSystems.setDefaultPipelineOptions(pipelineOptions);
+  private static class DeserializeOptions
+      implements Function<String, PipelineOptions>, Serializable {
+    @Override
+    public PipelineOptions apply(String options) {
+      try {
+        return createMapper().readValue(options, PipelineOptions.class);
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to deserialize the pipeline options.", e);
       }
-      return pipelineOptions;
     }
   }
 }
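
For reference, the Suppliers.memoize/Suppliers.compose combination above gives
each SparkRuntimeContext its own lazily computed, once-only deserialization.
A small standalone sketch of that Guava pattern, with strings standing in for
serialized options:

import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;

public class MemoizedComposeDemo {
  public static void main(String[] args) {
    Function<String, String> expensiveParse =
        new Function<String, String>() {
          @Override
          public String apply(String input) {
            System.out.println("parsing " + input); // runs at most once per supplier
            return input.toUpperCase();
          }
        };
    // compose binds the serialized input; memoize caches the parsed result
    // per supplier instance -- unlike a static holder, two suppliers keep
    // two independent values.
    Supplier<String> a =
        Suppliers.memoize(Suppliers.compose(expensiveParse, Suppliers.ofInstance("options-a")));
    Supplier<String> b =
        Suppliers.memoize(Suppliers.compose(expensiveParse, Suppliers.ofInstance("options-b")));
    System.out.println(a.get()); // parsing options-a, then OPTIONS-A
    System.out.println(a.get()); // OPTIONS-A (cached, no second parse)
    System.out.println(b.get()); // parsing options-b, then OPTIONS-B
  }
}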

http://git-wip-us.apache.org/repos/asf/beam/blob/ff4b36c8/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java
index e8f578a..456056a 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java
@@ -105,7 +105,7 @@ public class SparkRuntimeContextTest {
         .as(JacksonIncompatibleOptions.class);
     options.setRunner(CrashingRunner.class);
     Pipeline p = Pipeline.create(options);
-    SparkRuntimeContext context = new SparkRuntimeContext(p, options);
+    SparkRuntimeContext context = new SparkRuntimeContext(options);
 
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     try (ObjectOutputStream outputStream = new ObjectOutputStream(baos)) {