You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/05/01 18:53:03 UTC

git commit: CRUNCH-384: Upgrade Spark to 0.9.1 and Scala to 2.10; fix a bunch of things, so that counters and standalone distributed Spark jobs work.

Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 b12945f7c -> ad2532703


CRUNCH-384: Upgrade Spark to 0.9.1 and Scala to 2.10; fix a bunch of things,
so that counters and standalone distributed Spark jobs work.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ad253270
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ad253270
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ad253270

Branch: refs/heads/apache-crunch-0.8
Commit: ad25327031317acc05daf6c2ea55da0ab7d13a03
Parents: b12945f
Author: Josh Wills <jw...@apache.org>
Authored: Thu Apr 24 17:56:05 2014 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Thu May 1 09:09:58 2014 -0700

----------------------------------------------------------------------
 .../TaskInputOutputContextFactory.java          |  76 ++++++++
 .../org/apache/crunch/SparkUnionResultsIT.java  |   4 +-
 .../impl/spark/CounterAccumulatorParam.java     |  27 ++-
 .../apache/crunch/impl/spark/SparkPipeline.java |   9 +
 .../apache/crunch/impl/spark/SparkRuntime.java  |  48 +++--
 .../crunch/impl/spark/SparkRuntimeContext.java  | 175 ++++++-------------
 .../org/apache/hadoop/mapred/SparkCounter.java  |  76 ++++++++
 pom.xml                                         |  16 +-
 8 files changed, 277 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/ad253270/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/TaskInputOutputContextFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/TaskInputOutputContextFactory.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/TaskInputOutputContextFactory.java
