You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/05/15 02:25:56 UTC
git commit: CRUNCH-394: Ensure Broadcast variable w/serialized
Configuration is never null in SparkRuntimeContext
Repository: crunch
Updated Branches:
refs/heads/master 25c22e58d -> f2cf619ca
CRUNCH-394: Ensure Broadcast variable w/serialized Configuration is never null in SparkRuntimeContext
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f2cf619c
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f2cf619c
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f2cf619c
Branch: refs/heads/master
Commit: f2cf619ca35307f39b86fc348aba8ff633ee0c5c
Parents: 25c22e5
Author: Josh Wills <jw...@apache.org>
Authored: Wed May 14 17:11:56 2014 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Wed May 14 17:11:56 2014 -0700
----------------------------------------------------------------------
.../main/java/org/apache/crunch/impl/spark/SparkRuntime.java | 2 +-
.../java/org/apache/crunch/impl/spark/SparkRuntimeContext.java | 5 ++++-
2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/f2cf619c/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
index 2cb2fb3..22375ee 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
@@ -110,7 +110,7 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
this.conf = conf;
this.counters = sparkContext.accumulator(Maps.<String, Map<String, Long>>newHashMap(),
new CounterAccumulatorParam());
- this.ctxt = new SparkRuntimeContext(counters);
+ this.ctxt = new SparkRuntimeContext(counters, sparkContext.broadcast(WritableUtils.toByteArray(conf)));
this.outputTargets = Maps.newTreeMap(DEPTH_COMPARATOR);
this.outputTargets.putAll(outputTargets);
this.toMaterialize = toMaterialize;
http://git-wip-us.apache.org/repos/asf/crunch/blob/f2cf619c/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
index 102ad4a..cea317c 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
@@ -50,8 +50,11 @@ public class SparkRuntimeContext implements Serializable {
private transient Configuration conf;
private transient TaskInputOutputContext context;
- public SparkRuntimeContext(Accumulator<Map<String, Map<String, Long>>> counters) {
+ public SparkRuntimeContext(
+ Accumulator<Map<String, Map<String, Long>>> counters,
+ Broadcast<byte[]> broadConf) {
this.counters = counters;
+ this.broadConf = broadConf;
}
public void setConf(Broadcast<byte[]> broadConf) {