You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2017/01/10 17:27:18 UTC
[2/3] beam git commit: Create broadcast lazily
Create broadcast lazily
Fix Amit's comments + rename BroadcastHelper to SideInputBroadcast
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/662934b1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/662934b1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/662934b1
Branch: refs/heads/master
Commit: 662934b1e88c9697585b05e29d9e7b4a34fc6943
Parents: 130c113
Author: ksalant <ks...@payapal.com>
Authored: Thu Jan 5 09:40:50 2017 +0200
Committer: Sela <an...@paypal.com>
Committed: Tue Jan 10 19:12:25 2017 +0200
----------------------------------------------------------------------
.../runners/spark/translation/DoFnFunction.java | 6 +-
.../spark/translation/EvaluationContext.java | 11 ++-
.../translation/GroupCombineFunctions.java | 8 +-
.../spark/translation/MultiDoFnFunction.java | 7 +-
.../translation/SparkAbstractCombineFn.java | 12 +--
.../spark/translation/SparkGlobalCombineFn.java | 13 ++--
.../spark/translation/SparkKeyedCombineFn.java | 13 ++--
.../spark/translation/SparkPCollectionView.java | 42 +++++-----
.../spark/translation/TransformTranslator.java | 20 ++---
.../spark/translation/TranslationUtils.java | 25 +++---
.../streaming/StreamingTransformTranslator.java | 20 ++---
.../runners/spark/util/BroadcastHelper.java | 82 --------------------
.../runners/spark/util/SideInputBroadcast.java | 77 ++++++++++++++++++
.../spark/util/SparkSideInputReader.java | 9 +--
.../ResumeFromCheckpointStreamingTest.java | 3 +-
.../src/main/resources/beam/findbugs-filter.xml | 26 -------
16 files changed, 166 insertions(+), 208 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index 6a641b5..af8e089 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@ -27,7 +27,7 @@ import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.aggregators.SparkAggregators;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.runners.spark.util.SparkSideInputReader;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.WindowedValue;
@@ -50,7 +50,7 @@ public class DoFnFunction<InputT, OutputT>
private final Accumulator<NamedAggregators> accumulator;
private final DoFn<InputT, OutputT> doFn;
private final SparkRuntimeContext runtimeContext;
- private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs;
+ private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
private final WindowingStrategy<?, ?> windowingStrategy;
/**
@@ -64,7 +64,7 @@ public class DoFnFunction<InputT, OutputT>
Accumulator<NamedAggregators> accumulator,
DoFn<InputT, OutputT> doFn,
SparkRuntimeContext runtimeContext,
- Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs,
+ Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
WindowingStrategy<?, ?> windowingStrategy) {
this.accumulator = accumulator;
http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index b1a1142..0ad862d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -192,23 +192,26 @@ public class EvaluationContext {
}
/**
- * Retruns the current views creates in the pipepline.
+ * Return the current views created in the pipeline.
+ *
* @return SparkPCollectionView
*/
- public SparkPCollectionView getPviews() {
+ public SparkPCollectionView getPViews() {
return pviews;
}
/**
* Adds/Replaces a view in the current views created in the pipeline.
+ *
* @param view - Identifier of the view
* @param value - Actual value of the view
* @param coder - Coder of the value
*/
- public void putPView(PCollectionView<?> view,
+ public void putPView(
+ PCollectionView<?> view,
Iterable<WindowedValue<?>> value,
Coder<Iterable<WindowedValue<?>>> coder) {
- pviews.putPView(view, value, coder, jsc);
+ pviews.putPView(view, value, coder);
}
<T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
index 4875b0c..bb95065 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
@@ -24,8 +24,8 @@ import java.util.Map;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.coders.CoderHelpers;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
import org.apache.beam.runners.spark.util.ByteArray;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
@@ -102,7 +102,7 @@ public class GroupCombineFunctions {
final Coder<OutputT> oCoder,
final SparkRuntimeContext runtimeContext,
final WindowingStrategy<?, ?> windowingStrategy,
- final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
+ final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>>
sideInputs,
boolean hasDefault) {
// handle empty input RDD, which will natively skip the entire execution as Spark will not
@@ -190,8 +190,8 @@ public class GroupCombineFunctions {
final KvCoder<K, InputT> inputCoder,
final SparkRuntimeContext runtimeContext,
final WindowingStrategy<?, ?> windowingStrategy,
- final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>,
- BroadcastHelper<?>>> sideInputs) {
+ final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>>
+ sideInputs) {
//--- coders.
final Coder<K> keyCoder = inputCoder.getKeyCoder();
final Coder<InputT> viCoder = inputCoder.getValueCoder();
http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 8a55369..0f9417a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -29,7 +29,7 @@ import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.aggregators.SparkAggregators;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.runners.spark.util.SparkSideInputReader;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.WindowedValue;
@@ -56,7 +56,7 @@ public class MultiDoFnFunction<InputT, OutputT>
private final DoFn<InputT, OutputT> doFn;
private final SparkRuntimeContext runtimeContext;
private final TupleTag<OutputT> mainOutputTag;
- private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs;
+ private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
private final WindowingStrategy<?, ?> windowingStrategy;
/**
@@ -72,8 +72,7 @@ public class MultiDoFnFunction<InputT, OutputT>
DoFn<InputT, OutputT> doFn,
SparkRuntimeContext runtimeContext,
TupleTag<OutputT> mainOutputTag,
- Map<TupleTag<?>, KV<WindowingStrategy<?, ?>,
- BroadcastHelper<?>>> sideInputs,
+ Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
WindowingStrategy<?, ?> windowingStrategy) {
this.accumulator = accumulator;
http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
index 6aeb0db..fa1c3fc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
@@ -29,7 +29,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.runners.spark.util.SparkSideInputReader;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineWithContext;
@@ -49,14 +49,14 @@ import org.apache.beam.sdk.values.TupleTag;
*/
public class SparkAbstractCombineFn implements Serializable {
protected final SparkRuntimeContext runtimeContext;
- protected final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs;
+ protected final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
protected final WindowingStrategy<?, ?> windowingStrategy;
- public SparkAbstractCombineFn(SparkRuntimeContext runtimeContext,
- Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
- sideInputs,
- WindowingStrategy<?, ?> windowingStrategy) {
+ public SparkAbstractCombineFn(
+ SparkRuntimeContext runtimeContext,
+ Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
+ WindowingStrategy<?, ?> windowingStrategy) {
this.runtimeContext = runtimeContext;
this.sideInputs = sideInputs;
this.windowingStrategy = windowingStrategy;
http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
index 5339fb3..23f5d20 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -46,12 +46,11 @@ import org.joda.time.Instant;
public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstractCombineFn {
private final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
- public SparkGlobalCombineFn(CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT>
- combineFn,
- SparkRuntimeContext runtimeContext,
- Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
- sideInputs,
- WindowingStrategy<?, ?> windowingStrategy) {
+ public SparkGlobalCombineFn(
+ CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
+ SparkRuntimeContext runtimeContext,
+ Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
+ WindowingStrategy<?, ?> windowingStrategy) {
super(runtimeContext, sideInputs, windowingStrategy);
this.combineFn = combineFn;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
index 910f7f0..b5d243f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -46,12 +46,11 @@ import org.joda.time.Instant;
public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstractCombineFn {
private final CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn;
- public SparkKeyedCombineFn(CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT,
- OutputT> combineFn,
- SparkRuntimeContext runtimeContext,
- Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
- sideInputs,
- WindowingStrategy<?, ?> windowingStrategy) {
+ public SparkKeyedCombineFn(
+ CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn,
+ SparkRuntimeContext runtimeContext,
+ Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
+ WindowingStrategy<?, ?> windowingStrategy) {
super(runtimeContext, sideInputs, windowingStrategy);
this.combineFn = combineFn;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
index e888182..f71cb6b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
@@ -21,7 +21,7 @@ import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.beam.runners.spark.coders.CoderHelpers;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
@@ -34,7 +34,8 @@ import scala.Tuple2;
public class SparkPCollectionView implements Serializable {
// Holds the view --> broadcast mapping. Transient so it will be null from resume
- private transient volatile Map<PCollectionView<?>, BroadcastHelper> broadcastHelperMap = null;
+ private transient volatile Map<PCollectionView<?>, SideInputBroadcast>
+ broadcastHelperMap = null;
// Holds the Actual data of the views in serialize form
private Map<PCollectionView<?>,
@@ -45,24 +46,25 @@ public class SparkPCollectionView implements Serializable {
void putPView(
PCollectionView<?> view,
Iterable<WindowedValue<?>> value,
- Coder<Iterable<WindowedValue<?>>> coder,
- JavaSparkContext context) {
+ Coder<Iterable<WindowedValue<?>>> coder) {
pviews.put(view, new Tuple2<>(CoderHelpers.toByteArray(value, coder), coder));
- // overwrite/create broadcast - Future improvement is to initialize the BH lazily
- getPCollectionView(view, context, true);
- }
- BroadcastHelper getPCollectionView(
- PCollectionView<?> view,
- JavaSparkContext context) {
- return getPCollectionView(view, context, false);
+ // Currently unsynchronized unpersist, if needed can be changed to blocking
+ if (broadcastHelperMap != null) {
+ synchronized (SparkPCollectionView.class) {
+ SideInputBroadcast helper = broadcastHelperMap.get(view);
+ if (helper != null) {
+ helper.unpersist();
+ broadcastHelperMap.remove(view);
+ }
+ }
+ }
}
- private BroadcastHelper getPCollectionView(
+ SideInputBroadcast getPCollectionView(
PCollectionView<?> view,
- JavaSparkContext context,
- boolean overwrite) {
+ JavaSparkContext context) {
// initialize broadcastHelperMap if needed
if (broadcastHelperMap == null) {
synchronized (SparkPCollectionView.class) {
@@ -73,7 +75,7 @@ public class SparkPCollectionView implements Serializable {
}
//lazily broadcast views
- BroadcastHelper helper = broadcastHelperMap.get(view);
+ SideInputBroadcast helper = broadcastHelperMap.get(view);
if (helper == null) {
synchronized (SparkPCollectionView.class) {
helper = broadcastHelperMap.get(view);
@@ -81,21 +83,15 @@ public class SparkPCollectionView implements Serializable {
helper = createBroadcastHelper(view, context);
}
}
- } else if (overwrite) {
- synchronized (SparkPCollectionView.class) {
- // Currently unsynchronized unpersist, if needed can be changed to blocking
- helper.unpersist();
- helper = createBroadcastHelper(view, context);
- }
}
return helper;
}
- private BroadcastHelper createBroadcastHelper(
+ private SideInputBroadcast createBroadcastHelper(
PCollectionView<?> view,
JavaSparkContext context) {
Tuple2<byte[], Coder<Iterable<WindowedValue<?>>>> tuple2 = pviews.get(view);
- BroadcastHelper helper = BroadcastHelper.create(tuple2._1, tuple2._2);
+ SideInputBroadcast helper = SideInputBroadcast.create(tuple2._1, tuple2._2);
helper.broadcast(context);
broadcastHelperMap.put(view, helper);
return helper;
http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 0cf3dc6..3e941e4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -40,7 +40,7 @@ import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
import org.apache.beam.runners.spark.io.hadoop.ShardNameTemplateHelper;
import org.apache.beam.runners.spark.io.hadoop.TemplatedAvroKeyOutputFormat;
import org.apache.beam.runners.spark.io.hadoop.TemplatedTextOutputFormat;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -178,7 +178,7 @@ public final class TransformTranslator {
CombineFnUtil.toFnWithContext(transform.getFn());
final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
- final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+ final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
TranslationUtils.getSideInputs(transform.getSideInputs(), context);
final boolean hasDefault = transform.isInsertDefault();
@@ -210,7 +210,7 @@ public final class TransformTranslator {
CombineFnUtil.toFnWithContext(transform.getFn());
final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
- final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+ final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
TranslationUtils.getSideInputs(transform.getSideInputs(), context);
@SuppressWarnings("unchecked")
@@ -237,7 +237,7 @@ public final class TransformTranslator {
context.getInput(transform).getWindowingStrategy();
Accumulator<NamedAggregators> accum =
SparkAggregators.getNamedAggregators(context.getSparkContext());
- Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+ Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
TranslationUtils.getSideInputs(transform.getSideInputs(), context);
context.putDataset(transform,
new BoundedDataset<>(inRDD.mapPartitions(new DoFnFunction<>(accum, doFn,
@@ -536,9 +536,7 @@ public final class TransformTranslator {
@SuppressWarnings("unchecked")
Iterable<WindowedValue<?>> iterCast = (Iterable<WindowedValue<?>>) iter;
- context.putPView(output,
- iterCast,
- coderInternal);
+ context.putPView(output, iterCast, coderInternal);
}
};
}
@@ -555,9 +553,7 @@ public final class TransformTranslator {
@SuppressWarnings("unchecked")
Iterable<WindowedValue<?>> iterCast = (Iterable<WindowedValue<?>>) iter;
- context.putPView(output,
- iterCast,
- coderInternal);
+ context.putPView(output, iterCast, coderInternal);
}
};
}
@@ -576,9 +572,7 @@ public final class TransformTranslator {
@SuppressWarnings("unchecked")
Iterable<WindowedValue<?>> iterCast = (Iterable<WindowedValue<?>>) iter;
- context.putPView(output,
- iterCast,
- coderInternal);
+ context.putPView(output, iterCast, coderInternal);
}
};
}
http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index ae9cb3e..965330c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
@@ -189,11 +189,11 @@ public final class TranslationUtils {
*
* @param views The {@link PCollectionView}s.
* @param context The {@link EvaluationContext}.
- * @return a map of tagged {@link BroadcastHelper}s and their {@link WindowingStrategy}.
+ * @return a map of tagged {@link SideInputBroadcast}s and their {@link WindowingStrategy}.
*/
- static Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
+ static Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>>
getSideInputs(List<PCollectionView<?>> views, EvaluationContext context) {
- return getSideInputs(views, context.getSparkContext(), context.getPviews());
+ return getSideInputs(views, context.getSparkContext(), context.getPViews());
}
/**
@@ -202,22 +202,23 @@ public final class TranslationUtils {
* @param views The {@link PCollectionView}s.
* @param context The {@link JavaSparkContext}.
* @param pviews The {@link SparkPCollectionView}.
- * @return a map of tagged {@link BroadcastHelper}s and their {@link WindowingStrategy}.
+ * @return a map of tagged {@link SideInputBroadcast}s and their {@link WindowingStrategy}.
*/
- public static Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
- getSideInputs(List<PCollectionView<?>> views, JavaSparkContext context,
- SparkPCollectionView pviews) {
-
+ public static Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>>
+ getSideInputs(
+ List<PCollectionView<?>> views,
+ JavaSparkContext context,
+ SparkPCollectionView pviews) {
if (views == null) {
return ImmutableMap.of();
} else {
- Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+ Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
Maps.newHashMap();
for (PCollectionView<?> view : views) {
- BroadcastHelper helper = pviews.getPCollectionView(view, context);
+ SideInputBroadcast helper = pviews.getPCollectionView(view, context);
WindowingStrategy<?, ?> windowingStrategy = view.getWindowingStrategyInternal();
sideInputs.put(view.getTagInternal(),
- KV.<WindowingStrategy<?, ?>, BroadcastHelper<?>>of(windowingStrategy, helper));
+ KV.<WindowingStrategy<?, ?>, SideInputBroadcast<?>>of(windowingStrategy, helper));
}
return sideInputs;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 0b2b4d6..3c89b99 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -43,7 +43,7 @@ import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.runners.spark.translation.TransformEvaluator;
import org.apache.beam.runners.spark.translation.TranslationUtils;
import org.apache.beam.runners.spark.translation.WindowingHelpers;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.Read;
@@ -254,7 +254,7 @@ final class StreamingTransformTranslator {
.getDStream();
final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
- final SparkPCollectionView pviews = context.getPviews();
+ final SparkPCollectionView pviews = context.getPViews();
JavaDStream<WindowedValue<KV<K, OutputT>>> outStream = dStream.transform(
new Function<JavaRDD<WindowedValue<KV<K, Iterable<InputT>>>>,
@@ -296,7 +296,7 @@ final class StreamingTransformTranslator {
final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
final boolean hasDefault = transform.isInsertDefault();
- final SparkPCollectionView pviews = context.getPviews();
+ final SparkPCollectionView pviews = context.getPViews();
JavaDStream<WindowedValue<InputT>> dStream =
((UnboundedDataset<InputT>) context.borrowDataset(transform)).getDStream();
@@ -306,7 +306,7 @@ final class StreamingTransformTranslator {
@Override
public JavaRDD<WindowedValue<OutputT>> call(JavaRDD<WindowedValue<InputT>> rdd)
throws Exception {
- final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+ final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
TranslationUtils.getSideInputs(transform.getSideInputs(),
JavaSparkContext.fromSparkContext(rdd.context()),
pviews);
@@ -336,7 +336,7 @@ final class StreamingTransformTranslator {
CombineFnUtil.toFnWithContext(transform.getFn());
final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
- final SparkPCollectionView pviews = context.getPviews();
+ final SparkPCollectionView pviews = context.getPViews();
JavaDStream<WindowedValue<KV<K, InputT>>> dStream =
((UnboundedDataset<KV<K, InputT>>) context.borrowDataset(transform)).getDStream();
@@ -347,7 +347,7 @@ final class StreamingTransformTranslator {
@Override
public JavaRDD<WindowedValue<KV<K, OutputT>>> call(
JavaRDD<WindowedValue<KV<K, InputT>>> rdd) throws Exception {
- final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+ final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
TranslationUtils.getSideInputs(transform.getSideInputs(),
JavaSparkContext.fromSparkContext(rdd.context()),
pviews);
@@ -371,7 +371,7 @@ final class StreamingTransformTranslator {
final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
final WindowingStrategy<?, ?> windowingStrategy =
context.getInput(transform).getWindowingStrategy();
- final SparkPCollectionView pviews = context.getPviews();
+ final SparkPCollectionView pviews = context.getPViews();
JavaDStream<WindowedValue<InputT>> dStream =
((UnboundedDataset<InputT>) context.borrowDataset(transform)).getDStream();
@@ -387,7 +387,7 @@ final class StreamingTransformTranslator {
final Accumulator<NamedAggregators> accum =
SparkAggregators.getNamedAggregators(jsc);
- final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+ final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
TranslationUtils.getSideInputs(transform.getSideInputs(),
jsc, pviews);
return rdd.mapPartitions(
@@ -409,7 +409,7 @@ final class StreamingTransformTranslator {
final DoFn<InputT, OutputT> doFn = transform.getFn();
rejectStateAndTimers(doFn);
final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
- final SparkPCollectionView pviews = context.getPviews();
+ final SparkPCollectionView pviews = context.getPViews();
final WindowingStrategy<?, ?> windowingStrategy =
context.getInput(transform).getWindowingStrategy();
@SuppressWarnings("unchecked")
@@ -424,7 +424,7 @@ final class StreamingTransformTranslator {
final Accumulator<NamedAggregators> accum =
SparkAggregators.getNamedAggregators(new JavaSparkContext(rdd.context()));
- final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
+ final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
TranslationUtils.getSideInputs(transform.getSideInputs(),
JavaSparkContext.fromSparkContext(rdd.context()), pviews);
return rdd.mapPartitionsToPair(new MultiDoFnFunction<>(accum, doFn,
http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
deleted file mode 100644
index 946f786..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.util;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Broadcast helper.
- */
-public class BroadcastHelper<T> implements Serializable {
-
- /**
- * If the property {@code beam.spark.directBroadcast} is set to
- * {@code true} then Spark serialization (Kryo) will be used to broadcast values
- * in View objects. By default this property is not set, and values are coded using
- * the appropriate {@link Coder}.
- */
- private static final Logger LOG = LoggerFactory.getLogger(BroadcastHelper.class);
- private Broadcast<byte[]> bcast;
- private final Coder<T> coder;
- private transient T value;
- private transient byte[] bytes = null;
-
- private BroadcastHelper(byte[] bytes, Coder<T> coder) {
- this.bytes = bytes;
- this.coder = coder;
- }
-
- public static <T> BroadcastHelper<T> create(byte[] bytes, Coder<T> coder) {
- return new BroadcastHelper<>(bytes, coder);
- }
-
- public synchronized T getValue() {
- if (value == null) {
- value = deserialize();
- }
- return value;
- }
-
- public void broadcast(JavaSparkContext jsc) {
- this.bcast = jsc.broadcast(bytes);
- }
-
- public void unpersist() {
- this.bcast.unpersist();
- }
-
- private T deserialize() {
- T val;
- try {
- val = coder.decode(new ByteArrayInputStream(bcast.value()), new Coder.Context(true));
- } catch (IOException ioe) {
- // this should not ever happen, log it if it does.
- LOG.warn(ioe.getMessage());
- val = null;
- }
- return val;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputBroadcast.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputBroadcast.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputBroadcast.java
new file mode 100644
index 0000000..1fd2ea8
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputBroadcast.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Broadcast helper for side inputs. Handles the transformation from serialized
+ * bytes, to a Spark broadcast variable, to a value decoded by the {@link Coder}.
+ */
+public class SideInputBroadcast<T> implements Serializable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SideInputBroadcast.class);
+ private Broadcast<byte[]> bcast;
+ private final Coder<T> coder;
+ private transient T value;
+ private transient byte[] bytes = null;
+
+ private SideInputBroadcast(byte[] bytes, Coder<T> coder) {
+ this.bytes = bytes;
+ this.coder = coder;
+ }
+
+ public static <T> SideInputBroadcast<T> create(byte[] bytes, Coder<T> coder) {
+ return new SideInputBroadcast<>(bytes, coder);
+ }
+
+ public synchronized T getValue() {
+ if (value == null) {
+ value = deserialize();
+ }
+ return value;
+ }
+
+ public void broadcast(JavaSparkContext jsc) {
+ this.bcast = jsc.broadcast(bytes);
+ }
+
+ public void unpersist() {
+ this.bcast.unpersist();
+ }
+
+ private T deserialize() {
+ T val;
+ try {
+ val = coder.decode(new ByteArrayInputStream(bcast.value()), new Coder.Context(true));
+ } catch (IOException ioe) {
+ // this should not ever happen, log it if it does.
+ LOG.warn(ioe.getMessage());
+ val = null;
+ }
+ return val;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
index 8167ee0..c8e9850 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
@@ -37,10 +37,10 @@ import org.apache.beam.sdk.values.TupleTag;
* A {@link SideInputReader} for the SparkRunner.
*/
public class SparkSideInputReader implements SideInputReader {
- private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs;
+ private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
- public SparkSideInputReader(Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>>
- sideInputs) {
+ public SparkSideInputReader(
+ Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs) {
this.sideInputs = sideInputs;
}
@@ -49,7 +49,7 @@ public class SparkSideInputReader implements SideInputReader {
public <T> T get(PCollectionView<T> view, BoundedWindow window) {
//--- validate sideInput.
checkNotNull(view, "The PCollectionView passed to sideInput cannot be null ");
- KV<WindowingStrategy<?, ?>, BroadcastHelper<?>> windowedBroadcastHelper =
+ KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>> windowedBroadcastHelper =
sideInputs.get(view.getTagInternal());
checkNotNull(windowedBroadcastHelper, "SideInput for view " + view + " is not available.");
@@ -61,7 +61,6 @@ public class SparkSideInputReader implements SideInputReader {
//--- match the appropriate sideInput window.
// a tag will point to all matching sideInputs, that is all windows.
// now that we've obtained the appropriate sideInputWindow, all that's left is to filter by it.
- @SuppressWarnings("unchecked")
Iterable<WindowedValue<?>> availableSideInputs =
(Iterable<WindowedValue<?>>) windowedBroadcastHelper.getValue().getValue();
Iterable<WindowedValue<?>> sideInputForWindow =
http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 352a7d8..7346bd9 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -178,8 +178,7 @@ public class ResumeFromCheckpointStreamingTest {
.apply(ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
@ProcessElement
public void process(ProcessContext c) {
-
- // Check side input is passed correctly
+ // Check that the side input is passed correctly, also after resuming from checkpoint.
Assert.assertEquals(c.sideInput(expectedView), Arrays.asList(EXPECTED));
c.output(c.element());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/662934b1/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index bfb4988..35b5ed3 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -125,32 +125,6 @@
</Match>
<Match>
- <Class name="org.apache.beam.runners.spark.util.BroadcastHelper$CodedBroadcastHelper"/>
- <Or>
- <Field name="bcast" />
- <Field name="value" />
- </Or>
- <Bug pattern="IS2_INCONSISTENT_SYNC"/>
- <!--
- Spark's Broadcast variables are a distributed and cached objects
- and should not be treated as "normal" objects.
- -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.runners.spark.util.BroadcastHelper$DirectBroadcastHelper"/>
- <Or>
- <Field name="bcast" />
- <Field name="value" />
- </Or>
- <Bug pattern="IS2_INCONSISTENT_SYNC"/>
- <!--
- Spark's Broadcast variables are a distributed and cached objects
- and should not be treated as "normal" objects.
- -->
- </Match>
-
- <Match>
<Class name="org.apache.beam.runners.spark.aggregators.metrics.sink.CsvSink"/>
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
<!-- Intentionally overriding parent name because inheritors should replace the parent. -->