new file mode 100644
index 0000000..1aa65b3
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/TaskInputOutputContextFactory.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import java.lang.reflect.Constructor;
+
+public class TaskInputOutputContextFactory {
+  private static final Log LOG = LogFactory.getLog(TaskInputOutputContextFactory.class);
+
+  private static final TaskInputOutputContextFactory INSTANCE = new TaskInputOutputContextFactory();
+
+  public static TaskInputOutputContext create(
+      Configuration conf,
+      TaskAttemptID taskAttemptId,
+      StatusReporter reporter) {
+    return INSTANCE.createInternal(conf, taskAttemptId, reporter);
+  }
+
+  private Constructor<? extends TaskInputOutputContext> taskIOConstructor;
+  private int arity;
+
+  private TaskInputOutputContextFactory() {
+    String ic = TaskInputOutputContext.class.isInterface() ?
+        "org.apache.hadoop.mapreduce.task.MapContextImpl" :
+        "org.apache.hadoop.mapreduce.MapContext";
+    try {
+      Class<? extends TaskInputOutputContext> implClass = (Class<? extends TaskInputOutputContext>) Class.forName(ic);
+      this.taskIOConstructor = (Constructor<? extends TaskInputOutputContext>) implClass.getConstructor(
+          Configuration.class, TaskAttemptID.class, RecordReader.class, RecordWriter.class,
+          OutputCommitter.class, StatusReporter.class, InputSplit.class);
+      this.arity = 7;
+    } catch (Exception e) {
+      LOG.fatal("Could not access TaskInputOutputContext constructor, exiting", e);
+    }
+  }
+
+  private TaskInputOutputContext createInternal(Configuration conf, TaskAttemptID taskAttemptId,
+                                                StatusReporter reporter) {
+    Object[] args = new Object[arity];
+    args[0] = conf;
+    args[1] = taskAttemptId;
+    args[5] = reporter;
+    try {
+      return taskIOConstructor.newInstance(args);
+    } catch (Exception e) {
+      LOG.error("Could not construct a TaskInputOutputContext instance", e);
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/ad253270/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java
index 785f45a..4858d6c 100644
--- a/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java
@@ -39,6 +39,7 @@ public class SparkUnionResultsIT extends CrunchTestSupport implements Serializab
   static class StringLengthMapFn extends MapFn<String, Pair<String, Long>> {
     @Override
     public Pair<String, Long> map(String input) {
+      increment("my", "counter");
       return new Pair<String, Long>(input, 10L);
     }
   }
@@ -93,7 +94,8 @@ public class SparkUnionResultsIT extends CrunchTestSupport implements Serializab
     PTable<String, Long> set2Counts = pipeline.read(At.textFile(inputPath2, Writables.strings())).count();
     PTables.asPTable(set2Counts.union(set1Lengths)).groupByKey().ungroup()
         .write(At.sequenceFile(output, Writables.strings(), Writables.longs()));
-    pipeline.done();
+    PipelineResult res = pipeline.done();
+    assertEquals(4, res.getStageResults().get(0).getCounterValue("my", "counter"));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/crunch/blob/ad253270/crunch-spark/src/main/java/org/apache/crunch/impl/spark/CounterAccumulatorParam.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/CounterAccumulatorParam.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/CounterAccumulatorParam.java
index e1cb5c7..cd2692c 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/CounterAccumulatorParam.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/CounterAccumulatorParam.java
@@ -22,24 +22,35 @@ import org.apache.spark.AccumulatorParam;
 
 import java.util.Map;
 
-public class CounterAccumulatorParam implements AccumulatorParam<Map<String, Long>> {
+public class CounterAccumulatorParam implements AccumulatorParam<Map<String, Map<String, Long>>> {
   @Override
-  public Map<String, Long> addAccumulator(Map<String, Long> current, Map<String, Long> added) {
-    for (Map.Entry<String, Long> e : added.entrySet()) {
-      Long cnt = current.get(e.getKey());
-      cnt = (cnt == null) ? e.getValue() : cnt + e.getValue();
-      current.put(e.getKey(), cnt);
+  public Map<String, Map<String, Long>> addAccumulator(
+      Map<String, Map<String, Long>> current,
+      Map<String, Map<String, Long>> added) {
+    for (Map.Entry<String, Map<String, Long>> e : added.entrySet()) {
+      Map<String, Long> grp = current.get(e.getKey());
+      if (grp == null) {
+        grp = Maps.newTreeMap();
+        current.put(e.getKey(), grp);
+      }
+      for (Map.Entry<String, Long> f : e.getValue().entrySet()) {
+        Long cnt = grp.get(f.getKey());
+        cnt = (cnt == null) ? f.getValue() : cnt + f.getValue();
+        grp.put(f.getKey(), cnt);
+      }
     }
     return current;
   }
 
   @Override
-  public Map<String, Long> addInPlace(Map<String, Long> first, Map<String, Long> second) {
+  public Map<String, Map<String, Long>> addInPlace(
+      Map<String, Map<String, Long>> first,
+      Map<String, Map<String, Long>> second) {
     return addAccumulator(first, second);
   }
 
   @Override
-  public Map<String, Long> zero(Map<String, Long> counts) {
+  public Map<String, Map<String, Long>> zero(Map<String, Map<String, Long>> counts) {
     return Maps.newHashMap();
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/ad253270/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
index 49e1d35..05e6e0c 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
@@ -43,11 +43,17 @@ public class SparkPipeline extends DistributedPipeline {
 
   private final String sparkConnect;
   private JavaSparkContext sparkContext;
+  private Class<?> jarClass;
   private final Map<PCollection<?>, StorageLevel> cachedCollections = Maps.newHashMap();
 
   public SparkPipeline(String sparkConnect, String appName) {
+    this(sparkConnect, appName, null);
+  }
+
+  public SparkPipeline(String sparkConnect, String appName, Class<?> jarClass) {
     super(appName, new Configuration(), new SparkCollectFactory());
     this.sparkConnect = Preconditions.checkNotNull(sparkConnect);
+    this.jarClass = jarClass;
   }
 
   public SparkPipeline(JavaSparkContext sparkContext, String appName) {
@@ -113,6 +119,9 @@ public class SparkPipeline extends DistributedPipeline {
     }
     if (sparkContext == null) {
       this.sparkContext = new JavaSparkContext(sparkConnect, getName());
+      if (jarClass != null) {
+        sparkContext.addJar(JavaSparkContext.jarOfClass(jarClass)[0]);
+      }
     }
     SparkRuntime runtime = new SparkRuntime(this, sparkContext, getConfiguration(), outputTargets, toMaterialize,
         cachedCollections);

http://git-wip-us.apache.org/repos/asf/crunch/blob/ad253270/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
index ecc7023..2016c50 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
@@ -18,10 +18,11 @@
 package org.apache.crunch.impl.spark;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.AbstractFuture;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.CombineFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PipelineExecution;
@@ -39,12 +40,15 @@ import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaRDDLike;
@@ -54,7 +58,6 @@ import org.apache.spark.storage.StorageLevel;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Comparator;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -65,17 +68,19 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class SparkRuntime extends AbstractFuture<PipelineResult> implements PipelineExecution {
 
+  private static final Log LOG = LogFactory.getLog(SparkRuntime.class);
+
   private SparkPipeline pipeline;
   private JavaSparkContext sparkContext;
   private Configuration conf;
   private CombineFn combineFn;
   private SparkRuntimeContext ctxt;
+  private Accumulator<Map<String, Map<String, Long>>> counters;
   private Map<PCollectionImpl<?>, Set<Target>> outputTargets;
   private Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
   private Map<PCollection<?>, StorageLevel> toCache;
   private final CountDownLatch doneSignal = new CountDownLatch(1);
   private AtomicReference<Status> status = new AtomicReference<Status>(Status.READY);
-  private PipelineResult result;
   private boolean started;
   private Thread monitorThread;
 
@@ -103,9 +108,9 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
     this.pipeline = pipeline;
     this.sparkContext = sparkContext;
     this.conf = conf;
-    this.ctxt = new SparkRuntimeContext(
-        sparkContext.broadcast(conf),
-        sparkContext.accumulator(Maps.<String, Long>newHashMap(), new CounterAccumulatorParam()));
+    this.counters = sparkContext.accumulator(Maps.<String, Map<String, Long>>newHashMap(),
+        new CounterAccumulatorParam());
+    this.ctxt = new SparkRuntimeContext(counters);
     this.outputTargets = Maps.newTreeMap(DEPTH_COMPARATOR);
     this.outputTargets.putAll(outputTargets);
     this.toMaterialize = toMaterialize;
@@ -203,13 +208,11 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
     for (PCollectionImpl<?> pcollect : outputTargets.keySet()) {
       targetDeps.put(pcollect, pcollect.getTargetDependencies());
     }
-
     while (!targetDeps.isEmpty() && doneSignal.getCount() > 0) {
       Set<Target> allTargets = Sets.newHashSet();
       for (PCollectionImpl<?> pcollect : targetDeps.keySet()) {
         allTargets.addAll(outputTargets.get(pcollect));
       }
-
       Map<PCollectionImpl<?>, JavaRDDLike<?, ?>> pcolToRdd = Maps.newTreeMap(DEPTH_COMPARATOR);
       for (PCollectionImpl<?> pcollect : targetDeps.keySet()) {
         if (Sets.intersection(allTargets, targetDeps.get(pcollect)).isEmpty()) {
@@ -227,6 +230,7 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
         }
         for (Target t : targets) {
           Configuration conf = new Configuration(getConfiguration());
+          getRuntimeContext().setConf(sparkContext.broadcast(WritableUtils.toByteArray(conf)));
           if (t instanceof MapReduceTarget) { //TODO: check this earlier
             Converter c = t.getConverter(ptype);
             JavaPairRDD<?, ?> outRDD;
@@ -239,7 +243,6 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
                   .map(new PairMapFunction(ptype.getOutputMapFn(), ctxt))
                   .map(new OutputConverterFunction(c));
             }
-
             try {
               Job job = new Job(conf);
               if (t instanceof PathTarget) {
@@ -281,16 +284,26 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
     }
     if (status.get() != Status.FAILED || status.get() != Status.KILLED) {
       status.set(Status.SUCCEEDED);
-      result = new PipelineResult(
-          ImmutableList.of(new PipelineResult.StageResult("Spark", null, start, System.currentTimeMillis())),
-          Status.SUCCEEDED);
-      set(result);
+      set(new PipelineResult(
+          ImmutableList.of(new PipelineResult.StageResult("Spark", getCounters(), start, System.currentTimeMillis())),
+          Status.SUCCEEDED));
     } else {
       set(PipelineResult.EMPTY);
     }
     doneSignal.countDown();
   }
 
+  private Counters getCounters() {
+    Counters c = new Counters();
+    for (Map.Entry<String, Map<String, Long>> e : counters.value().entrySet()) {
+      CounterGroup cg = c.getGroup(e.getKey());
+      for (Map.Entry<String, Long> f : e.getValue().entrySet()) {
+        cg.findCounter(f.getKey()).setValue(f.getValue());
+      }
+    }
+    return c;
+  }
+
   @Override
   public PipelineResult get() throws InterruptedException, ExecutionException {
     if (getStatus() == Status.READY) {
@@ -315,7 +328,12 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
 
   @Override
   public PipelineResult getResult() {
-    return result;
+    try {
+      return get();
+    } catch (Exception e) {
+      LOG.error("Exception retrieving PipelineResult, returning EMPTY", e);
+      return PipelineResult.EMPTY;
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/ad253270/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
index 78436c2..102ad4a 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
@@ -18,19 +18,15 @@
 package org.apache.crunch.impl.spark;
 
 import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
-import javassist.util.proxy.MethodFilter;
-import javassist.util.proxy.MethodHandler;
-import javassist.util.proxy.ProxyFactory;
+import com.google.common.collect.Maps;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.DoFn;
+import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.TaskInputOutputContextFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.mapred.SparkCounter;
 import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.StatusReporter;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
@@ -38,32 +34,35 @@ import org.apache.spark.Accumulator;
 import org.apache.spark.SparkFiles;
 import org.apache.spark.broadcast.Broadcast;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.lang.reflect.Method;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 public class SparkRuntimeContext implements Serializable {
 
-  private Broadcast<Configuration> broadConf;
-  private Accumulator<Map<String, Long>> counters;
+  private Broadcast<byte[]> broadConf;
+  private final Accumulator<Map<String, Map<String, Long>>> counters;
+  private transient Configuration conf;
   private transient TaskInputOutputContext context;
 
-  public SparkRuntimeContext(
-      Broadcast<Configuration> broadConf,
-      Accumulator<Map<String, Long>> counters) {
-    this.broadConf = broadConf;
+  public SparkRuntimeContext(Accumulator<Map<String, Map<String, Long>>> counters) {
     this.counters = counters;
   }
 
+  public void setConf(Broadcast<byte[]> broadConf) {
+    this.broadConf = broadConf;
+  }
+
   public void initialize(DoFn<?, ?> fn) {
     if (context == null) {
       configureLocalFiles();
-      context = getTaskIOContext(broadConf, counters);
+      context = TaskInputOutputContextFactory.create(getConfiguration(), new TaskAttemptID(),
+          new SparkReporter(counters));
     }
     fn.setContext(context);
     fn.initialize();
@@ -76,7 +75,6 @@ public class SparkRuntimeContext implements Serializable {
         List<String> allFiles = Lists.newArrayList();
         for (URI uri : uris) {
           File f = new File(uri.getPath());
-          String sparkFile = SparkFiles.get(f.getName());
           allFiles.add(SparkFiles.get(f.getName()));
         }
         String sparkFiles = Joiner.on(',').join(allFiles);
@@ -90,117 +88,60 @@ public class SparkRuntimeContext implements Serializable {
   }
 
   public Configuration getConfiguration() {
-    return broadConf.value();
+    if (conf == null) {
+      conf = new Configuration();
+      try {
+        ByteArrayInputStream bais = new ByteArrayInputStream(broadConf.value());
+        conf.readFields(new DataInputStream(bais));
+        bais.close();
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading broadcast configuration", e);
+      }
+    }
+    return conf;
   }
 
-  public static TaskInputOutputContext getTaskIOContext(
-      final Broadcast<Configuration> conf,
-      final Accumulator<Map<String, Long>> counters) {
-    ProxyFactory factory = new ProxyFactory();
-    Class<TaskInputOutputContext> superType = TaskInputOutputContext.class;
-    Class[] types = new Class[0];
-    Object[] args = new Object[0];
-    final TaskAttemptID taskAttemptId = new TaskAttemptID();
-    if (superType.isInterface()) {
-      factory.setInterfaces(new Class[] { superType });
-    } else {
-      types = new Class[] { Configuration.class, TaskAttemptID.class, RecordWriter.class, OutputCommitter.class,
-          StatusReporter.class };
-      args = new Object[] { conf.value(), taskAttemptId, null, null, null };
-      factory.setSuperclass(superType);
+  private static class SparkReporter extends StatusReporter implements Serializable {
+
+    Accumulator<Map<String, Map<String, Long>>> accum;
+    private transient Map<String, Map<String, Counter>> counters;
+
+    public SparkReporter(Accumulator<Map<String, Map<String, Long>>> accum) {
+      this.accum = accum;
+      this.counters = Maps.newHashMap();
+    }
+
+    @Override
+    public Counter getCounter(Enum<?> anEnum) {
+      return getCounter(anEnum.getDeclaringClass().toString(), anEnum.name());
     }
 
-    final Set<String> handledMethods = ImmutableSet.of("getConfiguration", "getCounter",
-        "progress", "getTaskAttemptID");
-    factory.setFilter(new MethodFilter() {
-      @Override
-      public boolean isHandled(Method m) {
-        return handledMethods.contains(m.getName());
+    @Override
+    public Counter getCounter(String group, String name) {
+      Map<String, Counter> grp = counters.get(group);
+      if (grp == null) {
+        grp = Maps.newTreeMap();
+        counters.put(group, grp);
       }
-    });
-    MethodHandler handler = new MethodHandler() {
-      @Override
-      public Object invoke(Object arg0, Method m, Method arg2, Object[] args) throws Throwable {
-        String name = m.getName();
-        if ("getConfiguration".equals(name)) {
-          return conf.value();
-        } else if ("progress".equals(name)) {
-          // no-op
-          return null;
-        } else if ("getTaskAttemptID".equals(name)) {
-          return taskAttemptId;
-        } else if ("getCounter".equals(name)){ // getCounter
-          if (args.length == 1) {
-            return getCounter(counters, args[0].getClass().getName(), ((Enum) args[0]).name());
-          } else {
-            return getCounter(counters, (String) args[0], (String) args[1]);
-          }
-        } else {
-          throw new IllegalStateException("Unhandled method " + name);
-        }
+      if (!grp.containsKey(name)) {
+        grp.put(name, new SparkCounter(group, name, accum));
       }
-    };
+      return grp.get(name);
+    }
 
-    try {
-      Object newInstance = factory.create(types, args, handler);
-      return (TaskInputOutputContext<?, ?, ?, ?>) newInstance;
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw new RuntimeException(e);
+    @Override
+    public void progress() {
     }
-  }
 
-  private static Counter getCounter(final Accumulator<Map<String, Long>> accum, final String group,
-                                    final String counterName) {
-    ProxyFactory factory = new ProxyFactory();
-    Class<Counter> superType = Counter.class;
-    Class[] types = new Class[0];
-    Object[] args = new Object[0];
-    if (superType.isInterface()) {
-      factory.setInterfaces(new Class[] { superType });
-    } else {
-      types = new Class[] { String.class, String.class };
-      args = new Object[] { group, counterName };
-      factory.setSuperclass(superType);
+    @Override
+    public float getProgress() {
+      return 0;
     }
 
-    final Set<String> handledMethods = ImmutableSet.of("getDisplayName", "getName",
-        "getValue", "increment", "setValue", "setDisplayName");
-    factory.setFilter(new MethodFilter() {
-      @Override
-      public boolean isHandled(Method m) {
-        return handledMethods.contains(m.getName());
-      }
-    });
-    MethodHandler handler = new MethodHandler() {
-      @Override
-      public Object invoke(Object arg0, Method m, Method arg2, Object[] args) throws Throwable {
-        String name = m.getName();
-        if ("increment".equals(name)) {
-          accum.add(ImmutableMap.of(group + ":" + counterName, (Long) args[0]));
-          return null;
-        } else if ("getDisplayName".equals(name)) {
-          return counterName;
-        } else if ("getName".equals(name)) {
-          return counterName;
-        } else if ("setDisplayName".equals(name)) {
-          // No-op
-          return null;
-        } else if ("setValue".equals(name)) {
-          throw new UnsupportedOperationException("Cannot set counter values in Spark, only increment them");
-        } else if ("getValue".equals(name)) {
-          throw new UnsupportedOperationException("Cannot read counters during Spark execution");
-        } else {
-          throw new IllegalStateException("Unhandled method " + name);
-        }
-      }
-    };
-    try {
-      Object newInstance = factory.create(types, args, handler);
-      return (Counter) newInstance;
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw new RuntimeException(e);
+    @Override
+    public void setStatus(String s) {
+
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/ad253270/crunch-spark/src/main/java/org/apache/hadoop/mapred/SparkCounter.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/hadoop/mapred/SparkCounter.java b/crunch-spark/src/main/java/org/apache/hadoop/mapred/SparkCounter.java
new file mode 100644
index 0000000..4964a55
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/hadoop/mapred/SparkCounter.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.spark.Accumulator;
+
+import java.util.Map;
+
+public class SparkCounter extends Counters.Counter {
+
+  private String group;
+  private String name;
+  private long value = 0;
+  private Accumulator<Map<String, Map<String, Long>>> accum;
+
+  public SparkCounter(String group, String name, Accumulator<Map<String, Map<String, Long>>> accum) {
+    this.group = group;
+    this.name = name;
+    this.accum = accum;
+  }
+
+  public SparkCounter(String group, String name, long value) {
+    this.group = group;
+    this.name = name;
+    this.value = value;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public String getDisplayName() {
+    return name;
+  }
+
+  @Override
+  public long getValue() {
+    return value;
+  }
+
+  @Override
+  public long getCounter() {
+    return getValue();
+  }
+
+  @Override
+  public void increment(long inc) {
+    this.value += inc;
+    accum.add(ImmutableMap.<String, Map<String, Long>>of(group, ImmutableMap.of(name, inc)));
+  }
+
+  @Override
+  public void setValue(long newValue) {
+    long delta = newValue - value;
+    accum.add(ImmutableMap.<String, Map<String, Long>>of(group, ImmutableMap.of(name, delta)));
+    this.value = newValue;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/ad253270/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1dcceb3..9c9c862 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,11 +88,10 @@ under the License.
     <hadoop.version>1.1.2</hadoop.version>
     <hbase.version>0.94.3</hbase.version>
 
-    <!-- Can be overridden by the scala-2.10 profile, but these are the default values -->
-    <scala.base.version>2.9.3</scala.base.version>
-    <scala.version>2.9.3</scala.version>
+    <scala.base.version>2.10</scala.base.version>
+    <scala.version>2.10.4</scala.version>
     <scalatest.version>1.9.1</scalatest.version>
-    <spark.version>0.8.1-incubating</spark.version>
+    <spark.version>0.9.1</spark.version>
   </properties>
 
   <scm>
@@ -532,15 +531,6 @@ under the License.
         </dependencies>
       </dependencyManagement>
     </profile>
-    <profile>
-      <id>scala-2.10</id>
-      <properties>
-        <scala.base.version>2.10</scala.base.version>
-        <scala.version>2.10.3</scala.version>
-        <scalatest.version>2.1.0</scalatest.version>
-        <spark.version>0.9.0-incubating</spark.version>
-      </properties>
-    </profile>
   </profiles>
 
   <build>