You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/05/15 02:25:56 UTC

git commit: CRUNCH-394: Ensure Broadcast variable w/serialized Configuration is never null in SparkRuntimeContext

Repository: crunch
Updated Branches:
  refs/heads/master 25c22e58d -> f2cf619ca


CRUNCH-394: Ensure Broadcast variable w/serialized Configuration is never null in SparkRuntimeContext


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f2cf619c
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f2cf619c
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f2cf619c

Branch: refs/heads/master
Commit: f2cf619ca35307f39b86fc348aba8ff633ee0c5c
Parents: 25c22e5
Author: Josh Wills <jw...@apache.org>
Authored: Wed May 14 17:11:56 2014 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Wed May 14 17:11:56 2014 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/impl/spark/SparkRuntime.java    | 2 +-
 .../java/org/apache/crunch/impl/spark/SparkRuntimeContext.java  | 5 ++++-
 2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/f2cf619c/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
index 2cb2fb3..22375ee 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
@@ -110,7 +110,7 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
     this.conf = conf;
     this.counters = sparkContext.accumulator(Maps.<String, Map<String, Long>>newHashMap(),
         new CounterAccumulatorParam());
-    this.ctxt = new SparkRuntimeContext(counters);
+    this.ctxt = new SparkRuntimeContext(counters, sparkContext.broadcast(WritableUtils.toByteArray(conf)));
     this.outputTargets = Maps.newTreeMap(DEPTH_COMPARATOR);
     this.outputTargets.putAll(outputTargets);
     this.toMaterialize = toMaterialize;

http://git-wip-us.apache.org/repos/asf/crunch/blob/f2cf619c/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
index 102ad4a..cea317c 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
@@ -50,8 +50,11 @@ public class SparkRuntimeContext implements Serializable {
   private transient Configuration conf;
   private transient TaskInputOutputContext context;
 
-  public SparkRuntimeContext(Accumulator<Map<String, Map<String, Long>>> counters) {
+  public SparkRuntimeContext(
+      Accumulator<Map<String, Map<String, Long>>> counters,
+      Broadcast<byte[]> broadConf) {
     this.counters = counters;
+    this.broadConf = broadConf;
   }
 
   public void setConf(Broadcast<byte[]> broadConf) {