Posted to commits@beam.apache.org by am...@apache.org on 2017/01/10 17:27:18 UTC

[2/3] beam git commit: Create broadcast lazily

Create broadcast lazily

Address Amit's review comments and rename BroadcastHelper to SideInputBroadcast


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/662934b1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/662934b1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/662934b1

Branch: refs/heads/master
Commit: 662934b1e88c9697585b05e29d9e7b4a34fc6943
Parents: 130c113
Author: ksalant <ks...@payapal.com>
Authored: Thu Jan 5 09:40:50 2017 +0200
Committer: Sela <an...@paypal.com>
Committed: Tue Jan 10 19:12:25 2017 +0200

----------------------------------------------------------------------
 .../runners/spark/translation/DoFnFunction.java |  6 +-
 .../spark/translation/EvaluationContext.java    | 11 ++-
 .../translation/GroupCombineFunctions.java      |  8 +-
 .../spark/translation/MultiDoFnFunction.java    |  7 +-
 .../translation/SparkAbstractCombineFn.java     | 12 +--
 .../spark/translation/SparkGlobalCombineFn.java | 13 ++--
 .../spark/translation/SparkKeyedCombineFn.java  | 13 ++--
 .../spark/translation/SparkPCollectionView.java | 42 +++++-----
 .../spark/translation/TransformTranslator.java  | 20 ++---
 .../spark/translation/TranslationUtils.java     | 25 +++---
 .../streaming/StreamingTransformTranslator.java | 20 ++---
 .../runners/spark/util/BroadcastHelper.java     | 82 --------------------
 .../runners/spark/util/SideInputBroadcast.java  | 77 ++++++++++++++++++
 .../spark/util/SparkSideInputReader.java        |  9 +--
 .../ResumeFromCheckpointStreamingTest.java      |  3 +-
 .../src/main/resources/beam/findbugs-filter.xml | 26 -------
 16 files changed, 166 insertions(+), 208 deletions(-)
----------------------------------------------------------------------
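
The substance of the change is in SparkPCollectionView below: instead of (re)creating a broadcast
variable whenever a view is registered, the broadcast is now created on first access, guarded by
double-checked locking. A condensed sketch of the resulting method, assembled from the hunks below
(the map-initialization body is inferred from the parts of the pre-existing code the hunks show):

    SideInputBroadcast getPCollectionView(PCollectionView<?> view, JavaSparkContext context) {
        // the map is transient, so it is null after resuming from a checkpoint
        if (broadcastHelperMap == null) {
            synchronized (SparkPCollectionView.class) {
                if (broadcastHelperMap == null) {
                    broadcastHelperMap = new LinkedHashMap<>();
                }
            }
        }
        // lazily broadcast a view the first time it is requested
        SideInputBroadcast helper = broadcastHelperMap.get(view);
        if (helper == null) {
            synchronized (SparkPCollectionView.class) {
                helper = broadcastHelperMap.get(view);
                if (helper == null) {
                    // serializes the stored bytes into a Spark broadcast and caches it
                    helper = createBroadcastHelper(view, context);
                }
            }
        }
        return helper;
    }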


http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index 6a641b5..af8e089 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@ -27,7 +27,7 @@ import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.aggregators.SparkAggregators;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.runners.spark.util.SparkSideInputReader;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -50,7 +50,7 @@ public class DoFnFunction<InputT, OutputT>
   private final Accumulator<NamedAggregators> accumulator;
   private final DoFn<InputT, OutputT> doFn;
   private final SparkRuntimeContext runtimeContext;
-  private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs;
+  private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
   private final WindowingStrategy<?, ?> windowingStrategy;
 
   /**
@@ -64,7 +64,7 @@ public class DoFnFunction<InputT, OutputT>
       Accumulator<NamedAggregators> accumulator,
       DoFn<InputT, OutputT> doFn,
       SparkRuntimeContext runtimeContext,
-      Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs,
+      Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
       WindowingStrategy<?, ?> windowingStrategy) {
 
     this.accumulator = accumulator;

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index b1a1142..0ad862d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -192,23 +192,26 @@ public class EvaluationContext {
   }
 
   /**
-   * Retruns the current views creates in the pipepline.
+   * Returns the current views created in the pipeline.
+   *
    * @return SparkPCollectionView
    */
-  public SparkPCollectionView getPviews() {
+  public SparkPCollectionView getPViews() {
     return pviews;
   }
 
   /**
   * Adds/Replaces a view in the current views created in the pipeline.
+   *
    * @param view - Identifier of the view
    * @param value - Actual value of the view
    * @param coder - Coder of the value
    */
-  public void putPView(PCollectionView<?> view,
+  public void putPView(
+      PCollectionView<?> view,
       Iterable<WindowedValue<?>> value,
       Coder<Iterable<WindowedValue<?>>> coder) {
-    pviews.putPView(view, value, coder, jsc);
+    pviews.putPView(view, value, coder);
   }
 
   <T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
index 4875b0c..bb95065 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
@@ -24,8 +24,8 @@ import java.util.Map;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
 import org.apache.beam.runners.spark.util.ByteArray;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
@@ -102,7 +102,7 @@ public class GroupCombineFunctions {
                   final Coder<OutputT> oCoder,
                   final SparkRuntimeContext runtimeContext,
                   final WindowingStrategy<?, ?> windowingStrategy,
-                  final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
+                  final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>>
                       sideInputs,
                   boolean hasDefault) {
     // handle empty input RDD, which will natively skip the entire execution as Spark will not
@@ -190,8 +190,8 @@ public class GroupCombineFunctions {
                 final KvCoder<K, InputT> inputCoder,
                 final SparkRuntimeContext runtimeContext,
                 final WindowingStrategy<?, ?> windowingStrategy,
-                final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>,
-                    BroadcastHelper<?>>> sideInputs) {
+                final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>>
+                    sideInputs) {
     //--- coders.
     final Coder<K> keyCoder = inputCoder.getKeyCoder();
     final Coder<InputT> viCoder = inputCoder.getValueCoder();

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 8a55369..0f9417a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -29,7 +29,7 @@ import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.aggregators.SparkAggregators;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.runners.spark.util.SparkSideInputReader;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -56,7 +56,7 @@ public class MultiDoFnFunction<InputT, OutputT>
   private final DoFn<InputT, OutputT> doFn;
   private final SparkRuntimeContext runtimeContext;
   private final TupleTag<OutputT> mainOutputTag;
-  private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs;
+  private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
   private final WindowingStrategy<?, ?> windowingStrategy;
 
   /**
@@ -72,8 +72,7 @@ public class MultiDoFnFunction<InputT, OutputT>
       DoFn<InputT, OutputT> doFn,
       SparkRuntimeContext runtimeContext,
       TupleTag<OutputT> mainOutputTag,
-      Map<TupleTag<?>, KV<WindowingStrategy<?, ?>,
-      BroadcastHelper<?>>> sideInputs,
+      Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
       WindowingStrategy<?, ?> windowingStrategy) {
 
     this.accumulator = accumulator;

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
index 6aeb0db..fa1c3fc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
@@ -29,7 +29,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.runners.spark.util.SparkSideInputReader;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineWithContext;
@@ -49,14 +49,14 @@ import org.apache.beam.sdk.values.TupleTag;
  */
 public class SparkAbstractCombineFn implements Serializable {
   protected final SparkRuntimeContext runtimeContext;
-  protected final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs;
+  protected final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
   protected final WindowingStrategy<?, ?> windowingStrategy;
 
 
-  public SparkAbstractCombineFn(SparkRuntimeContext runtimeContext,
-                                Map<TupleTag<?>,  KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
-                                    sideInputs,
-                                WindowingStrategy<?, ?> windowingStrategy) {
+  public SparkAbstractCombineFn(
+      SparkRuntimeContext runtimeContext,
+      Map<TupleTag<?>,  KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
+      WindowingStrategy<?, ?> windowingStrategy) {
     this.runtimeContext = runtimeContext;
     this.sideInputs = sideInputs;
     this.windowingStrategy = windowingStrategy;

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
index 5339fb3..23f5d20 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -46,12 +46,11 @@ import org.joda.time.Instant;
 public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstractCombineFn {
   private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
 
-  public SparkGlobalCombineFn(CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT>
-                                  combineFn,
-                              SparkRuntimeContext runtimeContext,
-                              Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
-                                 sideInputs,
-                              WindowingStrategy<?, ?> windowingStrategy) {
+  public SparkGlobalCombineFn(
+      CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
+      SparkRuntimeContext runtimeContext,
+      Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
+      WindowingStrategy<?, ?> windowingStrategy) {
     super(runtimeContext, sideInputs, windowingStrategy);
     this.combineFn = combineFn;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
index 910f7f0..b5d243f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -46,12 +46,11 @@ import org.joda.time.Instant;
 public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstractCombineFn {
   private final CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;
 
-  public SparkKeyedCombineFn(CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT,
-                                 OutputT> combineFn,
-                             SparkRuntimeContext runtimeContext,
-                             Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
-                                 sideInputs,
-                             WindowingStrategy<?, ?> windowingStrategy) {
+  public SparkKeyedCombineFn(
+      CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn,
+      SparkRuntimeContext runtimeContext,
+      Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
+      WindowingStrategy<?, ?> windowingStrategy) {
     super(runtimeContext, sideInputs, windowingStrategy);
     this.combineFn = combineFn;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
index e888182..f71cb6b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
@@ -21,7 +21,7 @@ import java.io.Serializable;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -34,7 +34,8 @@ import scala.Tuple2;
 public class SparkPCollectionView implements Serializable {
 
    // Holds the view --> broadcast mapping. Transient so it will be null after resume
-    private transient volatile Map<PCollectionView<?>, BroadcastHelper> broadcastHelperMap = null;
+    private transient volatile Map<PCollectionView<?>, SideInputBroadcast>
+        broadcastHelperMap = null;
 
    // Holds the actual data of the views in serialized form
     private Map<PCollectionView<?>,
@@ -45,24 +46,25 @@ public class SparkPCollectionView implements Serializable {
     void putPView(
         PCollectionView<?> view,
         Iterable<WindowedValue<?>> value,
-        Coder<Iterable<WindowedValue<?>>> coder,
-        JavaSparkContext context) {
+        Coder<Iterable<WindowedValue<?>>> coder) {
 
         pviews.put(view, new Tuple2<>(CoderHelpers.toByteArray(value, coder), coder));
-        // overwrite/create broadcast - Future improvement is to initialize the BH lazily
-        getPCollectionView(view, context, true);
-    }
 
-    BroadcastHelper getPCollectionView(
-        PCollectionView<?> view,
-        JavaSparkContext context) {
-        return getPCollectionView(view, context, false);
+        // Currently a non-blocking unpersist; can be changed to a blocking one if needed
+        if (broadcastHelperMap != null) {
+            synchronized (SparkPCollectionView.class) {
+                SideInputBroadcast helper = broadcastHelperMap.get(view);
+                if (helper != null) {
+                    helper.unpersist();
+                    broadcastHelperMap.remove(view);
+                }
+            }
+        }
     }
 
-    private BroadcastHelper getPCollectionView(
+    SideInputBroadcast getPCollectionView(
         PCollectionView<?> view,
-        JavaSparkContext context,
-        boolean overwrite) {
+        JavaSparkContext context) {
         // initialize broadcastHelperMap if needed
         if (broadcastHelperMap == null) {
             synchronized (SparkPCollectionView.class) {
@@ -73,7 +75,7 @@ public class SparkPCollectionView implements Serializable {
         }
 
         //lazily broadcast views
-        BroadcastHelper helper = broadcastHelperMap.get(view);
+        SideInputBroadcast helper = broadcastHelperMap.get(view);
         if (helper == null) {
             synchronized (SparkPCollectionView.class) {
                 helper = broadcastHelperMap.get(view);
@@ -81,21 +83,15 @@ public class SparkPCollectionView implements Serializable {
                     helper = createBroadcastHelper(view, context);
                 }
             }
-        } else if (overwrite) {
-            synchronized (SparkPCollectionView.class) {
-                // Currently unsynchronized unpersist, if needed can be changed to blocking
-                helper.unpersist();
-                helper = createBroadcastHelper(view, context);
-            }
         }
         return helper;
     }
 
-    private BroadcastHelper createBroadcastHelper(
+    private SideInputBroadcast createBroadcastHelper(
         PCollectionView<?> view,
         JavaSparkContext context) {
         Tuple2<byte[], Coder<Iterable<WindowedValue<?>>>> tuple2 = pviews.get(view);
-        BroadcastHelper helper = BroadcastHelper.create(tuple2._1, tuple2._2);
+        SideInputBroadcast helper = SideInputBroadcast.create(tuple2._1, tuple2._2);
         helper.broadcast(context);
         broadcastHelperMap.put(view, helper);
         return helper;
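
The other half of the lifecycle: putPView above no longer broadcasts eagerly. It stores the coded
bytes and, if a stale broadcast already exists for the view, unpersists and removes it, so the next
getPCollectionView call re-broadcasts the fresh value. A hypothetical driver-side sequence (view,
values, coder, and jsc stand for whatever the translator has at hand):

    // register/refresh a view: stores the coded bytes, drops any stale broadcast
    pviews.putPView(view, values, coder);

    // later, while translating a transform that reads the side input:
    // re-broadcasts lazily since the previous broadcast was dropped
    SideInputBroadcast<?> broadcast = pviews.getPCollectionView(view, jsc);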

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 0cf3dc6..3e941e4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -40,7 +40,7 @@ import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
 import org.apache.beam.runners.spark.io.hadoop.ShardNameTemplateHelper;
 import org.apache.beam.runners.spark.io.hadoop.TemplatedAvroKeyOutputFormat;
 import org.apache.beam.runners.spark.io.hadoop.TemplatedTextOutputFormat;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -178,7 +178,7 @@ public final class TransformTranslator {
                 CombineFnUtil.toFnWithContext(transform.getFn());
         final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
-        final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+        final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
             TranslationUtils.getSideInputs(transform.getSideInputs(), context);
         final boolean hasDefault = transform.isInsertDefault();
 
@@ -210,7 +210,7 @@ public final class TransformTranslator {
                 CombineFnUtil.toFnWithContext(transform.getFn());
         final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
-        final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+        final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
             TranslationUtils.getSideInputs(transform.getSideInputs(), context);
 
         @SuppressWarnings("unchecked")
@@ -237,7 +237,7 @@ public final class TransformTranslator {
             context.getInput(transform).getWindowingStrategy();
         Accumulator<NamedAggregators> accum =
             SparkAggregators.getNamedAggregators(context.getSparkContext());
-        Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+        Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
             TranslationUtils.getSideInputs(transform.getSideInputs(), context);
         context.putDataset(transform,
             new BoundedDataset<>(inRDD.mapPartitions(new DoFnFunction<>(accum, doFn,
@@ -536,9 +536,7 @@ public final class TransformTranslator {
         @SuppressWarnings("unchecked")
         Iterable<WindowedValue<?>> iterCast =  (Iterable<WindowedValue<?>>) iter;
 
-        context.putPView(output,
-            iterCast,
-            coderInternal);
+        context.putPView(output, iterCast, coderInternal);
       }
     };
   }
@@ -555,9 +553,7 @@ public final class TransformTranslator {
         @SuppressWarnings("unchecked")
         Iterable<WindowedValue<?>> iterCast =  (Iterable<WindowedValue<?>>) iter;
 
-        context.putPView(output,
-            iterCast,
-            coderInternal);
+        context.putPView(output, iterCast, coderInternal);
       }
     };
   }
@@ -576,9 +572,7 @@ public final class TransformTranslator {
         @SuppressWarnings("unchecked")
         Iterable<WindowedValue<?>> iterCast =  (Iterable<WindowedValue<?>>) iter;
 
-        context.putPView(output,
-            iterCast,
-            coderInternal);
+        context.putPView(output, iterCast, coderInternal);
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index ae9cb3e..965330c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
@@ -189,11 +189,11 @@ public final class TranslationUtils {
    *
    * @param views   The {@link PCollectionView}s.
    * @param context The {@link EvaluationContext}.
-   * @return a map of tagged {@link BroadcastHelper}s and their {@link WindowingStrategy}.
+   * @return a map of tagged {@link SideInputBroadcast}s and their {@link WindowingStrategy}.
    */
-  static Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
+  static Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>>
   getSideInputs(List<PCollectionView<?>> views, EvaluationContext context) {
-    return getSideInputs(views, context.getSparkContext(), context.getPviews());
+    return getSideInputs(views, context.getSparkContext(), context.getPViews());
   }
 
   /**
@@ -202,22 +202,23 @@ public final class TranslationUtils {
    * @param views   The {@link PCollectionView}s.
    * @param context The {@link JavaSparkContext}.
    * @param pviews  The {@link SparkPCollectionView}.
-   * @return a map of tagged {@link BroadcastHelper}s and their {@link WindowingStrategy}.
+   * @return a map of tagged {@link SideInputBroadcast}s and their {@link WindowingStrategy}.
    */
-  public static Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
-  getSideInputs(List<PCollectionView<?>> views, JavaSparkContext context,
-                SparkPCollectionView pviews) {
-
+  public static Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>>
+  getSideInputs(
+      List<PCollectionView<?>> views,
+      JavaSparkContext context,
+      SparkPCollectionView pviews) {
     if (views == null) {
       return ImmutableMap.of();
     } else {
-      Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+      Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
           Maps.newHashMap();
       for (PCollectionView<?> view : views) {
-        BroadcastHelper helper = pviews.getPCollectionView(view, context);
+        SideInputBroadcast helper = pviews.getPCollectionView(view, context);
         WindowingStrategy<?, ?> windowingStrategy = view.getWindowingStrategyInternal();
         sideInputs.put(view.getTagInternal(),
-            KV.<WindowingStrategy<?, ?>, BroadcastHelper<?>>of(windowingStrategy, helper));
+            KV.<WindowingStrategy<?, ?>, SideInputBroadcast<?>>of(windowingStrategy, helper));
       }
       return sideInputs;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 0b2b4d6..3c89b99 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -43,7 +43,7 @@ import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TranslationUtils;
 import org.apache.beam.runners.spark.translation.WindowingHelpers;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.Read;
@@ -254,7 +254,7 @@ final class StreamingTransformTranslator {
                 .getDStream();
 
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
-        final SparkPCollectionView pviews = context.getPviews();
+        final SparkPCollectionView pviews = context.getPViews();
 
         JavaDStream<WindowedValue<KV<K, OutputT>>> outStream = dStream.transform(
             new Function<JavaRDD<WindowedValue<KV<K, Iterable<InputT>>>>,
@@ -296,7 +296,7 @@ final class StreamingTransformTranslator {
         final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
         final boolean hasDefault = transform.isInsertDefault();
-        final SparkPCollectionView pviews = context.getPviews();
+        final SparkPCollectionView pviews = context.getPViews();
 
         JavaDStream<WindowedValue<InputT>> dStream =
             ((UnboundedDataset<InputT>) context.borrowDataset(transform)).getDStream();
@@ -306,7 +306,7 @@ final class StreamingTransformTranslator {
           @Override
           public JavaRDD<WindowedValue<OutputT>> call(JavaRDD<WindowedValue<InputT>> rdd)
               throws Exception {
-            final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+            final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
                 TranslationUtils.getSideInputs(transform.getSideInputs(),
                     JavaSparkContext.fromSparkContext(rdd.context()),
                     pviews);
@@ -336,7 +336,7 @@ final class StreamingTransformTranslator {
                 CombineFnUtil.toFnWithContext(transform.getFn());
         final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
-        final SparkPCollectionView pviews = context.getPviews();
+        final SparkPCollectionView pviews = context.getPViews();
 
         JavaDStream<WindowedValue<KV<K, InputT>>> dStream =
             ((UnboundedDataset<KV<K, InputT>>) context.borrowDataset(transform)).getDStream();
@@ -347,7 +347,7 @@ final class StreamingTransformTranslator {
           @Override
           public JavaRDD<WindowedValue<KV<K, OutputT>>> call(
               JavaRDD<WindowedValue<KV<K, InputT>>> rdd) throws Exception {
-            final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+            final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
                 TranslationUtils.getSideInputs(transform.getSideInputs(),
                     JavaSparkContext.fromSparkContext(rdd.context()),
                     pviews);
@@ -371,7 +371,7 @@ final class StreamingTransformTranslator {
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
         final WindowingStrategy<?, ?> windowingStrategy =
             context.getInput(transform).getWindowingStrategy();
-        final SparkPCollectionView pviews = context.getPviews();
+        final SparkPCollectionView pviews = context.getPViews();
 
         JavaDStream<WindowedValue<InputT>> dStream =
             ((UnboundedDataset<InputT>) context.borrowDataset(transform)).getDStream();
@@ -387,7 +387,7 @@ final class StreamingTransformTranslator {
             final Accumulator<NamedAggregators> accum =
                 SparkAggregators.getNamedAggregators(jsc);
 
-            final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+            final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
                 TranslationUtils.getSideInputs(transform.getSideInputs(),
                     jsc, pviews);
             return rdd.mapPartitions(
@@ -409,7 +409,7 @@ final class StreamingTransformTranslator {
         final DoFn<InputT, OutputT> doFn = transform.getFn();
         rejectStateAndTimers(doFn);
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
-        final SparkPCollectionView pviews = context.getPviews();
+        final SparkPCollectionView pviews = context.getPViews();
         final WindowingStrategy<?, ?> windowingStrategy =
             context.getInput(transform).getWindowingStrategy();
         @SuppressWarnings("unchecked")
@@ -424,7 +424,7 @@ final class StreamingTransformTranslator {
             final Accumulator<NamedAggregators> accum =
                 SparkAggregators.getNamedAggregators(new JavaSparkContext(rdd.context()));
 
-            final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+            final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
                 TranslationUtils.getSideInputs(transform.getSideInputs(),
                     JavaSparkContext.fromSparkContext(rdd.context()), pviews);
               return rdd.mapPartitionsToPair(new MultiDoFnFunction<>(accum, doFn,

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
deleted file mode 100644
index 946f786..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.util;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Broadcast helper.
- */
-public class BroadcastHelper<T> implements Serializable {
-
-  /**
-   * If the property {@code beam.spark.directBroadcast} is set to
-   * {@code true} then Spark serialization (Kryo) will be used to broadcast values
-   * in View objects. By default this property is not set, and values are coded using
-   * the appropriate {@link Coder}.
-   */
-  private static final Logger LOG = LoggerFactory.getLogger(BroadcastHelper.class);
-  private Broadcast<byte[]> bcast;
-  private final Coder<T> coder;
-  private transient T value;
-  private transient byte[] bytes = null;
-
-  private BroadcastHelper(byte[] bytes, Coder<T> coder) {
-    this.bytes = bytes;
-    this.coder = coder;
-  }
-
-  public static <T> BroadcastHelper<T> create(byte[] bytes, Coder<T> coder) {
-    return new BroadcastHelper<>(bytes, coder);
-  }
-
-  public synchronized T getValue() {
-    if (value == null) {
-       value = deserialize();
-    }
-    return value;
-  }
-
-  public void broadcast(JavaSparkContext jsc) {
-    this.bcast = jsc.broadcast(bytes);
-  }
-
-  public void unpersist() {
-    this.bcast.unpersist();
-  }
-
-  private T deserialize() {
-    T val;
-    try {
-      val = coder.decode(new ByteArrayInputStream(bcast.value()), new Coder.Context(true));
-    } catch (IOException ioe) {
-      // this should not ever happen, log it if it does.
-      LOG.warn(ioe.getMessage());
-      val = null;
-    }
-    return val;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputBroadcast.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputBroadcast.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputBroadcast.java
new file mode 100644
index 0000000..1fd2ea8
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputBroadcast.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Broadcast helper for side inputs. Handles the transformation from coded
+ * bytes, to a Spark broadcast variable, to the value decoded by a coder.
+ */
+public class SideInputBroadcast<T> implements Serializable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SideInputBroadcast.class);
+  private Broadcast<byte[]> bcast;
+  private final Coder<T> coder;
+  private transient T value;
+  private transient byte[] bytes = null;
+
+  private SideInputBroadcast(byte[] bytes, Coder<T> coder) {
+    this.bytes = bytes;
+    this.coder = coder;
+  }
+
+  public static <T> SideInputBroadcast<T> create(byte[] bytes, Coder<T> coder) {
+    return new SideInputBroadcast<>(bytes, coder);
+  }
+
+  public synchronized T getValue() {
+    if (value == null) {
+       value = deserialize();
+    }
+    return value;
+  }
+
+  public void broadcast(JavaSparkContext jsc) {
+    this.bcast = jsc.broadcast(bytes);
+  }
+
+  public void unpersist() {
+    this.bcast.unpersist();
+  }
+
+  private T deserialize() {
+    T val;
+    try {
+      val = coder.decode(new ByteArrayInputStream(bcast.value()), new Coder.Context(true));
+    } catch (IOException ioe) {
+      // this should never happen; log it if it does.
+      LOG.warn(ioe.getMessage());
+      val = null;
+    }
+    return val;
+  }
+}
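
A minimal usage sketch for the new class (hypothetical variables: windowedValues, iterableCoder,
and jsc are assumed to exist; CoderHelpers.toByteArray is the same utility SparkPCollectionView
uses above):

    // driver side: code the materialized view once, then broadcast the bytes
    byte[] bytes = CoderHelpers.toByteArray(windowedValues, iterableCoder);
    SideInputBroadcast<Iterable<WindowedValue<?>>> side =
        SideInputBroadcast.create(bytes, iterableCoder);
    side.broadcast(jsc);

    // executor side: the first getValue() decodes the bytes with the coder
    // and caches the result; later calls return the cached value
    Iterable<WindowedValue<?>> values = side.getValue();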

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
index 8167ee0..c8e9850 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
@@ -37,10 +37,10 @@ import org.apache.beam.sdk.values.TupleTag;
 * A {@link SideInputReader} for the SparkRunner.
  */
 public class SparkSideInputReader implements SideInputReader {
-  private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs;
+  private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
 
-  public SparkSideInputReader(Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
-                                  sideInputs) {
+  public SparkSideInputReader(
+      Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs) {
     this.sideInputs = sideInputs;
   }
 
@@ -49,7 +49,7 @@ public class SparkSideInputReader implements SideInputReader {
   public <T> T get(PCollectionView<T> view, BoundedWindow window) {
     //--- validate sideInput.
     checkNotNull(view, "The PCollectionView passed to sideInput cannot be null ");
-    KV<WindowingStrategy<?, ?>, BroadcastHelper<?>> windowedBroadcastHelper =
+    KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>> windowedBroadcastHelper =
         sideInputs.get(view.getTagInternal());
     checkNotNull(windowedBroadcastHelper, "SideInput for view " + view + " is not available.");
 
@@ -61,7 +61,6 @@ public class SparkSideInputReader implements SideInputReader {
     //--- match the appropriate sideInput window.
     // a tag will point to all matching sideInputs, that is all windows.
     // now that we've obtained the appropriate sideInputWindow, all that's left is to filter by it.
-    @SuppressWarnings("unchecked")
     Iterable<WindowedValue<?>> availableSideInputs =
         (Iterable<WindowedValue<?>>) windowedBroadcastHelper.getValue().getValue();
     Iterable<WindowedValue<?>> sideInputForWindow =

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 352a7d8..7346bd9 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -178,8 +178,7 @@ public class ResumeFromCheckpointStreamingTest {
           .apply(ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
                @ProcessElement
                public void process(ProcessContext c) {
-
-                  // Check side input is passed correctly
+                  // Check that the side input is passed correctly, also after resuming from checkpoint
                   Assert.assertEquals(c.sideInput(expectedView), Arrays.asList(EXPECTED));
                   c.output(c.element());
                 }

http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index bfb4988..35b5ed3 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -125,32 +125,6 @@
   </Match>
 
   <Match>
-    <Class name="org.apache.beam.runners.spark.util.BroadcastHelper$CodedBroadcastHelper"/>
-    <Or>
-      <Field name="bcast" />
-      <Field name="value" />
-    </Or>
-    <Bug pattern="IS2_INCONSISTENT_SYNC"/>
-    <!--
-      Spark's Broadcast variables are a distributed and cached objects
-      and should not be treated as "normal" objects.
-    -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.runners.spark.util.BroadcastHelper$DirectBroadcastHelper"/>
-    <Or>
-      <Field name="bcast" />
-      <Field name="value" />
-    </Or>
-    <Bug pattern="IS2_INCONSISTENT_SYNC"/>
-    <!--
-      Spark's Broadcast variables are a distributed and cached objects
-      and should not be treated as "normal" objects.
-    -->
-  </Match>
-
-  <Match>
     <Class name="org.apache.beam.runners.spark.aggregators.metrics.sink.CsvSink"/>
     <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
     <!-- Intentionally overriding parent name because inheritors should replace the parent. -->