Posted to commits@beam.apache.org by am...@apache.org on 2017/01/10 17:27:17 UTC

[1/3] beam git commit: [BEAM-647] Fault-tolerant sideInputs via Broadcast variables Fix comments by Amit + rebase from master + checkstyle

Repository: beam
Updated Branches:
  refs/heads/master fe7fc298f -> c1b7f8695


[BEAM-647] Fault-tolerant sideInputs via Broadcast variables
Fix comments by Amit + rebase from master + checkstyle

Reformat + add unpersist on push


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/130c113e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/130c113e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/130c113e

Branch: refs/heads/master
Commit: 130c113e514c8bfe95ef69b68247eceac9301b17
Parents: fe7fc29
Author: ksalant <ks...@payapal.com>
Authored: Thu Dec 15 19:42:47 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Tue Jan 10 19:12:24 2017 +0200

----------------------------------------------------------------------
 .../spark/translation/EvaluationContext.java    |  31 ++++--
 .../spark/translation/SparkPCollectionView.java | 103 ++++++++++++++++++
 .../spark/translation/TransformTranslator.java  |  31 +++++-
 .../spark/translation/TranslationUtils.java     |  26 +++--
 .../streaming/StreamingTransformTranslator.java |  73 +++++++++----
 .../runners/spark/util/BroadcastHelper.java     | 107 ++++++-------------
 .../spark/util/SparkSideInputReader.java        |   1 +
 .../ResumeFromCheckpointStreamingTest.java      |  19 ++++
 8 files changed, 273 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/130c113e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index ec5ad3d..b1a1142 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -55,8 +55,8 @@ public class EvaluationContext {
   private final Set<Dataset> leaves = new LinkedHashSet<>();
   private final Set<PValue> multiReads = new LinkedHashSet<>();
   private final Map<PValue, Object> pobjects = new LinkedHashMap<>();
-  private final Map<PValue, Iterable<? extends WindowedValue<?>>> pview = new LinkedHashMap<>();
   private AppliedPTransform<?, ?, ?> currentTransform;
+  private final SparkPCollectionView pviews = new SparkPCollectionView();
 
   public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline) {
     this.jsc = jsc;
@@ -129,10 +129,6 @@ public class EvaluationContext {
     datasets.put((PValue) getOutput(transform), new UnboundedDataset<>(values, jssc, coder));
   }
 
-  void putPView(PValue view, Iterable<? extends WindowedValue<?>> value) {
-    pview.put(view, value);
-  }
-
   public Dataset borrowDataset(PTransform<?, ?> transform) {
     return borrowDataset((PValue) getInput(transform));
   }
@@ -149,10 +145,6 @@ public class EvaluationContext {
     return dataset;
   }
 
-  <T> Iterable<? extends WindowedValue<?>> getPCollectionView(PCollectionView<T> view) {
-    return pview.get(view);
-  }
-
   /**
    * Computes the outputs for all RDDs that are leaves in the DAG and do not have any actions (like
    * saving to a file) registered on them (i.e. they are performed for side effects).
@@ -199,6 +191,26 @@ public class EvaluationContext {
     return Iterables.transform(windowedValues, WindowingHelpers.<T>unwindowValueFunction());
   }
 
+  /**
+   * Returns the current views created in the pipeline.
+   * @return SparkPCollectionView
+   */
+  public SparkPCollectionView getPviews() {
+    return pviews;
+  }
+
+  /**
+   * Adds/replaces a view in the current views created in the pipeline.
+   * @param view - Identifier of the view
+   * @param value - Actual value of the view
+   * @param coder - Coder of the value
+   */
+  public void putPView(PCollectionView<?> view,
+      Iterable<WindowedValue<?>> value,
+      Coder<Iterable<WindowedValue<?>>> coder) {
+    pviews.putPView(view, value, coder, jsc);
+  }
+
   <T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
     @SuppressWarnings("unchecked")
     BoundedDataset<T> boundedDataset = (BoundedDataset<T>) datasets.get(pcollection);
@@ -209,4 +221,5 @@ public class EvaluationContext {
   private String storageLevel() {
     return runtime.getPipelineOptions().as(SparkPipelineOptions.class).getStorageLevel();
   }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/130c113e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
new file mode 100644
index 0000000..e888182
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.spark.api.java.JavaSparkContext;
+import scala.Tuple2;
+
+/**
+ * SparkPCollectionView is used to pass serialized views to lambdas.
+ */
+public class SparkPCollectionView implements Serializable {
+
+    // Holds the view --> broadcast mapping. Transient so it will be null after a resume.
+    private transient volatile Map<PCollectionView<?>, BroadcastHelper> broadcastHelperMap = null;
+
+    // Holds the actual data of the views in serialized form.
+    private Map<PCollectionView<?>,
+        Tuple2<byte[], Coder<Iterable<WindowedValue<?>>>>> pviews =
+            new LinkedHashMap<>();
+
+    // Driver only - during evaluation stage
+    void putPView(
+        PCollectionView<?> view,
+        Iterable<WindowedValue<?>> value,
+        Coder<Iterable<WindowedValue<?>>> coder,
+        JavaSparkContext context) {
+
+        pviews.put(view, new Tuple2<>(CoderHelpers.toByteArray(value, coder), coder));
+        // Overwrite/create the broadcast; a future improvement is to initialize it lazily.
+        getPCollectionView(view, context, true);
+    }
+
+    BroadcastHelper getPCollectionView(
+        PCollectionView<?> view,
+        JavaSparkContext context) {
+        return getPCollectionView(view, context, false);
+    }
+
+    private BroadcastHelper getPCollectionView(
+        PCollectionView<?> view,
+        JavaSparkContext context,
+        boolean overwrite) {
+        // initialize broadcastHelperMap if needed
+        if (broadcastHelperMap == null) {
+            synchronized (SparkPCollectionView.class) {
+                if (broadcastHelperMap == null) {
+                    broadcastHelperMap = new LinkedHashMap<>();
+                }
+            }
+        }
+
+        //lazily broadcast views
+        BroadcastHelper helper = broadcastHelperMap.get(view);
+        if (helper == null) {
+            synchronized (SparkPCollectionView.class) {
+                helper = broadcastHelperMap.get(view);
+                if (helper == null) {
+                    helper = createBroadcastHelper(view, context);
+                }
+            }
+        } else if (overwrite) {
+            synchronized (SparkPCollectionView.class) {
+                // The unpersist here is non-blocking; it can be changed to blocking if needed.
+                helper.unpersist();
+                helper = createBroadcastHelper(view, context);
+            }
+        }
+        return helper;
+    }
+
+    private BroadcastHelper createBroadcastHelper(
+        PCollectionView<?> view,
+        JavaSparkContext context) {
+        Tuple2<byte[], Coder<Iterable<WindowedValue<?>>>> tuple2 = pviews.get(view);
+        BroadcastHelper helper = BroadcastHelper.create(tuple2._1, tuple2._2);
+        helper.broadcast(context);
+        broadcastHelperMap.put(view, helper);
+        return helper;
+    }
+}
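
The class above reduces to one pattern: keep each view's data as coder-encoded
bytes (plain bytes serialize with the object, and hence with a checkpoint) and
cache the Spark Broadcast in a transient map that is rebuilt on demand after a
resume. Below is a minimal, self-contained sketch of that pattern, assuming
only a Spark dependency on the classpath; the class and member names are
illustrative, not Beam's.

import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class LazyBroadcastSketch implements Serializable {

  // Coder-encoded view data; survives Java serialization and checkpoints.
  private final Map<String, byte[]> serializedViews = new LinkedHashMap<>();

  // Transient broadcast cache: null after a resume, rebuilt on first access.
  private transient volatile Map<String, Broadcast<byte[]>> broadcasts;

  public void put(String viewId, byte[] bytes) {
    serializedViews.put(viewId, bytes);
  }

  // Double-checked locking, mirroring getPCollectionView above, so only the
  // first caller per view pays the broadcast cost.
  public Broadcast<byte[]> get(String viewId, JavaSparkContext jsc) {
    if (broadcasts == null) {
      synchronized (this) {
        if (broadcasts == null) {
          broadcasts = new LinkedHashMap<>();
        }
      }
    }
    Broadcast<byte[]> b = broadcasts.get(viewId);
    if (b == null) {
      synchronized (this) {
        b = broadcasts.get(viewId);
        if (b == null) {
          b = jsc.broadcast(serializedViews.get(viewId));
          broadcasts.put(viewId, b);
        }
      }
    }
    return b;
  }
}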

http://git-wip-us.apache.org/repos/asf/beam/blob/130c113e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 5dd6beb..0cf3dc6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -65,6 +65,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
@@ -529,7 +530,15 @@ public final class TransformTranslator {
       public void evaluate(View.AsSingleton<T> transform, EvaluationContext context) {
         Iterable<? extends WindowedValue<?>> iter =
             context.getWindowedValues(context.getInput(transform));
-        context.putPView(context.getOutput(transform), iter);
+        PCollectionView<T> output = context.getOutput(transform);
+        Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal();
+
+        @SuppressWarnings("unchecked")
+        Iterable<WindowedValue<?>> iterCast =  (Iterable<WindowedValue<?>>) iter;
+
+        context.putPView(output,
+            iterCast,
+            coderInternal);
       }
     };
   }
@@ -540,7 +549,15 @@ public final class TransformTranslator {
       public void evaluate(View.AsIterable<T> transform, EvaluationContext context) {
         Iterable<? extends WindowedValue<?>> iter =
             context.getWindowedValues(context.getInput(transform));
-        context.putPView(context.getOutput(transform), iter);
+        PCollectionView<Iterable<T>> output = context.getOutput(transform);
+        Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal();
+
+        @SuppressWarnings("unchecked")
+        Iterable<WindowedValue<?>> iterCast =  (Iterable<WindowedValue<?>>) iter;
+
+        context.putPView(output,
+            iterCast,
+            coderInternal);
       }
     };
   }
@@ -553,7 +570,15 @@ public final class TransformTranslator {
                            EvaluationContext context) {
         Iterable<? extends WindowedValue<?>> iter =
             context.getWindowedValues(context.getInput(transform));
-        context.putPView(context.getOutput(transform), iter);
+        PCollectionView<WriteT> output = context.getOutput(transform);
+        Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal();
+
+        @SuppressWarnings("unchecked")
+        Iterable<WindowedValue<?>> iterCast =  (Iterable<WindowedValue<?>>) iter;
+
+        context.putPView(output,
+            iterCast,
+            coderInternal);
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/130c113e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index eddc771..ae9cb3e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.util.BroadcastHelper;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
@@ -42,6 +41,7 @@ import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFunction;
@@ -191,8 +191,22 @@ public final class TranslationUtils {
    * @param context The {@link EvaluationContext}.
    * @return a map of tagged {@link BroadcastHelper}s and their {@link WindowingStrategy}.
    */
+  static Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
+  getSideInputs(List<PCollectionView<?>> views, EvaluationContext context) {
+    return getSideInputs(views, context.getSparkContext(), context.getPviews());
+  }
+
+  /**
+   * Create SideInputs as Broadcast variables.
+   *
+   * @param views   The {@link PCollectionView}s.
+   * @param context The {@link JavaSparkContext}.
+   * @param pviews  The {@link SparkPCollectionView}.
+   * @return a map of tagged {@link BroadcastHelper}s and their {@link WindowingStrategy}.
+   */
   public static Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
-      getSideInputs(List<PCollectionView<?>> views, EvaluationContext context) {
+  getSideInputs(List<PCollectionView<?>> views, JavaSparkContext context,
+                SparkPCollectionView pviews) {
 
     if (views == null) {
       return ImmutableMap.of();
@@ -200,14 +214,8 @@ public final class TranslationUtils {
       Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
           Maps.newHashMap();
       for (PCollectionView<?> view : views) {
-        Iterable<? extends WindowedValue<?>> collectionView = context.getPCollectionView(view);
-        Coder<Iterable<WindowedValue<?>>> coderInternal = view.getCoderInternal();
+        BroadcastHelper helper = pviews.getPCollectionView(view, context);
         WindowingStrategy<?, ?> windowingStrategy = view.getWindowingStrategyInternal();
-        @SuppressWarnings("unchecked")
-        BroadcastHelper<?> helper =
-            BroadcastHelper.create((Iterable<WindowedValue<?>>) collectionView, coderInternal);
-        //broadcast side inputs
-        helper.broadcast(context.getSparkContext());
         sideInputs.put(view.getTagInternal(),
             KV.<WindowingStrategy<?, ?>, BroadcastHelper<?>>of(windowingStrategy, helper));
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/130c113e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 070ccbb..0b2b4d6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -37,6 +37,7 @@ import org.apache.beam.runners.spark.translation.GroupCombineFunctions;
 import org.apache.beam.runners.spark.translation.MultiDoFnFunction;
 import org.apache.beam.runners.spark.translation.SparkAssignWindowFn;
 import org.apache.beam.runners.spark.translation.SparkKeyedCombineFn;
+import org.apache.beam.runners.spark.translation.SparkPCollectionView;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.runners.spark.translation.TransformEvaluator;
@@ -238,12 +239,12 @@ final class StreamingTransformTranslator {
     return new TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>() {
       @SuppressWarnings("unchecked")
       @Override
-      public void evaluate(Combine.GroupedValues<K, InputT, OutputT> transform,
+      public void evaluate(final Combine.GroupedValues<K, InputT, OutputT> transform,
                            EvaluationContext context) {
         // get the applied combine function.
         PCollection<? extends KV<K, ? extends Iterable<InputT>>> input =
             context.getInput(transform);
-        WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+        final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
         final CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT> fn =
             (CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT>)
                 CombineFnUtil.toFnWithContext(transform.getFn());
@@ -252,13 +253,27 @@ final class StreamingTransformTranslator {
             ((UnboundedDataset<KV<K, Iterable<InputT>>>) context.borrowDataset(transform))
                 .getDStream();
 
-        SparkKeyedCombineFn<K, InputT, ?, OutputT> combineFnWithContext =
-            new SparkKeyedCombineFn<>(fn, context.getRuntimeContext(),
-                TranslationUtils.getSideInputs(transform.getSideInputs(), context),
-                windowingStrategy);
-        context.putDataset(transform, new UnboundedDataset<>(dStream.map(new TranslationUtils
-            .CombineGroupedValues<>(
-            combineFnWithContext))));
+        final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
+        final SparkPCollectionView pviews = context.getPviews();
+
+        JavaDStream<WindowedValue<KV<K, OutputT>>> outStream = dStream.transform(
+            new Function<JavaRDD<WindowedValue<KV<K, Iterable<InputT>>>>,
+                         JavaRDD<WindowedValue<KV<K, OutputT>>>>() {
+                @Override
+                public JavaRDD<WindowedValue<KV<K, OutputT>>>
+                    call(JavaRDD<WindowedValue<KV<K, Iterable<InputT>>>> rdd)
+                        throws Exception {
+                        SparkKeyedCombineFn<K, InputT, ?, OutputT> combineFnWithContext =
+                            new SparkKeyedCombineFn<>(fn, runtimeContext,
+                                TranslationUtils.getSideInputs(transform.getSideInputs(),
+                                new JavaSparkContext(rdd.context()), pviews),
+                                windowingStrategy);
+                    return rdd.map(
+                        new TranslationUtils.CombineGroupedValues<>(combineFnWithContext));
+                  }
+                });
+
+        context.putDataset(transform, new UnboundedDataset<>(outStream));
       }
     };
   }
@@ -269,7 +284,8 @@ final class StreamingTransformTranslator {
 
       @SuppressWarnings("unchecked")
       @Override
-      public void evaluate(Combine.Globally<InputT, OutputT> transform, EvaluationContext context) {
+      public void evaluate(final Combine.Globally<InputT, OutputT> transform,
+                           EvaluationContext context) {
         final PCollection<InputT> input = context.getInput(transform);
         // serializable arguments to pass.
         final Coder<InputT> iCoder = context.getInput(transform).getCoder();
@@ -279,9 +295,8 @@ final class StreamingTransformTranslator {
                 CombineFnUtil.toFnWithContext(transform.getFn());
         final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
-        final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
-            TranslationUtils.getSideInputs(transform.getSideInputs(), context);
         final boolean hasDefault = transform.isInsertDefault();
+        final SparkPCollectionView pviews = context.getPviews();
 
         JavaDStream<WindowedValue<InputT>> dStream =
             ((UnboundedDataset<InputT>) context.borrowDataset(transform)).getDStream();
@@ -291,6 +306,10 @@ final class StreamingTransformTranslator {
           @Override
           public JavaRDD<WindowedValue<OutputT>> call(JavaRDD<WindowedValue<InputT>> rdd)
               throws Exception {
+            final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+                TranslationUtils.getSideInputs(transform.getSideInputs(),
+                    JavaSparkContext.fromSparkContext(rdd.context()),
+                    pviews);
             return GroupCombineFunctions.combineGlobally(rdd, combineFn, iCoder, oCoder,
                 runtimeContext, windowingStrategy, sideInputs, hasDefault);
           }
@@ -317,8 +336,7 @@ final class StreamingTransformTranslator {
                 CombineFnUtil.toFnWithContext(transform.getFn());
         final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
-        final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
-            TranslationUtils.getSideInputs(transform.getSideInputs(), context);
+        final SparkPCollectionView pviews = context.getPviews();
 
         JavaDStream<WindowedValue<KV<K, InputT>>> dStream =
             ((UnboundedDataset<KV<K, InputT>>) context.borrowDataset(transform)).getDStream();
@@ -329,6 +347,10 @@ final class StreamingTransformTranslator {
           @Override
           public JavaRDD<WindowedValue<KV<K, OutputT>>> call(
               JavaRDD<WindowedValue<KV<K, InputT>>> rdd) throws Exception {
+            final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+                TranslationUtils.getSideInputs(transform.getSideInputs(),
+                    JavaSparkContext.fromSparkContext(rdd.context()),
+                    pviews);
             return GroupCombineFunctions.combinePerKey(rdd, combineFn, inputCoder, runtimeContext,
                 windowingStrategy, sideInputs);
           }
@@ -347,10 +369,10 @@ final class StreamingTransformTranslator {
         final DoFn<InputT, OutputT> doFn = transform.getFn();
         rejectStateAndTimers(doFn);
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
-        final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
-            TranslationUtils.getSideInputs(transform.getSideInputs(), context);
         final WindowingStrategy<?, ?> windowingStrategy =
             context.getInput(transform).getWindowingStrategy();
+        final SparkPCollectionView pviews = context.getPviews();
+
         JavaDStream<WindowedValue<InputT>> dStream =
             ((UnboundedDataset<InputT>) context.borrowDataset(transform)).getDStream();
 
@@ -360,8 +382,14 @@ final class StreamingTransformTranslator {
           @Override
           public JavaRDD<WindowedValue<OutputT>> call(JavaRDD<WindowedValue<InputT>> rdd) throws
               Exception {
+            final JavaSparkContext jsc = new JavaSparkContext(rdd.context());
+
             final Accumulator<NamedAggregators> accum =
-                SparkAggregators.getNamedAggregators(new JavaSparkContext(rdd.context()));
+                SparkAggregators.getNamedAggregators(jsc);
+
+            final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+                TranslationUtils.getSideInputs(transform.getSideInputs(),
+                    jsc, pviews);
             return rdd.mapPartitions(
                 new DoFnFunction<>(accum, doFn, runtimeContext, sideInputs, windowingStrategy));
           }
@@ -381,8 +409,7 @@ final class StreamingTransformTranslator {
         final DoFn<InputT, OutputT> doFn = transform.getFn();
         rejectStateAndTimers(doFn);
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
-        final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
-            TranslationUtils.getSideInputs(transform.getSideInputs(), context);
+        final SparkPCollectionView pviews = context.getPviews();
         final WindowingStrategy<?, ?> windowingStrategy =
             context.getInput(transform).getWindowingStrategy();
         @SuppressWarnings("unchecked")
@@ -396,8 +423,12 @@ final class StreamingTransformTranslator {
               JavaRDD<WindowedValue<InputT>> rdd) throws Exception {
             final Accumulator<NamedAggregators> accum =
                 SparkAggregators.getNamedAggregators(new JavaSparkContext(rdd.context()));
-            return rdd.mapPartitionsToPair(new MultiDoFnFunction<>(accum, doFn,
-                runtimeContext, transform.getMainOutputTag(), sideInputs, windowingStrategy));
+
+            final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+                TranslationUtils.getSideInputs(transform.getSideInputs(),
+                    JavaSparkContext.fromSparkContext(rdd.context()), pviews);
+              return rdd.mapPartitionsToPair(new MultiDoFnFunction<>(accum, doFn,
+                  runtimeContext, transform.getMainOutputTag(), sideInputs, windowingStrategy));
           }
         }).cache();
         PCollectionTuple pct = context.getOutput(transform);
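
The recurring change throughout this file: TranslationUtils.getSideInputs(...)
is no longer called while the DStream graph is being built, but inside the
per-batch transform function, so every micro-batch (including batches replayed
after a recovery, when the transient broadcast cache is empty) resolves the
broadcast freshly on the driver. A hedged sketch of that shape, reusing the
illustrative LazyBroadcastSketch from earlier (assumed to be in the same
package); only the Spark API calls are real.

import java.nio.charset.StandardCharsets;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.api.java.JavaDStream;

public class DeferredSideInputSketch {

  // Resolve the broadcast once per micro-batch instead of capturing it in a
  // closure at graph-construction time. After a checkpoint restore the first
  // batch re-broadcasts from the serialized bytes; later batches reuse it.
  public static JavaDStream<String> withSuffix(
      JavaDStream<String> in, final LazyBroadcastSketch views) {
    return in.transform(
        (JavaRDD<String> rdd) -> {
          JavaSparkContext jsc = JavaSparkContext.fromSparkContext(rdd.context());
          final Broadcast<byte[]> side = views.get("suffix", jsc);
          // Only the (serializable) Broadcast handle is captured by the task.
          return rdd.map(
              s -> s + new String(side.value(), StandardCharsets.UTF_8));
        });
  }
}

This is also why SparkPCollectionView implements Serializable: the transform
function that captures it is checkpointed along with the DStream graph.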

http://git-wip-us.apache.org/repos/asf/beam/blob/130c113e/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
index 5c13b80..946f786 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
@@ -21,7 +21,6 @@ package org.apache.beam.runners.spark.util;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.Serializable;
-import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
@@ -31,7 +30,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Broadcast helper.
  */
-public abstract class BroadcastHelper<T> implements Serializable {
+public class BroadcastHelper<T> implements Serializable {
 
   /**
    * If the property {@code beam.spark.directBroadcast} is set to
@@ -39,89 +38,45 @@ public abstract class BroadcastHelper<T> implements Serializable {
    * in View objects. By default this property is not set, and values are coded using
    * the appropriate {@link Coder}.
    */
-  public static final String DIRECT_BROADCAST = "beam.spark.directBroadcast";
-
   private static final Logger LOG = LoggerFactory.getLogger(BroadcastHelper.class);
-
-  public static <T> BroadcastHelper<T> create(T value, Coder<T> coder) {
-    if (Boolean.parseBoolean(System.getProperty(DIRECT_BROADCAST, "false"))) {
-      return new DirectBroadcastHelper<>(value);
-    }
-    return new CodedBroadcastHelper<>(value, coder);
+  private Broadcast<byte[]> bcast;
+  private final Coder<T> coder;
+  private transient T value;
+  private transient byte[] bytes = null;
+
+  private BroadcastHelper(byte[] bytes, Coder<T> coder) {
+    this.bytes = bytes;
+    this.coder = coder;
   }
 
-  public abstract T getValue();
-
-  public abstract void broadcast(JavaSparkContext jsc);
-
-  /**
-   * A {@link BroadcastHelper} that relies on the underlying
-   * Spark serialization (Kryo) to broadcast values. This is appropriate when
-   * broadcasting very large values, since no copy of the object is made.
-   * @param <T> the type of the value stored in the broadcast variable
-   */
-  static class DirectBroadcastHelper<T> extends BroadcastHelper<T> {
-    private Broadcast<T> bcast;
-    private transient T value;
-
-    DirectBroadcastHelper(T value) {
-      this.value = value;
-    }
-
-    @Override
-    public synchronized T getValue() {
-      if (value == null) {
-        value = bcast.getValue();
-      }
-      return value;
-    }
-
-    @Override
-    public void broadcast(JavaSparkContext jsc) {
-      this.bcast = jsc.broadcast(value);
-    }
+  public static <T> BroadcastHelper<T> create(byte[] bytes, Coder<T> coder) {
+    return new BroadcastHelper<>(bytes, coder);
   }
 
-  /**
-   * A {@link BroadcastHelper} that uses a
-   * {@link Coder} to encode values as byte arrays
-   * before broadcasting.
-   * @param <T> the type of the value stored in the broadcast variable
-   */
-  static class CodedBroadcastHelper<T> extends BroadcastHelper<T> {
-    private Broadcast<byte[]> bcast;
-    private final Coder<T> coder;
-    private transient T value;
-
-    CodedBroadcastHelper(T value, Coder<T> coder) {
-      this.value = value;
-      this.coder = coder;
+  public synchronized T getValue() {
+    if (value == null) {
+       value = deserialize();
     }
+    return value;
+  }
 
-    @Override
-    public synchronized T getValue() {
-      if (value == null) {
-        value = deserialize();
-      }
-      return value;
-    }
+  public void broadcast(JavaSparkContext jsc) {
+    this.bcast = jsc.broadcast(bytes);
+  }
 
-    @Override
-    public void broadcast(JavaSparkContext jsc) {
-      this.bcast = jsc.broadcast(CoderHelpers.toByteArray(value, coder));
-    }
+  public void unpersist() {
+    this.bcast.unpersist();
+  }
 
-    private T deserialize() {
-      T val;
-      try {
-        val = coder.decode(new ByteArrayInputStream(bcast.value()),
-            new Coder.Context(true));
-      } catch (IOException ioe) {
-        // this should not ever happen, log it if it does.
-        LOG.warn(ioe.getMessage());
-        val = null;
-      }
-      return val;
+  private T deserialize() {
+    T val;
+    try {
+      val = coder.decode(new ByteArrayInputStream(bcast.value()), new Coder.Context(true));
+    } catch (IOException ioe) {
+      // this should not ever happen, log it if it does.
+      LOG.warn(ioe.getMessage());
+      val = null;
     }
+    return val;
   }
 }
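
After this change the helper holds only pre-encoded bytes and decodes them
lazily, at most once per JVM, inside the synchronized getValue(). A small
self-contained sketch of the coder round trip it relies on, assuming a Beam
SDK of this vintage on the classpath; note that new Coder.Context(true) in
deserialize() above denotes the outer context used here.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

public class CoderRoundTripSketch {
  public static void main(String[] args) throws IOException {
    Coder<String> coder = StringUtf8Coder.of();

    // Driver side: encode once to bytes (the role CoderHelpers.toByteArray
    // plays before the bytes are handed to the broadcast).
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    coder.encode("hello side input", bos, Coder.Context.OUTER);
    byte[] bytes = bos.toByteArray();

    // Executor side: decode lazily from the broadcast bytes.
    String value = coder.decode(new ByteArrayInputStream(bytes), Coder.Context.OUTER);
    System.out.println(value);
  }
}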

http://git-wip-us.apache.org/repos/asf/beam/blob/130c113e/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
index 0a804ae..8167ee0 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
@@ -61,6 +61,7 @@ public class SparkSideInputReader implements SideInputReader {
     //--- match the appropriate sideInput window.
     // a tag will point to all matching sideInputs, that is all windows.
     // now that we've obtained the appropriate sideInputWindow, all that's left is to filter by it.
+    @SuppressWarnings("unchecked")
     Iterable<WindowedValue<?>> availableSideInputs =
         (Iterable<WindowedValue<?>>) windowedBroadcastHelper.getValue().getValue();
     Iterable<WindowedValue<?>> sideInputForWindow =

http://git-wip-us.apache.org/repos/asf/beam/blob/130c113e/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index ab04c5c..352a7d8 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -23,7 +23,9 @@ import static org.junit.Assert.assertThat;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
@@ -37,19 +39,23 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.joda.time.Duration;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -163,8 +169,21 @@ public class ResumeFromCheckpointStreamingTest {
     Duration windowDuration = new Duration(options.getBatchIntervalMillis());
 
     Pipeline p = Pipeline.create(options);
+
+    PCollection<String> expectedCol = p.apply(Create.of(EXPECTED).withCoder(StringUtf8Coder.of()));
+    final PCollectionView<List<String>> expectedView = expectedCol.apply(View.<String>asList());
+
     PCollection<String> formattedKV =
         p.apply(read.withoutMetadata())
+          .apply(ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
+               @ProcessElement
+               public void process(ProcessContext c) {
+
+                  // Check side input is passed correctly
+                  Assert.assertEquals(c.sideInput(expectedView), Arrays.asList(EXPECTED));
+                  c.output(c.element());
+                }
+          }).withSideInputs(expectedView))
         .apply(Window.<KV<String, String>>into(FixedWindows.of(windowDuration)))
         .apply(ParDo.of(new FormatAsText()));
 


[3/3] beam git commit: This closes #1624

Posted by am...@apache.org.
This closes #1624


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c1b7f869
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c1b7f869
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c1b7f869

Branch: refs/heads/master
Commit: c1b7f8695a599981e549fad598f36a1559971859
Parents: fe7fc29 662934b
Author: Sela <an...@paypal.com>
Authored: Tue Jan 10 19:13:37 2017 +0200
Committer: Sela <an...@paypal.com>
Committed: Tue Jan 10 19:13:37 2017 +0200

----------------------------------------------------------------------
 .../runners/spark/translation/DoFnFunction.java |   6 +-
 .../spark/translation/EvaluationContext.java    |  34 +++--
 .../translation/GroupCombineFunctions.java      |   8 +-
 .../spark/translation/MultiDoFnFunction.java    |   7 +-
 .../translation/SparkAbstractCombineFn.java     |  12 +-
 .../spark/translation/SparkGlobalCombineFn.java |  13 +-
 .../spark/translation/SparkKeyedCombineFn.java  |  13 +-
 .../spark/translation/SparkPCollectionView.java |  99 +++++++++++++++
 .../spark/translation/TransformTranslator.java  |  33 ++++-
 .../spark/translation/TranslationUtils.java     |  37 ++++--
 .../streaming/StreamingTransformTranslator.java |  75 +++++++----
 .../runners/spark/util/BroadcastHelper.java     | 127 -------------------
 .../runners/spark/util/SideInputBroadcast.java  |  77 +++++++++++
 .../spark/util/SparkSideInputReader.java        |   8 +-
 .../ResumeFromCheckpointStreamingTest.java      |  18 +++
 .../src/main/resources/beam/findbugs-filter.xml |  26 ----
 16 files changed, 353 insertions(+), 240 deletions(-)
----------------------------------------------------------------------



[2/3] beam git commit: Create broadcast lazily

Posted by am...@apache.org.
Create broadcast lazily

Fix Amit's comments + rename BroadcastHelper to SideInputBroadcast


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/662934b1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/662934b1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/662934b1

Branch: refs/heads/master
Commit: 662934b1e88c9697585b05e29d9e7b4a34fc6943
Parents: 130c113
Author: ksalant <ks...@payapal.com>
Authored: Thu Jan 5 09:40:50 2017 +0200
Committer: Sela <an...@paypal.com>
Committed: Tue Jan 10 19:12:25 2017 +0200

----------------------------------------------------------------------
 .../runners/spark/translation/DoFnFunction.java |  6 +-
 .../spark/translation/EvaluationContext.java    | 11 ++-
 .../translation/GroupCombineFunctions.java      |  8 +-
 .../spark/translation/MultiDoFnFunction.java    |  7 +-
 .../translation/SparkAbstractCombineFn.java     | 12 +--
 .../spark/translation/SparkGlobalCombineFn.java | 13 ++--
 .../spark/translation/SparkKeyedCombineFn.java  | 13 ++--
 .../spark/translation/SparkPCollectionView.java | 42 +++++-----
 .../spark/translation/TransformTranslator.java  | 20 ++---
 .../spark/translation/TranslationUtils.java     | 25 +++---
 .../streaming/StreamingTransformTranslator.java | 20 ++---
 .../runners/spark/util/BroadcastHelper.java     | 82 --------------------
 .../runners/spark/util/SideInputBroadcast.java  | 77 ++++++++++++++++++
 .../spark/util/SparkSideInputReader.java        |  9 +--
 .../ResumeFromCheckpointStreamingTest.java      |  3 +-
 .../src/main/resources/beam/findbugs-filter.xml | 26 -------
 16 files changed, 166 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index 6a641b5..af8e089 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@ -27,7 +27,7 @@ import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.aggregators.SparkAggregators;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.runners.spark.util.SparkSideInputReader;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -50,7 +50,7 @@ public class DoFnFunction<InputT, OutputT>
   private final Accumulator<NamedAggregators> accumulator;
   private final DoFn<InputT, OutputT> doFn;
   private final SparkRuntimeContext runtimeContext;
-  private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs;
+  private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
   private final WindowingStrategy<?, ?> windowingStrategy;
 
   /**
@@ -64,7 +64,7 @@ public class DoFnFunction<InputT, OutputT>
       Accumulator<NamedAggregators> accumulator,
       DoFn<InputT, OutputT> doFn,
       SparkRuntimeContext runtimeContext,
-      Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs,
+      Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
       WindowingStrategy<?, ?> windowingStrategy) {
 
     this.accumulator = accumulator;

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index b1a1142..0ad862d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -192,23 +192,26 @@ public class EvaluationContext {
   }
 
   /**
    * Returns the current views created in the pipeline.
+   *
    * @return SparkPCollectionView
    */
-  public SparkPCollectionView getPviews() {
+  public SparkPCollectionView getPViews() {
     return pviews;
   }
 
   /**
    * Adds/replaces a view in the current views created in the pipeline.
+   *
    * @param view - Identifier of the view
    * @param value - Actual value of the view
    * @param coder - Coder of the value
    */
-  public void putPView(PCollectionView<?> view,
+  public void putPView(
+      PCollectionView<?> view,
       Iterable<WindowedValue<?>> value,
       Coder<Iterable<WindowedValue<?>>> coder) {
-    pviews.putPView(view, value, coder, jsc);
+    pviews.putPView(view, value, coder);
   }
 
   <T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
index 4875b0c..bb95065 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
@@ -24,8 +24,8 @@ import java.util.Map;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
 import org.apache.beam.runners.spark.util.ByteArray;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
@@ -102,7 +102,7 @@ public class GroupCombineFunctions {
                   final Coder<OutputT> oCoder,
                   final SparkRuntimeContext runtimeContext,
                   final WindowingStrategy<?, ?> windowingStrategy,
-                  final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
+                  final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>>
                       sideInputs,
                   boolean hasDefault) {
     // handle empty input RDD, which will natively skip the entire execution as Spark will not
@@ -190,8 +190,8 @@ public class GroupCombineFunctions {
                 final KvCoder<K, InputT> inputCoder,
                 final SparkRuntimeContext runtimeContext,
                 final WindowingStrategy<?, ?> windowingStrategy,
-                final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>,
-                    BroadcastHelper<?>>> sideInputs) {
+                final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>>
+                    sideInputs) {
     //--- coders.
     final Coder<K> keyCoder = inputCoder.getKeyCoder();
     final Coder<InputT> viCoder = inputCoder.getValueCoder();

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 8a55369..0f9417a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -29,7 +29,7 @@ import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.aggregators.SparkAggregators;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.runners.spark.util.SparkSideInputReader;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -56,7 +56,7 @@ public class MultiDoFnFunction<InputT, OutputT>
   private final DoFn<InputT, OutputT> doFn;
   private final SparkRuntimeContext runtimeContext;
   private final TupleTag<OutputT> mainOutputTag;
-  private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs;
+  private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
   private final WindowingStrategy<?, ?> windowingStrategy;
 
   /**
@@ -72,8 +72,7 @@ public class MultiDoFnFunction<InputT, OutputT>
       DoFn<InputT, OutputT> doFn,
       SparkRuntimeContext runtimeContext,
       TupleTag<OutputT> mainOutputTag,
-      Map<TupleTag<?>, KV<WindowingStrategy<?, ?>,
-      BroadcastHelper<?>>> sideInputs,
+      Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
       WindowingStrategy<?, ?> windowingStrategy) {
 
     this.accumulator = accumulator;

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
index 6aeb0db..fa1c3fc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
@@ -29,7 +29,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.runners.spark.util.SparkSideInputReader;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineWithContext;
@@ -49,14 +49,14 @@ import org.apache.beam.sdk.values.TupleTag;
  */
 public class SparkAbstractCombineFn implements Serializable {
   protected final SparkRuntimeContext runtimeContext;
-  protected final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs;
+  protected final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
   protected final WindowingStrategy<?, ?> windowingStrategy;
 
 
-  public SparkAbstractCombineFn(SparkRuntimeContext runtimeContext,
-                                Map<TupleTag<?>,  KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
-                                    sideInputs,
-                                WindowingStrategy<?, ?> windowingStrategy) {
+  public SparkAbstractCombineFn(
+      SparkRuntimeContext runtimeContext,
+      Map<TupleTag<?>,  KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
+      WindowingStrategy<?, ?> windowingStrategy) {
     this.runtimeContext = runtimeContext;
     this.sideInputs = sideInputs;
     this.windowingStrategy = windowingStrategy;

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
index 5339fb3..23f5d20 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -46,12 +46,11 @@ import org.joda.time.Instant;
 public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstractCombineFn {
   private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
 
-  public SparkGlobalCombineFn(CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT>
-                                  combineFn,
-                              SparkRuntimeContext runtimeContext,
-                              Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
-                                 sideInputs,
-                              WindowingStrategy<?, ?> windowingStrategy) {
+  public SparkGlobalCombineFn(
+      CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
+      SparkRuntimeContext runtimeContext,
+      Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
+      WindowingStrategy<?, ?> windowingStrategy) {
     super(runtimeContext, sideInputs, windowingStrategy);
     this.combineFn = combineFn;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
index 910f7f0..b5d243f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -46,12 +46,11 @@ import org.joda.time.Instant;
 public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstractCombineFn {
   private final CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;
 
-  public SparkKeyedCombineFn(CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT,
-                                 OutputT> combineFn,
-                             SparkRuntimeContext runtimeContext,
-                             Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
-                                 sideInputs,
-                             WindowingStrategy<?, ?> windowingStrategy) {
+  public SparkKeyedCombineFn(
+      CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn,
+      SparkRuntimeContext runtimeContext,
+      Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
+      WindowingStrategy<?, ?> windowingStrategy) {
     super(runtimeContext, sideInputs, windowingStrategy);
     this.combineFn = combineFn;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
index e888182..f71cb6b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
@@ -21,7 +21,7 @@ import java.io.Serializable;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -34,7 +34,8 @@ import scala.Tuple2;
 public class SparkPCollectionView implements Serializable {
 
     // Holds the view --> broadcast mapping. Transient so it will be null after a resume.
-    private transient volatile Map<PCollectionView<?>, BroadcastHelper> broadcastHelperMap = null;
+    private transient volatile Map<PCollectionView<?>, SideInputBroadcast>
+        broadcastHelperMap = null;

     // Holds the actual data of the views in serialized form.
     private Map<PCollectionView<?>,
@@ -45,24 +46,25 @@ public class SparkPCollectionView implements Serializable {
     void putPView(
         PCollectionView<?> view,
         Iterable<WindowedValue<?>> value,
-        Coder<Iterable<WindowedValue<?>>> coder,
-        JavaSparkContext context) {
+        Coder<Iterable<WindowedValue<?>>> coder) {
 
         pviews.put(view, new Tuple2<>(CoderHelpers.toByteArray(value, coder), coder));
-        // overwrite/create broadcast - Future improvement is to initialize the BH lazily
-        getPCollectionView(view, context, true);
-    }
 
-    BroadcastHelper getPCollectionView(
-        PCollectionView<?> view,
-        JavaSparkContext context) {
-        return getPCollectionView(view, context, false);
+        // The unpersist here is currently non-blocking; if needed, it can be made blocking
+        if (broadcastHelperMap != null) {
+            synchronized (SparkPCollectionView.class) {
+                SideInputBroadcast helper = broadcastHelperMap.get(view);
+                if (helper != null) {
+                    helper.unpersist();
+                    broadcastHelperMap.remove(view);
+                }
+            }
+        }
     }
 
-    private BroadcastHelper getPCollectionView(
+    SideInputBroadcast getPCollectionView(
         PCollectionView<?> view,
-        JavaSparkContext context,
-        boolean overwrite) {
+        JavaSparkContext context) {
         // initialize broadcastHelperMap if needed
         if (broadcastHelperMap == null) {
             synchronized (SparkPCollectionView.class) {
@@ -73,7 +75,7 @@ public class SparkPCollectionView implements Serializable {
         }
 
         // Lazily broadcast views
-        BroadcastHelper helper = broadcastHelperMap.get(view);
+        SideInputBroadcast helper = broadcastHelperMap.get(view);
         if (helper == null) {
             synchronized (SparkPCollectionView.class) {
                 helper = broadcastHelperMap.get(view);
@@ -81,21 +83,15 @@ public class SparkPCollectionView implements Serializable {
                     helper = createBroadcastHelper(view, context);
                 }
             }
-        } else if (overwrite) {
-            synchronized (SparkPCollectionView.class) {
-                // Currently unsynchronized unpersist, if needed can be changed to blocking
-                helper.unpersist();
-                helper = createBroadcastHelper(view, context);
-            }
         }
         return helper;
     }
 
-    private BroadcastHelper createBroadcastHelper(
+    private SideInputBroadcast createBroadcastHelper(
         PCollectionView<?> view,
         JavaSparkContext context) {
         Tuple2<byte[], Coder<Iterable<WindowedValue<?>>>> tuple2 = pviews.get(view);
-        BroadcastHelper helper = BroadcastHelper.create(tuple2._1, tuple2._2);
+        SideInputBroadcast helper = SideInputBroadcast.create(tuple2._1, tuple2._2);
         helper.broadcast(context);
         broadcastHelperMap.put(view, helper);
         return helper;

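A note on the pattern in SparkPCollectionView above: the view-to-broadcast map is rebuilt lazily with double-checked locking. Because the map is transient it is null after resuming from a checkpoint, so each side input is re-broadcast on first access, and putPView unpersists any stale broadcast so the next access re-creates it. A minimal standalone sketch of the same idiom follows; the names (LazyBroadcastCache, compute) are illustrative only and not part of the runner.

    import java.util.HashMap;
    import java.util.Map;

    public abstract class LazyBroadcastCache<K, V> {
      // Transient and volatile, as in SparkPCollectionView, hence null after a resume.
      private transient volatile Map<K, V> cache = null;

      /** Creates the missing value, e.g. by broadcasting serialized bytes. */
      protected abstract V compute(K key);

      public V get(K key) {
        if (cache == null) {
          synchronized (LazyBroadcastCache.class) {
            if (cache == null) {
              cache = new HashMap<>();
            }
          }
        }
        V value = cache.get(key);
        if (value == null) {
          synchronized (LazyBroadcastCache.class) {
            // Re-check under the lock so only one thread creates the value.
            value = cache.get(key);
            if (value == null) {
              value = compute(key);
              cache.put(key, value);
            }
          }
        }
        return value;
      }
    }
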
http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 0cf3dc6..3e941e4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -40,7 +40,7 @@ import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
 import org.apache.beam.runners.spark.io.hadoop.ShardNameTemplateHelper;
 import org.apache.beam.runners.spark.io.hadoop.TemplatedAvroKeyOutputFormat;
 import org.apache.beam.runners.spark.io.hadoop.TemplatedTextOutputFormat;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -178,7 +178,7 @@ public final class TransformTranslator {
                 CombineFnUtil.toFnWithContext(transform.getFn());
         final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
-        final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+        final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
             TranslationUtils.getSideInputs(transform.getSideInputs(), context);
         final boolean hasDefault = transform.isInsertDefault();
 
@@ -210,7 +210,7 @@ public final class TransformTranslator {
                 CombineFnUtil.toFnWithContext(transform.getFn());
         final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
-        final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+        final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
             TranslationUtils.getSideInputs(transform.getSideInputs(), context);
 
         @SuppressWarnings("unchecked")
@@ -237,7 +237,7 @@ public final class TransformTranslator {
             context.getInput(transform).getWindowingStrategy();
         Accumulator<NamedAggregators> accum =
             SparkAggregators.getNamedAggregators(context.getSparkContext());
-        Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+        Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
             TranslationUtils.getSideInputs(transform.getSideInputs(), context);
         context.putDataset(transform,
             new BoundedDataset<>(inRDD.mapPartitions(new DoFnFunction<>(accum, doFn,
@@ -536,9 +536,7 @@ public final class TransformTranslator {
         @SuppressWarnings("unchecked")
         Iterable<WindowedValue<?>> iterCast =  (Iterable<WindowedValue<?>>) iter;
 
-        context.putPView(output,
-            iterCast,
-            coderInternal);
+        context.putPView(output, iterCast, coderInternal);
       }
     };
   }
@@ -555,9 +553,7 @@ public final class TransformTranslator {
         @SuppressWarnings("unchecked")
         Iterable<WindowedValue<?>> iterCast =  (Iterable<WindowedValue<?>>) iter;
 
-        context.putPView(output,
-            iterCast,
-            coderInternal);
+        context.putPView(output, iterCast, coderInternal);
       }
     };
   }
@@ -576,9 +572,7 @@ public final class TransformTranslator {
         @SuppressWarnings("unchecked")
         Iterable<WindowedValue<?>> iterCast =  (Iterable<WindowedValue<?>>) iter;
 
-        context.putPView(output,
-            iterCast,
-            coderInternal);
+        context.putPView(output, iterCast, coderInternal);
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index ae9cb3e..965330c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
@@ -189,11 +189,11 @@ public final class TranslationUtils {
    *
    * @param views   The {@link PCollectionView}s.
    * @param context The {@link EvaluationContext}.
-   * @return a map of tagged {@link BroadcastHelper}s and their {@link WindowingStrategy}.
+   * @return a map of tagged {@link SideInputBroadcast}s and their {@link WindowingStrategy}.
    */
-  static Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
+  static Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>>
   getSideInputs(List<PCollectionView<?>> views, EvaluationContext context) {
-    return getSideInputs(views, context.getSparkContext(), context.getPviews());
+    return getSideInputs(views, context.getSparkContext(), context.getPViews());
   }
 
   /**
@@ -202,22 +202,23 @@ public final class TranslationUtils {
    * @param views   The {@link PCollectionView}s.
    * @param context The {@link JavaSparkContext}.
    * @param pviews  The {@link SparkPCollectionView}.
-   * @return a map of tagged {@link BroadcastHelper}s and their {@link WindowingStrategy}.
+   * @return a map of tagged {@link SideInputBroadcast}s and their {@link WindowingStrategy}.
    */
-  public static Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
-  getSideInputs(List<PCollectionView<?>> views, JavaSparkContext context,
-                SparkPCollectionView pviews) {
-
+  public static Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>>
+  getSideInputs(
+      List<PCollectionView<?>> views,
+      JavaSparkContext context,
+      SparkPCollectionView pviews) {
     if (views == null) {
       return ImmutableMap.of();
     } else {
-      Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+      Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
           Maps.newHashMap();
       for (PCollectionView<?> view : views) {
-        BroadcastHelper helper = pviews.getPCollectionView(view, context);
+        SideInputBroadcast helper = pviews.getPCollectionView(view, context);
         WindowingStrategy<?, ?> windowingStrategy = view.getWindowingStrategyInternal();
         sideInputs.put(view.getTagInternal(),
-            KV.<WindowingStrategy<?, ?>, BroadcastHelper<?>>of(windowingStrategy, helper));
+            KV.<WindowingStrategy<?, ?>, SideInputBroadcast<?>>of(windowingStrategy, helper));
       }
       return sideInputs;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 0b2b4d6..3c89b99 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -43,7 +43,7 @@ import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TranslationUtils;
 import org.apache.beam.runners.spark.translation.WindowingHelpers;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.Read;
@@ -254,7 +254,7 @@ final class StreamingTransformTranslator {
                 .getDStream();
 
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
-        final SparkPCollectionView pviews = context.getPviews();
+        final SparkPCollectionView pviews = context.getPViews();
 
         JavaDStream<WindowedValue<KV<K, OutputT>>> outStream = dStream.transform(
             new Function<JavaRDD<WindowedValue<KV<K, Iterable<InputT>>>>,
@@ -296,7 +296,7 @@ final class StreamingTransformTranslator {
         final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
         final boolean hasDefault = transform.isInsertDefault();
-        final SparkPCollectionView pviews = context.getPviews();
+        final SparkPCollectionView pviews = context.getPViews();
 
         JavaDStream<WindowedValue<InputT>> dStream =
             ((UnboundedDataset<InputT>) context.borrowDataset(transform)).getDStream();
@@ -306,7 +306,7 @@ final class StreamingTransformTranslator {
           @Override
           public JavaRDD<WindowedValue<OutputT>> call(JavaRDD<WindowedValue<InputT>> rdd)
               throws Exception {
-            final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+            final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
                 TranslationUtils.getSideInputs(transform.getSideInputs(),
                     JavaSparkContext.fromSparkContext(rdd.context()),
                     pviews);
@@ -336,7 +336,7 @@ final class StreamingTransformTranslator {
                 CombineFnUtil.toFnWithContext(transform.getFn());
         final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
-        final SparkPCollectionView pviews = context.getPviews();
+        final SparkPCollectionView pviews = context.getPViews();
 
         JavaDStream<WindowedValue<KV<K, InputT>>> dStream =
             ((UnboundedDataset<KV<K, InputT>>) context.borrowDataset(transform)).getDStream();
@@ -347,7 +347,7 @@ final class StreamingTransformTranslator {
           @Override
           public JavaRDD<WindowedValue<KV<K, OutputT>>> call(
               JavaRDD<WindowedValue<KV<K, InputT>>> rdd) throws Exception {
-            final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+            final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
                 TranslationUtils.getSideInputs(transform.getSideInputs(),
                     JavaSparkContext.fromSparkContext(rdd.context()),
                     pviews);
@@ -371,7 +371,7 @@ final class StreamingTransformTranslator {
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
         final WindowingStrategy<?, ?> windowingStrategy =
             context.getInput(transform).getWindowingStrategy();
-        final SparkPCollectionView pviews = context.getPviews();
+        final SparkPCollectionView pviews = context.getPViews();
 
         JavaDStream<WindowedValue<InputT>> dStream =
             ((UnboundedDataset<InputT>) context.borrowDataset(transform)).getDStream();
@@ -387,7 +387,7 @@ final class StreamingTransformTranslator {
             final Accumulator<NamedAggregators> accum =
                 SparkAggregators.getNamedAggregators(jsc);
 
-            final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+            final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
                 TranslationUtils.getSideInputs(transform.getSideInputs(),
                     jsc, pviews);
             return rdd.mapPartitions(
@@ -409,7 +409,7 @@ final class StreamingTransformTranslator {
         final DoFn<InputT, OutputT> doFn = transform.getFn();
         rejectStateAndTimers(doFn);
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
-        final SparkPCollectionView pviews = context.getPviews();
+        final SparkPCollectionView pviews = context.getPViews();
         final WindowingStrategy<?, ?> windowingStrategy =
             context.getInput(transform).getWindowingStrategy();
         @SuppressWarnings("unchecked")
@@ -424,7 +424,7 @@ final class StreamingTransformTranslator {
             final Accumulator<NamedAggregators> accum =
                 SparkAggregators.getNamedAggregators(new JavaSparkContext(rdd.context()));
 
-            final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+            final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
                 TranslationUtils.getSideInputs(transform.getSideInputs(),
                     JavaSparkContext.fromSparkContext(rdd.context()), pviews);
               return rdd.mapPartitionsToPair(new MultiDoFnFunction<>(accum, doFn,

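Note that all the streaming translations above resolve their side inputs inside the per-batch Function passed to dStream.transform(...), not once at pipeline-translation time. The broadcast is therefore looked up from SparkPCollectionView on every micro-batch, so one that was unpersisted and re-created, or lost when resuming from a checkpoint, is always re-fetched. A hedged sketch of that shape follows; resolveSideInput is a hypothetical stand-in for the runner's TranslationUtils.getSideInputs / SparkPCollectionView.getPCollectionView lookup.

    import java.util.List;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.streaming.api.java.JavaDStream;

    public class PerBatchSideInputs {
      // Hypothetical resolver standing in for the runner's broadcast lookup.
      static Broadcast<List<String>> resolveSideInput(JavaSparkContext jsc) {
        throw new UnsupportedOperationException("illustrative stub");
      }

      static JavaDStream<String> withSideInput(JavaDStream<String> dStream) {
        return dStream.transform(
            new Function<JavaRDD<String>, JavaRDD<String>>() {
              @Override
              public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
                // Runs on the driver once per micro-batch, so a re-created
                // broadcast is picked up here rather than captured forever.
                final Broadcast<List<String>> side =
                    resolveSideInput(JavaSparkContext.fromSparkContext(rdd.context()));
                return rdd.map(new Function<String, String>() {
                  @Override
                  public String call(String element) {
                    return element + " [side inputs: " + side.value().size() + "]";
                  }
                });
              }
            });
      }
    }
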
http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
deleted file mode 100644
index 946f786..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.util;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Broadcast helper.
- */
-public class BroadcastHelper<T> implements Serializable {
-
-  /**
-   * If the property {@code beam.spark.directBroadcast} is set to
-   * {@code true} then Spark serialization (Kryo) will be used to broadcast values
-   * in View objects. By default this property is not set, and values are coded using
-   * the appropriate {@link Coder}.
-   */
-  private static final Logger LOG = LoggerFactory.getLogger(BroadcastHelper.class);
-  private Broadcast<byte[]> bcast;
-  private final Coder<T> coder;
-  private transient T value;
-  private transient byte[] bytes = null;
-
-  private BroadcastHelper(byte[] bytes, Coder<T> coder) {
-    this.bytes = bytes;
-    this.coder = coder;
-  }
-
-  public static <T> BroadcastHelper<T> create(byte[] bytes, Coder<T> coder) {
-    return new BroadcastHelper<>(bytes, coder);
-  }
-
-  public synchronized T getValue() {
-    if (value == null) {
-       value = deserialize();
-    }
-    return value;
-  }
-
-  public void broadcast(JavaSparkContext jsc) {
-    this.bcast = jsc.broadcast(bytes);
-  }
-
-  public void unpersist() {
-    this.bcast.unpersist();
-  }
-
-  private T deserialize() {
-    T val;
-    try {
-      val = coder.decode(new ByteArrayInputStream(bcast.value()), new Coder.Context(true));
-    } catch (IOException ioe) {
-      // this should not ever happen, log it if it does.
-      LOG.warn(ioe.getMessage());
-      val = null;
-    }
-    return val;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputBroadcast.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputBroadcast.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputBroadcast.java
new file mode 100644
index 0000000..1fd2ea8
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputBroadcast.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Broadcast helper for side inputs. Handles the conversion from serialized
+ * bytes to a Spark broadcast variable, and from the broadcast back to a
+ * value via the {@link Coder}.
+ */
+public class SideInputBroadcast<T> implements Serializable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SideInputBroadcast.class);
+  private Broadcast<byte[]> bcast;
+  private final Coder<T> coder;
+  private transient T value;
+  private transient byte[] bytes = null;
+
+  private SideInputBroadcast(byte[] bytes, Coder<T> coder) {
+    this.bytes = bytes;
+    this.coder = coder;
+  }
+
+  public static <T> SideInputBroadcast<T> create(byte[] bytes, Coder<T> coder) {
+    return new SideInputBroadcast<>(bytes, coder);
+  }
+
+  public synchronized T getValue() {
+    if (value == null) {
+      value = deserialize();
+    }
+    return value;
+  }
+
+  public void broadcast(JavaSparkContext jsc) {
+    this.bcast = jsc.broadcast(bytes);
+  }
+
+  public void unpersist() {
+    this.bcast.unpersist();
+  }
+
+  private T deserialize() {
+    T val;
+    try {
+      val = coder.decode(new ByteArrayInputStream(bcast.value()), new Coder.Context(true));
+    } catch (IOException ioe) {
+      // This should never happen; log it if it does.
+      LOG.warn(ioe.getMessage());
+      val = null;
+    }
+    return val;
+  }
+}

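A minimal usage sketch of the new class, assuming a local JavaSparkContext; the string value and StringUtf8Coder are arbitrary stand-ins for a materialized side input and its coder.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.beam.runners.spark.util.SideInputBroadcast;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.spark.api.java.JavaSparkContext;

    public class SideInputBroadcastDemo {
      public static void main(String[] args) throws IOException {
        JavaSparkContext jsc = new JavaSparkContext("local[2]", "side-input-demo");

        // Encode the value with its coder, as SparkPCollectionView does.
        Coder<String> coder = StringUtf8Coder.of();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        coder.encode("hello side input", bytes, new Coder.Context(true));

        SideInputBroadcast<String> side =
            SideInputBroadcast.create(bytes.toByteArray(), coder);
        side.broadcast(jsc);                  // ship the bytes to the executors
        System.out.println(side.getValue());  // lazily decodes: "hello side input"
        side.unpersist();                     // release the broadcast blocks
        jsc.stop();
      }
    }
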
http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
index 8167ee0..c8e9850 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
@@ -37,10 +37,10 @@ import org.apache.beam.sdk.values.TupleTag;
  * A {@link SideInputReader} for the SparkRunner.
  */
 public class SparkSideInputReader implements SideInputReader {
-  private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs;
+  private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
 
-  public SparkSideInputReader(Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
-                                  sideInputs) {
+  public SparkSideInputReader(
+      Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs) {
     this.sideInputs = sideInputs;
   }
 
@@ -49,7 +49,7 @@ public class SparkSideInputReader implements SideInputReader {
   public <T> T get(PCollectionView<T> view, BoundedWindow window) {
     //--- validate sideInput.
     checkNotNull(view, "The PCollectionView passed to sideInput cannot be null ");
-    KV<WindowingStrategy<?, ?>, BroadcastHelper<?>> windowedBroadcastHelper =
+    KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>> windowedBroadcastHelper =
         sideInputs.get(view.getTagInternal());
     checkNotNull(windowedBroadcastHelper, "SideInput for view " + view + " is not available.");
 
@@ -61,7 +61,6 @@ public class SparkSideInputReader implements SideInputReader {
     //--- match the appropriate sideInput window.
     // a tag will point to all matching sideInputs, that is all windows.
     // now that we've obtained the appropriate sideInputWindow, all that's left is to filter by it.
-    @SuppressWarnings("unchecked")
     Iterable<WindowedValue<?>> availableSideInputs =
         (Iterable<WindowedValue<?>>) windowedBroadcastHelper.getValue().getValue();
     Iterable<WindowedValue<?>> sideInputForWindow =

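The hunk above is cut off mid-statement, but the shape of the remaining logic is clear from its comment: once the matching side-input window is known, the reader filters the broadcast Iterable<WindowedValue<?>> down to the values belonging to that window. A simplified, hypothetical sketch of such a filter follows; the real reader's exact predicate may differ.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.util.WindowedValue;

    public class SideInputWindowFilter {
      /** Keeps only the values assigned to the given side-input window. */
      static Iterable<WindowedValue<?>> forWindow(
          Iterable<WindowedValue<?>> availableSideInputs,
          BoundedWindow sideInputWindow) {
        List<WindowedValue<?>> matching = new ArrayList<>();
        for (WindowedValue<?> value : availableSideInputs) {
          // getWindows() returns all windows this value was assigned to.
          if (value.getWindows().contains(sideInputWindow)) {
            matching.add(value);
          }
        }
        return matching;
      }
    }
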
http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 352a7d8..7346bd9 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -178,8 +178,7 @@ public class ResumeFromCheckpointStreamingTest {
           .apply(ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
                @ProcessElement
                public void process(ProcessContext c) {
-
-                  // Check side input is passed correctly
+                  // Check that the side input is passed correctly, also after resuming from a checkpoint
                   Assert.assertEquals(c.sideInput(expectedView), Arrays.asList(EXPECTED));
                   c.output(c.element());
                 }

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index bfb4988..35b5ed3 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -125,32 +125,6 @@
   </Match>
 
   <Match>
-    <Class name="org.apache.beam.runners.spark.util.BroadcastHelper$CodedBroadcastHelper"/>
-    <Or>
-      <Field name="bcast" />
-      <Field name="value" />
-    </Or>
-    <Bug pattern="IS2_INCONSISTENT_SYNC"/>
-    <!--
-      Spark's Broadcast variables are a distributed and cached objects
-      and should not be treated as "normal" objects.
-    -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.runners.spark.util.BroadcastHelper$DirectBroadcastHelper"/>
-    <Or>
-      <Field name="bcast" />
-      <Field name="value" />
-    </Or>
-    <Bug pattern="IS2_INCONSISTENT_SYNC"/>
-    <!--
-      Spark's Broadcast variables are a distributed and cached objects
-      and should not be treated as "normal" objects.
-    -->
-  </Match>
-
-  <Match>
     <Class name="org.apache.beam.runners.spark.aggregators.metrics.sink.CsvSink"/>
     <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
     <!-- Intentionally overriding parent name because inheritors should replace the parent. -->