Posted to commits@crunch.apache.org by jw...@apache.org on 2014/05/01 18:53:03 UTC
git commit: CRUNCH-384: Upgrade Spark to 0.9.1 and Scala to 2.10; fix a bunch of things, so that counters and standalone distributed Spark jobs work.
Repository: crunch
Updated Branches:
refs/heads/apache-crunch-0.8 b12945f7c -> ad2532703
CRUNCH-384: Upgrade Spark to 0.9.1 and Scala to 2.10; fix a bunch of things,
so that counters and standalone distributed Spark jobs work.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ad253270
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ad253270
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ad253270
Branch: refs/heads/apache-crunch-0.8
Commit: ad25327031317acc05daf6c2ea55da0ab7d13a03
Parents: b12945f
Author: Josh Wills <jw...@apache.org>
Authored: Thu Apr 24 17:56:05 2014 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Thu May 1 09:09:58 2014 -0700
----------------------------------------------------------------------
.../TaskInputOutputContextFactory.java | 76 ++++++++
.../org/apache/crunch/SparkUnionResultsIT.java | 4 +-
.../impl/spark/CounterAccumulatorParam.java | 27 ++-
.../apache/crunch/impl/spark/SparkPipeline.java | 9 +
.../apache/crunch/impl/spark/SparkRuntime.java | 48 +++--
.../crunch/impl/spark/SparkRuntimeContext.java | 175 ++++++-------------
.../org/apache/hadoop/mapred/SparkCounter.java | 76 ++++++++
pom.xml | 16 +-
8 files changed, 277 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
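
Before the diff itself, a minimal usage sketch of what this commit enables (not part of the patch; class names and paths are illustrative, and it assumes the driver class was loaded from a jar): the new three-argument SparkPipeline constructor ships that jar to a standalone cluster, and counters incremented inside a DoFn become readable from the PipelineResult, as the SparkUnionResultsIT change below verifies.

import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.spark.SparkPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.io.To;
import org.apache.crunch.types.writable.Writables;

public class WordLengths {
  static class LengthFn extends MapFn<String, Integer> {
    @Override
    public Integer map(String input) {
      increment("my", "counter"); // now backed by a Spark Accumulator
      return input.length();
    }
  }

  public static void main(String[] args) {
    // The third argument tells Crunch which jar to ship to the workers
    // via JavaSparkContext.addJar (see the SparkPipeline change below).
    Pipeline pipeline = new SparkPipeline("spark://master:7077", "word-lengths", WordLengths.class);
    PCollection<String> lines = pipeline.read(From.textFile("/tmp/input"));
    lines.parallelDo(new LengthFn(), Writables.ints()).write(To.textFile("/tmp/output"));
    PipelineResult res = pipeline.done();
    // All counters are aggregated into the single "Spark" stage result.
    System.out.println(res.getStageResults().get(0).getCounterValue("my", "counter"));
  }
}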
http://git-wip-us.apache.org/repos/asf/crunch/blob/ad253270/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/TaskInputOutputContextFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/TaskInputOutputContextFactory.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/TaskInputOutputContextFactory.java
new file mode 100644
index 0000000..1aa65b3
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/TaskInputOutputContextFactory.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import java.lang.reflect.Constructor;
+
+public class TaskInputOutputContextFactory {
+ private static final Log LOG = LogFactory.getLog(TaskInputOutputContextFactory.class);
+
+ private static final TaskInputOutputContextFactory INSTANCE = new TaskInputOutputContextFactory();
+
+ public static TaskInputOutputContext create(
+ Configuration conf,
+ TaskAttemptID taskAttemptId,
+ StatusReporter reporter) {
+ return INSTANCE.createInternal(conf, taskAttemptId, reporter);
+ }
+
+ private Constructor<? extends TaskInputOutputContext> taskIOConstructor;
+ private int arity;
+
+ private TaskInputOutputContextFactory() {
+ String ic = TaskInputOutputContext.class.isInterface() ?
+ "org.apache.hadoop.mapreduce.task.MapContextImpl" :
+ "org.apache.hadoop.mapreduce.MapContext";
+ try {
+ Class<? extends TaskInputOutputContext> implClass = (Class<? extends TaskInputOutputContext>) Class.forName(ic);
+ this.taskIOConstructor = (Constructor<? extends TaskInputOutputContext>) implClass.getConstructor(
+ Configuration.class, TaskAttemptID.class, RecordReader.class, RecordWriter.class,
+ OutputCommitter.class, StatusReporter.class, InputSplit.class);
+ this.arity = 7;
+ } catch (Exception e) {
+ LOG.fatal("Could not access TaskInputOutputContext constructor, exiting", e);
+ }
+ }
+
+ private TaskInputOutputContext createInternal(Configuration conf, TaskAttemptID taskAttemptId,
+ StatusReporter reporter) {
+ Object[] args = new Object[arity];
+ args[0] = conf;
+ args[1] = taskAttemptId;
+ args[5] = reporter;
+ try {
+ return taskIOConstructor.newInstance(args);
+ } catch (Exception e) {
+ LOG.error("Could not construct a TaskInputOutputContext instance", e);
+ throw new RuntimeException(e);
+ }
+ }
+}
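
The factory above exists because TaskInputOutputContext is an abstract class in Hadoop 1 but an interface in Hadoop 2, so the implementation class (MapContext vs. MapContextImpl) is chosen by reflection; both happen to share the same seven-argument constructor, of which only the Configuration, TaskAttemptID, and StatusReporter slots are filled. A standalone sketch of calling it (illustrative; the no-op reporter stands in for the SparkReporter added later in this patch):

import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.TaskInputOutputContextFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;

public class ContextFactoryDemo {
  public static void main(String[] args) {
    StatusReporter noOp = new StatusReporter() {
      @Override public Counter getCounter(Enum<?> name) { return null; }
      @Override public Counter getCounter(String group, String name) { return null; }
      @Override public void progress() { }
      @Override public float getProgress() { return 0; }
      @Override public void setStatus(String status) { }
    };
    TaskInputOutputContext ctx =
        TaskInputOutputContextFactory.create(new Configuration(), new TaskAttemptID(), noOp);
    System.out.println(ctx.getTaskAttemptID());
  }
}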
http://git-wip-us.apache.org/repos/asf/crunch/blob/ad253270/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java
index 785f45a..4858d6c 100644
--- a/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java
@@ -39,6 +39,7 @@ public class SparkUnionResultsIT extends CrunchTestSupport implements Serializab
static class StringLengthMapFn extends MapFn<String, Pair<String, Long>> {
@Override
public Pair<String, Long> map(String input) {
+ increment("my", "counter");
return new Pair<String, Long>(input, 10L);
}
}
@@ -93,7 +94,8 @@ public class SparkUnionResultsIT extends CrunchTestSupport implements Serializab
PTable<String, Long> set2Counts = pipeline.read(At.textFile(inputPath2, Writables.strings())).count();
PTables.asPTable(set2Counts.union(set1Lengths)).groupByKey().ungroup()
.write(At.sequenceFile(output, Writables.strings(), Writables.longs()));
- pipeline.done();
+ PipelineResult res = pipeline.done();
+ assertEquals(4, res.getStageResults().get(0).getCounterValue("my", "counter"));
}
@Test
http://git-wip-us.apache.org/repos/asf/crunch/blob/ad253270/crunch-spark/src/main/java/org/apache/crunch/impl/spark/CounterAccumulatorParam.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/CounterAccumulatorParam.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/CounterAccumulatorParam.java
index e1cb5c7..cd2692c 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/CounterAccumulatorParam.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/CounterAccumulatorParam.java
@@ -22,24 +22,35 @@ import org.apache.spark.AccumulatorParam;
import java.util.Map;
-public class CounterAccumulatorParam implements AccumulatorParam<Map<String, Long>> {
+public class CounterAccumulatorParam implements AccumulatorParam<Map<String, Map<String, Long>>> {
@Override
- public Map<String, Long> addAccumulator(Map<String, Long> current, Map<String, Long> added) {
- for (Map.Entry<String, Long> e : added.entrySet()) {
- Long cnt = current.get(e.getKey());
- cnt = (cnt == null) ? e.getValue() : cnt + e.getValue();
- current.put(e.getKey(), cnt);
+ public Map<String, Map<String, Long>> addAccumulator(
+ Map<String, Map<String, Long>> current,
+ Map<String, Map<String, Long>> added) {
+ for (Map.Entry<String, Map<String, Long>> e : added.entrySet()) {
+ Map<String, Long> grp = current.get(e.getKey());
+ if (grp == null) {
+ grp = Maps.newTreeMap();
+ current.put(e.getKey(), grp);
+ }
+ for (Map.Entry<String, Long> f : e.getValue().entrySet()) {
+ Long cnt = grp.get(f.getKey());
+ cnt = (cnt == null) ? f.getValue() : cnt + f.getValue();
+ grp.put(f.getKey(), cnt);
+ }
}
return current;
}
@Override
- public Map<String, Long> addInPlace(Map<String, Long> first, Map<String, Long> second) {
+ public Map<String, Map<String, Long>> addInPlace(
+ Map<String, Map<String, Long>> first,
+ Map<String, Map<String, Long>> second) {
return addAccumulator(first, second);
}
@Override
- public Map<String, Long> zero(Map<String, Long> counts) {
+ public Map<String, Map<String, Long>> zero(Map<String, Map<String, Long>> counts) {
return Maps.newHashMap();
}
}
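
The accumulator payload changes from a flat "group:name" map to a two-level group -> name -> count map, preserving Hadoop's counter-group structure. A quick sketch of the merge semantics (hypothetical MergeDemo class, not part of the patch):

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.crunch.impl.spark.CounterAccumulatorParam;
import java.util.Map;

public class MergeDemo {
  public static void main(String[] args) {
    Map<String, Map<String, Long>> current = Maps.newHashMap();
    current.put("my", Maps.newHashMap(ImmutableMap.of("counter", 3L)));
    Map<String, Map<String, Long>> added = Maps.newHashMap();
    added.put("my", Maps.newHashMap(ImmutableMap.of("counter", 2L)));
    // Leaf values for matching group/name pairs are summed; current is
    // mutated in place and returned.
    Map<String, Map<String, Long>> merged =
        new CounterAccumulatorParam().addAccumulator(current, added);
    System.out.println(merged); // {my={counter=5}}
  }
}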
http://git-wip-us.apache.org/repos/asf/crunch/blob/ad253270/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
index 49e1d35..05e6e0c 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
@@ -43,11 +43,17 @@ public class SparkPipeline extends DistributedPipeline {
private final String sparkConnect;
private JavaSparkContext sparkContext;
+ private Class<?> jarClass;
private final Map<PCollection<?>, StorageLevel> cachedCollections = Maps.newHashMap();
public SparkPipeline(String sparkConnect, String appName) {
+ this(sparkConnect, appName, null);
+ }
+
+ public SparkPipeline(String sparkConnect, String appName, Class<?> jarClass) {
super(appName, new Configuration(), new SparkCollectFactory());
this.sparkConnect = Preconditions.checkNotNull(sparkConnect);
+ this.jarClass = jarClass;
}
public SparkPipeline(JavaSparkContext sparkContext, String appName) {
@@ -113,6 +119,9 @@ public class SparkPipeline extends DistributedPipeline {
}
if (sparkContext == null) {
this.sparkContext = new JavaSparkContext(sparkConnect, getName());
+ if (jarClass != null) {
+ sparkContext.addJar(JavaSparkContext.jarOfClass(jarClass)[0]);
+ }
}
SparkRuntime runtime = new SparkRuntime(this, sparkContext, getConfiguration(), outputTargets, toMaterialize,
cachedCollections);
http://git-wip-us.apache.org/repos/asf/crunch/blob/ad253270/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
index ecc7023..2016c50 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
@@ -18,10 +18,11 @@
package org.apache.crunch.impl.spark;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractFuture;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.crunch.CombineFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PipelineExecution;
@@ -39,12 +40,15 @@ import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaRDDLike;
@@ -54,7 +58,6 @@ import org.apache.spark.storage.StorageLevel;
import java.io.IOException;
import java.net.URI;
import java.util.Comparator;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -65,17 +68,19 @@ import java.util.concurrent.atomic.AtomicReference;
public class SparkRuntime extends AbstractFuture<PipelineResult> implements PipelineExecution {
+ private static final Log LOG = LogFactory.getLog(SparkRuntime.class);
+
private SparkPipeline pipeline;
private JavaSparkContext sparkContext;
private Configuration conf;
private CombineFn combineFn;
private SparkRuntimeContext ctxt;
+ private Accumulator<Map<String, Map<String, Long>>> counters;
private Map<PCollectionImpl<?>, Set<Target>> outputTargets;
private Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
private Map<PCollection<?>, StorageLevel> toCache;
private final CountDownLatch doneSignal = new CountDownLatch(1);
private AtomicReference<Status> status = new AtomicReference<Status>(Status.READY);
- private PipelineResult result;
private boolean started;
private Thread monitorThread;
@@ -103,9 +108,9 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
this.pipeline = pipeline;
this.sparkContext = sparkContext;
this.conf = conf;
- this.ctxt = new SparkRuntimeContext(
- sparkContext.broadcast(conf),
- sparkContext.accumulator(Maps.<String, Long>newHashMap(), new CounterAccumulatorParam()));
+ this.counters = sparkContext.accumulator(Maps.<String, Map<String, Long>>newHashMap(),
+ new CounterAccumulatorParam());
+ this.ctxt = new SparkRuntimeContext(counters);
this.outputTargets = Maps.newTreeMap(DEPTH_COMPARATOR);
this.outputTargets.putAll(outputTargets);
this.toMaterialize = toMaterialize;
@@ -203,13 +208,11 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
for (PCollectionImpl<?> pcollect : outputTargets.keySet()) {
targetDeps.put(pcollect, pcollect.getTargetDependencies());
}
-
while (!targetDeps.isEmpty() && doneSignal.getCount() > 0) {
Set<Target> allTargets = Sets.newHashSet();
for (PCollectionImpl<?> pcollect : targetDeps.keySet()) {
allTargets.addAll(outputTargets.get(pcollect));
}
-
Map<PCollectionImpl<?>, JavaRDDLike<?, ?>> pcolToRdd = Maps.newTreeMap(DEPTH_COMPARATOR);
for (PCollectionImpl<?> pcollect : targetDeps.keySet()) {
if (Sets.intersection(allTargets, targetDeps.get(pcollect)).isEmpty()) {
@@ -227,6 +230,7 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
}
for (Target t : targets) {
Configuration conf = new Configuration(getConfiguration());
+ getRuntimeContext().setConf(sparkContext.broadcast(WritableUtils.toByteArray(conf)));
if (t instanceof MapReduceTarget) { //TODO: check this earlier
Converter c = t.getConverter(ptype);
JavaPairRDD<?, ?> outRDD;
@@ -239,7 +243,6 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
.map(new PairMapFunction(ptype.getOutputMapFn(), ctxt))
.map(new OutputConverterFunction(c));
}
-
try {
Job job = new Job(conf);
if (t instanceof PathTarget) {
@@ -281,16 +284,26 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
}
if (status.get() != Status.FAILED && status.get() != Status.KILLED) {
status.set(Status.SUCCEEDED);
- result = new PipelineResult(
- ImmutableList.of(new PipelineResult.StageResult("Spark", null, start, System.currentTimeMillis())),
- Status.SUCCEEDED);
- set(result);
+ set(new PipelineResult(
+ ImmutableList.of(new PipelineResult.StageResult("Spark", getCounters(), start, System.currentTimeMillis())),
+ Status.SUCCEEDED));
} else {
set(PipelineResult.EMPTY);
}
doneSignal.countDown();
}
+ private Counters getCounters() {
+ Counters c = new Counters();
+ for (Map.Entry<String, Map<String, Long>> e : counters.value().entrySet()) {
+ CounterGroup cg = c.getGroup(e.getKey());
+ for (Map.Entry<String, Long> f : e.getValue().entrySet()) {
+ cg.findCounter(f.getKey()).setValue(f.getValue());
+ }
+ }
+ return c;
+ }
+
@Override
public PipelineResult get() throws InterruptedException, ExecutionException {
if (getStatus() == Status.READY) {
@@ -315,7 +328,12 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
@Override
public PipelineResult getResult() {
- return result;
+ try {
+ return get();
+ } catch (Exception e) {
+ LOG.error("Exception retrieving PipelineResult, returning EMPTY", e);
+ return PipelineResult.EMPTY;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/crunch/blob/ad253270/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
index 78436c2..102ad4a 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
@@ -18,19 +18,15 @@
package org.apache.crunch.impl.spark;
import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
-import javassist.util.proxy.MethodFilter;
-import javassist.util.proxy.MethodHandler;
-import javassist.util.proxy.ProxyFactory;
+import com.google.common.collect.Maps;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.DoFn;
+import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.TaskInputOutputContextFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.mapred.SparkCounter;
import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
@@ -38,32 +34,35 @@ import org.apache.spark.Accumulator;
import org.apache.spark.SparkFiles;
import org.apache.spark.broadcast.Broadcast;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
-import java.lang.reflect.Method;
import java.net.URI;
import java.util.List;
import java.util.Map;
-import java.util.Set;
public class SparkRuntimeContext implements Serializable {
- private Broadcast<Configuration> broadConf;
- private Accumulator<Map<String, Long>> counters;
+ private Broadcast<byte[]> broadConf;
+ private final Accumulator<Map<String, Map<String, Long>>> counters;
+ private transient Configuration conf;
private transient TaskInputOutputContext context;
- public SparkRuntimeContext(
- Broadcast<Configuration> broadConf,
- Accumulator<Map<String, Long>> counters) {
- this.broadConf = broadConf;
+ public SparkRuntimeContext(Accumulator<Map<String, Map<String, Long>>> counters) {
this.counters = counters;
}
+ public void setConf(Broadcast<byte[]> broadConf) {
+ this.broadConf = broadConf;
+ }
+
public void initialize(DoFn<?, ?> fn) {
if (context == null) {
configureLocalFiles();
- context = getTaskIOContext(broadConf, counters);
+ context = TaskInputOutputContextFactory.create(getConfiguration(), new TaskAttemptID(),
+ new SparkReporter(counters));
}
fn.setContext(context);
fn.initialize();
@@ -76,7 +75,6 @@ public class SparkRuntimeContext implements Serializable {
List<String> allFiles = Lists.newArrayList();
for (URI uri : uris) {
File f = new File(uri.getPath());
- String sparkFile = SparkFiles.get(f.getName());
allFiles.add(SparkFiles.get(f.getName()));
}
String sparkFiles = Joiner.on(',').join(allFiles);
@@ -90,117 +88,60 @@ public class SparkRuntimeContext implements Serializable {
}
public Configuration getConfiguration() {
- return broadConf.value();
+ if (conf == null) {
+ conf = new Configuration();
+ try {
+ ByteArrayInputStream bais = new ByteArrayInputStream(broadConf.value());
+ conf.readFields(new DataInputStream(bais));
+ bais.close();
+ } catch (Exception e) {
+ throw new RuntimeException("Error reading broadcast configuration", e);
+ }
+ }
+ return conf;
}
- public static TaskInputOutputContext getTaskIOContext(
- final Broadcast<Configuration> conf,
- final Accumulator<Map<String, Long>> counters) {
- ProxyFactory factory = new ProxyFactory();
- Class<TaskInputOutputContext> superType = TaskInputOutputContext.class;
- Class[] types = new Class[0];
- Object[] args = new Object[0];
- final TaskAttemptID taskAttemptId = new TaskAttemptID();
- if (superType.isInterface()) {
- factory.setInterfaces(new Class[] { superType });
- } else {
- types = new Class[] { Configuration.class, TaskAttemptID.class, RecordWriter.class, OutputCommitter.class,
- StatusReporter.class };
- args = new Object[] { conf.value(), taskAttemptId, null, null, null };
- factory.setSuperclass(superType);
+ private static class SparkReporter extends StatusReporter implements Serializable {
+
+ Accumulator<Map<String, Map<String, Long>>> accum;
+ private transient Map<String, Map<String, Counter>> counters;
+
+ public SparkReporter(Accumulator<Map<String, Map<String, Long>>> accum) {
+ this.accum = accum;
+ this.counters = Maps.newHashMap();
+ }
+
+ @Override
+ public Counter getCounter(Enum<?> anEnum) {
+ return getCounter(anEnum.getDeclaringClass().toString(), anEnum.name());
}
- final Set<String> handledMethods = ImmutableSet.of("getConfiguration", "getCounter",
- "progress", "getTaskAttemptID");
- factory.setFilter(new MethodFilter() {
- @Override
- public boolean isHandled(Method m) {
- return handledMethods.contains(m.getName());
+ @Override
+ public Counter getCounter(String group, String name) {
+ if (counters == null) {
+ counters = Maps.newHashMap(); // transient cache; rebuild after deserialization
+ }
+ Map<String, Counter> grp = counters.get(group);
+ if (grp == null) {
+ grp = Maps.newTreeMap();
+ counters.put(group, grp);
}
- });
- MethodHandler handler = new MethodHandler() {
- @Override
- public Object invoke(Object arg0, Method m, Method arg2, Object[] args) throws Throwable {
- String name = m.getName();
- if ("getConfiguration".equals(name)) {
- return conf.value();
- } else if ("progress".equals(name)) {
- // no-op
- return null;
- } else if ("getTaskAttemptID".equals(name)) {
- return taskAttemptId;
- } else if ("getCounter".equals(name)){ // getCounter
- if (args.length == 1) {
- return getCounter(counters, args[0].getClass().getName(), ((Enum) args[0]).name());
- } else {
- return getCounter(counters, (String) args[0], (String) args[1]);
- }
- } else {
- throw new IllegalStateException("Unhandled method " + name);
- }
+ if (!grp.containsKey(name)) {
+ grp.put(name, new SparkCounter(group, name, accum));
}
- };
+ return grp.get(name);
+ }
- try {
- Object newInstance = factory.create(types, args, handler);
- return (TaskInputOutputContext<?, ?, ?, ?>) newInstance;
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
+ @Override
+ public void progress() {
}
- }
- private static Counter getCounter(final Accumulator<Map<String, Long>> accum, final String group,
- final String counterName) {
- ProxyFactory factory = new ProxyFactory();
- Class<Counter> superType = Counter.class;
- Class[] types = new Class[0];
- Object[] args = new Object[0];
- if (superType.isInterface()) {
- factory.setInterfaces(new Class[] { superType });
- } else {
- types = new Class[] { String.class, String.class };
- args = new Object[] { group, counterName };
- factory.setSuperclass(superType);
+ @Override
+ public float getProgress() {
+ return 0;
}
- final Set<String> handledMethods = ImmutableSet.of("getDisplayName", "getName",
- "getValue", "increment", "setValue", "setDisplayName");
- factory.setFilter(new MethodFilter() {
- @Override
- public boolean isHandled(Method m) {
- return handledMethods.contains(m.getName());
- }
- });
- MethodHandler handler = new MethodHandler() {
- @Override
- public Object invoke(Object arg0, Method m, Method arg2, Object[] args) throws Throwable {
- String name = m.getName();
- if ("increment".equals(name)) {
- accum.add(ImmutableMap.of(group + ":" + counterName, (Long) args[0]));
- return null;
- } else if ("getDisplayName".equals(name)) {
- return counterName;
- } else if ("getName".equals(name)) {
- return counterName;
- } else if ("setDisplayName".equals(name)) {
- // No-op
- return null;
- } else if ("setValue".equals(name)) {
- throw new UnsupportedOperationException("Cannot set counter values in Spark, only increment them");
- } else if ("getValue".equals(name)) {
- throw new UnsupportedOperationException("Cannot read counters during Spark execution");
- } else {
- throw new IllegalStateException("Unhandled method " + name);
- }
- }
- };
- try {
- Object newInstance = factory.create(types, args, handler);
- return (Counter) newInstance;
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
+ @Override
+ public void setStatus(String s) {
+
}
}
+
}
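
Hadoop's Configuration is not java.io.Serializable, so broadcasting the object directly breaks under Spark's Java serialization once a job leaves local mode; the context now receives the conf as Writable bytes and rehydrates it on first use. A round-trip sketch of those two code paths (the key name is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableUtils;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;

public class ConfRoundTrip {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("crunch.demo.key", "value");
    // Driver side: SparkRuntime broadcasts these bytes per target.
    byte[] bytes = WritableUtils.toByteArray(conf);
    // Executor side: SparkRuntimeContext.getConfiguration() rebuilds the conf.
    Configuration copy = new Configuration();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
    System.out.println(copy.get("crunch.demo.key")); // value
  }
}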
http://git-wip-us.apache.org/repos/asf/crunch/blob/ad253270/crunch-spark/src/main/java/org/apache/hadoop/mapred/SparkCounter.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/hadoop/mapred/SparkCounter.java b/crunch-spark/src/main/java/org/apache/hadoop/mapred/SparkCounter.java
new file mode 100644
index 0000000..4964a55
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/hadoop/mapred/SparkCounter.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.spark.Accumulator;
+
+import java.util.Map;
+
+public class SparkCounter extends Counters.Counter {
+
+ private String group;
+ private String name;
+ private long value = 0;
+ private Accumulator<Map<String, Map<String, Long>>> accum;
+
+ public SparkCounter(String group, String name, Accumulator<Map<String, Map<String, Long>>> accum) {
+ this.group = group;
+ this.name = name;
+ this.accum = accum;
+ }
+
+ public SparkCounter(String group, String name, long value) {
+ this.group = group;
+ this.name = name;
+ this.value = value;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String getDisplayName() {
+ return name;
+ }
+
+ @Override
+ public long getValue() {
+ return value;
+ }
+
+ @Override
+ public long getCounter() {
+ return getValue();
+ }
+
+ @Override
+ public void increment(long inc) {
+ this.value += inc;
+ accum.add(ImmutableMap.<String, Map<String, Long>>of(group, ImmutableMap.of(name, inc)));
+ }
+
+ @Override
+ public void setValue(long newValue) {
+ long delta = newValue - value;
+ accum.add(ImmutableMap.<String, Map<String, Long>>of(group, ImmutableMap.of(name, delta)));
+ this.value = newValue;
+ }
+}
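
Each increment publishes a single-entry delta map that CounterAccumulatorParam folds into the driver-side totals, and setValue is likewise expressed as a delta so that updates from concurrent tasks compose. A driver-only sketch of that flow (hypothetical CounterFlowDemo, local master):

import com.google.common.collect.Maps;
import org.apache.crunch.impl.spark.CounterAccumulatorParam;
import org.apache.hadoop.mapred.SparkCounter;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Map;

public class CounterFlowDemo {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "counter-demo");
    Accumulator<Map<String, Map<String, Long>>> accum = sc.accumulator(
        Maps.<String, Map<String, Long>>newHashMap(), new CounterAccumulatorParam());
    SparkCounter counter = new SparkCounter("my", "counter", accum);
    counter.increment(2); // accumulator receives {my={counter=2}}
    counter.setValue(5);  // only the delta of 3 is published
    System.out.println(accum.value()); // {my={counter=5}}
    sc.stop();
  }
}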
http://git-wip-us.apache.org/repos/asf/crunch/blob/ad253270/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1dcceb3..9c9c862 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,11 +88,10 @@ under the License.
<hadoop.version>1.1.2</hadoop.version>
<hbase.version>0.94.3</hbase.version>
- <!-- Can be overridden by the scala-2.10 profile, but these are the default values -->
- <scala.base.version>2.9.3</scala.base.version>
- <scala.version>2.9.3</scala.version>
+ <scala.base.version>2.10</scala.base.version>
+ <scala.version>2.10.4</scala.version>
<scalatest.version>1.9.1</scalatest.version>
- <spark.version>0.8.1-incubating</spark.version>
+ <spark.version>0.9.1</spark.version>
</properties>
<scm>
@@ -532,15 +531,6 @@ under the License.
</dependencies>
</dependencyManagement>
</profile>
- <profile>
- <id>scala-2.10</id>
- <properties>
- <scala.base.version>2.10</scala.base.version>
- <scala.version>2.10.3</scala.version>
- <scalatest.version>2.1.0</scalatest.version>
- <spark.version>0.9.0-incubating</spark.version>
- </properties>
- </profile>
</profiles>
<build>