You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/11/02 03:02:29 UTC

git commit: CRUNCH-292: Hack around job counter limits in Hadoop-2 for in-memory pipelines

Updated Branches:
  refs/heads/master 9a689d1ed -> a7002c117


CRUNCH-292: Hack around job counter limits in Hadoop-2 for in-memory pipelines


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/a7002c11
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/a7002c11
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/a7002c11

Branch: refs/heads/master
Commit: a7002c1178ae9e8dd522d029c96d80ba7d616df8
Parents: 9a689d1
Author: Josh Wills <jw...@apache.org>
Authored: Fri Nov 1 10:04:21 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Nov 1 13:50:12 2013 -0700

----------------------------------------------------------------------
 .../apache/crunch/impl/mem/CountersWrapper.java | 127 +++++++++++++++++++
 .../org/apache/crunch/impl/mem/MemPipeline.java |   4 +-
 .../apache/crunch/impl/mem/CountersTest.java    |  54 ++++++++
 3 files changed, 183 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/a7002c11/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java
new file mode 100644
index 0000000..7fe893c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java
@@ -0,0 +1,127 @@
+/**
+ * Copyright (c) 2013, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.crunch.impl.mem;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+
+import javax.annotation.Nullable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A {@link Counters} implementation that spreads its counters over one or more
+ * delegate {@code Counters} instances.
+ *
+ * <p>Lookups go to the currently active delegate. If that delegate throws while
+ * resolving a counter (per the originating change, this is how Hadoop-2 reports
+ * that its job counter limits were exceeded), a fresh delegate is created, made
+ * active, and the lookup is retried there. Every counter handed out is
+ * remembered in a per-group cache so repeated lookups return the same
+ * {@link Counter} instance regardless of which delegate owns it.
+ *
+ * <p>This class cannot be serialized via {@link #write(DataOutput)} or
+ * {@link #readFields(DataInput)}.
+ */
+class CountersWrapper extends Counters {
+
+  /** Delegate that currently receives lookups for counters not yet cached. */
+  private Counters active;
+
+  /** Group name -> (counter name -> counter) for every counter resolved through this wrapper. */
+  private final Map<String, Map<String, Counter>> lookupCache = Maps.newHashMap();
+
+  /** Every delegate ever created by this wrapper; iteration order is unspecified. */
+  private final Set<Counters> allCounters = Sets.newHashSet();
+
+  /** Creates a wrapper backed by a single, empty delegate. */
+  CountersWrapper() {
+    this.active = new Counters();
+    allCounters.add(active);
+  }
+
+  /**
+   * Creates a wrapper whose first delegate is built from the given old-API counters.
+   *
+   * @param counters the {@code org.apache.hadoop.mapred.Counters} used to build the first delegate
+   */
+  CountersWrapper(org.apache.hadoop.mapred.Counters counters) {
+    this.active = new Counters(counters);
+    allCounters.add(active);
+  }
+
+  /**
+   * Returns the counter with the given group and name, creating it if necessary.
+   *
+   * <p>Synchronized like the other state-touching methods of this class, because it
+   * mutates the lookup cache and may replace the active delegate.
+   *
+   * @param groupName the counter group name
+   * @param counterName the counter name within the group
+   * @return the cached counter, or a newly resolved one from the active delegate
+   */
+  @Override
+  public synchronized Counter findCounter(String groupName, String counterName) {
+    Map<String, Counter> c = lookupCache.get(groupName);
+    if (c == null) {
+      c = Maps.newHashMap();
+      lookupCache.put(groupName, c);
+    }
+    Counter counter = c.get(counterName);
+    if (counter == null) {
+      try {
+        counter = active.findCounter(groupName, counterName);
+      } catch (Exception e) {
+        // Deliberately broad: any failure of the active delegate is treated as
+        // "delegate is full". Recover by switching to a new active instance.
+        active = new Counters();
+        allCounters.add(active);
+        counter = active.findCounter(groupName, counterName);
+      }
+      c.put(counterName, counter);
+    }
+    return counter;
+  }
+
+  /**
+   * Returns the counter for the given enum constant.
+   *
+   * <p>The group name is the name of the enum's declaring class, so constants that
+   * define their own class body still map to the enum type's name rather than to
+   * the name of a synthetic subclass.
+   *
+   * @param key the enum constant identifying the counter
+   * @return the counter named {@code key.name()} in the group named after the enum type
+   */
+  @Override
+  public synchronized Counter findCounter(Enum<?> key) {
+    return findCounter(key.getDeclaringClass().getName(), key.name());
+  }
+
+  /**
+   * Returns the names of all groups for which a counter has been looked up
+   * through this wrapper. The returned collection is a live view of the cache keys.
+   */
+  @Override
+  public synchronized Collection<String> getGroupNames() {
+    return lookupCache.keySet();
+  }
+
+  /** Returns an iterator over the counter groups of every delegate, one delegate after another. */
+  @Override
+  public Iterator<CounterGroup> iterator() {
+    return Iterators.concat(Iterables.transform(allCounters, new Function<Counters, Iterator<CounterGroup>>() {
+      @Override
+      public Iterator<CounterGroup> apply(@Nullable Counters input) {
+        return input.iterator();
+      }
+    }).iterator());
+  }
+
+  /**
+   * Returns the named group from the active delegate.
+   *
+   * @param groupName the counter group name
+   * @return the group as reported by the single delegate
+   * @throws UnsupportedOperationException if more than one delegate exists, since a
+   *     group may then be split across delegates
+   */
+  @Override
+  public synchronized CounterGroup getGroup(String groupName) {
+    if (allCounters.size() == 1) {
+      return active.getGroup(groupName);
+    } else {
+      throw new UnsupportedOperationException(
+          "CounterWrapper cannot return CounterGroup when there are too many Counters");
+    }
+  }
+
+  /**
+   * Not supported.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  public synchronized void write(DataOutput out) throws IOException {
+    throw new UnsupportedOperationException("CountersWrapper may not be written");
+  }
+
+  /**
+   * Not supported.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  public synchronized void readFields(DataInput in) throws IOException {
+    throw new UnsupportedOperationException("CountersWrapper may not be read");
+  }
+
+  /** Returns the sum of {@code countCounters()} over all delegates. */
+  @Override
+  public synchronized int countCounters() {
+    int cntrs = 0;
+    for (Counters c : allCounters) {
+      cntrs += c.countCounters();
+    }
+    return cntrs;
+  }
+
+  /**
+   * Adds the value of every counter in {@code other} to the same-named counter in
+   * this wrapper, creating counters here as needed.
+   *
+   * @param other the counters whose values are added
+   */
+  public synchronized void incrAllCounters(Counters other) {
+    for (CounterGroup cg : other) {
+      for (Counter c : cg) {
+        findCounter(cg.getName(), c.getName()).increment(c.getValue());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a7002c11/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index cc9ad69..5c0f6b0 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -67,7 +67,7 @@ import com.google.common.collect.Sets;
 public class MemPipeline implements Pipeline {
 
   private static final Log LOG = LogFactory.getLog(MemPipeline.class);
-  private static Counters COUNTERS = new Counters();
+  private static Counters COUNTERS = new CountersWrapper();
   private static final MemPipeline INSTANCE = new MemPipeline();
 
   private int outputIndex = 0;
@@ -77,7 +77,7 @@ public class MemPipeline implements Pipeline {
   }
   
   public static void clearCounters() {
-    COUNTERS = new Counters();
+    COUNTERS = new CountersWrapper();
   }
 
   public static Pipeline getInstance() {

http://git-wip-us.apache.org/repos/asf/crunch/blob/a7002c11/crunch-core/src/test/java/org/apache/crunch/impl/mem/CountersTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mem/CountersTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mem/CountersTest.java
new file mode 100644
index 0000000..458ecc7
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mem/CountersTest.java
@@ -0,0 +1,54 @@
+/**
+ * Copyright (c) 2013, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.crunch.impl.mem;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mem.collect.MemCollection;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+public class CountersTest {
+
+  @Test
+  public void counterTest() throws Exception {
+    Pipeline pipeline = MemPipeline.getInstance();
+
+    // Single row PCollection.
+    PCollection<String> objects = MemPipeline.collectionOf(Arrays.asList(new String[]{"hello world"}));
+    System.out.println("Objects: " + ((MemCollection) objects).getCollection());
+
+    // Counter creating Map.
+    PCollection<String> objects2 = objects.parallelDo("Create counters",
+        new MapFn<String, String>() {
+          @Override
+          public String map(String input) {
+            for(int i = 0; i < 200; ++i) {
+              this.increment("testCounter", String.valueOf(i));
+            }
+            return input;
+          }
+        },
+        Writables.strings()
+    );
+
+    // Run it!
+    pipeline.done();
+    System.out.println("Objects2: " + ((MemCollection) objects2).getCollection());
+  }
+}