You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/01/31 15:21:53 UTC

git commit: CRUNCH-158: Fix DoFn initialization in MemCollection and add support for Counters to the in-memory impl

Updated Branches:
  refs/heads/master b681953c5 -> 0bec4e4e6


CRUNCH-158: Fix DoFn initialization in MemCollection and add support for Counters to the in-memory impl


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/0bec4e4e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/0bec4e4e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/0bec4e4e

Branch: refs/heads/master
Commit: 0bec4e4e6f8bbd7a36c25129ebf2d5686e12616f
Parents: b681953
Author: Josh Wills <jw...@apache.org>
Authored: Wed Jan 30 07:12:10 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Wed Jan 30 07:12:10 2013 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/impl/mem/MemPipeline.java    |    8 ++++-
 .../crunch/impl/mem/collect/MemCollection.java     |   22 +++++++++++---
 2 files changed, 23 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0bec4e4e/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 3e28a0c..95c9e72 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -28,7 +28,6 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.PipelineResult;
 import org.apache.crunch.Source;
-import org.apache.crunch.SourceTarget;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.Target;
 import org.apache.crunch.impl.mem.collect.MemCollection;
@@ -42,6 +41,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counters;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -49,9 +49,13 @@ import com.google.common.collect.Lists;
 public class MemPipeline implements Pipeline {
 
   private static final Log LOG = LogFactory.getLog(MemPipeline.class);
-
+  private static Counters COUNTERS = new Counters();
   private static final MemPipeline INSTANCE = new MemPipeline();
 
+  public static Counters getCounters() {
+    return COUNTERS;
+  }
+  
   public static Pipeline getInstance() {
     return INSTANCE;
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0bec4e4e/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index ffc38ae..defad1b 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -103,7 +103,7 @@ public class MemCollection<S> implements PCollection<S> {
   public <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type,
       ParallelDoOptions options) {
     InMemoryEmitter<T> emitter = new InMemoryEmitter<T>();
-    doFn.initialize();
+    doFn.setContext(getInMemoryContext(getPipeline().getConfiguration()));
     for (S s : collect) {
       doFn.process(s, emitter);
     }
@@ -126,7 +126,6 @@ public class MemCollection<S> implements PCollection<S> {
       ParallelDoOptions options) {
     InMemoryEmitter<Pair<K, V>> emitter = new InMemoryEmitter<Pair<K, V>>();
     doFn.setContext(getInMemoryContext(getPipeline().getConfiguration()));
-    doFn.initialize();
     for (S s : collect) {
       doFn.process(s, emitter);
     }
@@ -250,13 +249,26 @@ public class MemCollection<S> implements PCollection<S> {
     factory.setFilter(new MethodFilter() {
       @Override
       public boolean isHandled(Method m) {
-        return m.getName().equals("getConfiguration");
+        String name = m.getName();
+        return "getConfiguration".equals(name) || "getCounter".equals(name) || "progress".equals(name);
       }
     });
     MethodHandler handler = new MethodHandler() {
       @Override
-      public Object invoke(Object arg0, Method arg1, Method arg2, Object[] arg3) throws Throwable {
-        return conf;
+      public Object invoke(Object arg0, Method m, Method arg2, Object[] args) throws Throwable {
+        String name = m.getName();
+        if ("getConfiguration".equals(name)) {
+          return conf;
+        } else if ("progress".equals(name)) {
+          // no-op
+          return null;
+        } else { // getCounter
+          if (args.length == 1) {
+            return MemPipeline.getCounters().findCounter((Enum<?>) args[0]);
+          } else {
+            return MemPipeline.getCounters().findCounter((String) args[0], (String) args[1]);
+          }
+        }
       }
     };
     try {