You are viewing a plain-text version of this content; the hyperlink to the canonical version is not preserved in this rendering.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/01/31 15:21:53 UTC
git commit: CRUNCH-158: Fix DoFn initialization in MemCollection and
add support for Counters to the in-memory impl
Updated Branches:
refs/heads/master b681953c5 -> 0bec4e4e6
CRUNCH-158: Fix DoFn initialization in MemCollection and add support for Counters to the in-memory impl
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/0bec4e4e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/0bec4e4e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/0bec4e4e
Branch: refs/heads/master
Commit: 0bec4e4e6f8bbd7a36c25129ebf2d5686e12616f
Parents: b681953
Author: Josh Wills <jw...@apache.org>
Authored: Wed Jan 30 07:12:10 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Wed Jan 30 07:12:10 2013 -0800
----------------------------------------------------------------------
.../org/apache/crunch/impl/mem/MemPipeline.java | 8 ++++-
.../crunch/impl/mem/collect/MemCollection.java | 22 +++++++++++---
2 files changed, 23 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0bec4e4e/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 3e28a0c..95c9e72 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -28,7 +28,6 @@ import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.Source;
-import org.apache.crunch.SourceTarget;
import org.apache.crunch.TableSource;
import org.apache.crunch.Target;
import org.apache.crunch.impl.mem.collect.MemCollection;
@@ -42,6 +41,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counters;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -49,9 +49,13 @@ import com.google.common.collect.Lists;
public class MemPipeline implements Pipeline {
private static final Log LOG = LogFactory.getLog(MemPipeline.class);
-
+ private static Counters COUNTERS = new Counters();
private static final MemPipeline INSTANCE = new MemPipeline();
+ public static Counters getCounters() {
+ return COUNTERS;
+ }
+
public static Pipeline getInstance() {
return INSTANCE;
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0bec4e4e/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index ffc38ae..defad1b 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -103,7 +103,7 @@ public class MemCollection<S> implements PCollection<S> {
public <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type,
ParallelDoOptions options) {
InMemoryEmitter<T> emitter = new InMemoryEmitter<T>();
- doFn.initialize();
+ doFn.setContext(getInMemoryContext(getPipeline().getConfiguration()));
for (S s : collect) {
doFn.process(s, emitter);
}
@@ -126,7 +126,6 @@ public class MemCollection<S> implements PCollection<S> {
ParallelDoOptions options) {
InMemoryEmitter<Pair<K, V>> emitter = new InMemoryEmitter<Pair<K, V>>();
doFn.setContext(getInMemoryContext(getPipeline().getConfiguration()));
- doFn.initialize();
for (S s : collect) {
doFn.process(s, emitter);
}
@@ -250,13 +249,26 @@ public class MemCollection<S> implements PCollection<S> {
factory.setFilter(new MethodFilter() {
@Override
public boolean isHandled(Method m) {
- return m.getName().equals("getConfiguration");
+ String name = m.getName();
+ return "getConfiguration".equals(name) || "getCounter".equals(name) || "progress".equals(name);
}
});
MethodHandler handler = new MethodHandler() {
@Override
- public Object invoke(Object arg0, Method arg1, Method arg2, Object[] arg3) throws Throwable {
- return conf;
+ public Object invoke(Object arg0, Method m, Method arg2, Object[] args) throws Throwable {
+ String name = m.getName();
+ if ("getConfiguration".equals(name)) {
+ return conf;
+ } else if ("progress".equals(name)) {
+ // no-op
+ return null;
+ } else { // getCounter
+ if (args.length == 1) {
+ return MemPipeline.getCounters().findCounter((Enum<?>) args[0]);
+ } else {
+ return MemPipeline.getCounters().findCounter((String) args[0], (String) args[1]);
+ }
+ }
}
};
try {