You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/11/02 03:02:29 UTC
git commit: CRUNCH-292: Hack around job counter limits in Hadoop-2
for in-memory pipelines
Updated Branches:
refs/heads/master 9a689d1ed -> a7002c117
CRUNCH-292: Hack around job counter limits in Hadoop-2 for in-memory pipelines
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/a7002c11
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/a7002c11
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/a7002c11
Branch: refs/heads/master
Commit: a7002c1178ae9e8dd522d029c96d80ba7d616df8
Parents: 9a689d1
Author: Josh Wills <jw...@apache.org>
Authored: Fri Nov 1 10:04:21 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Nov 1 13:50:12 2013 -0700
----------------------------------------------------------------------
.../apache/crunch/impl/mem/CountersWrapper.java | 127 +++++++++++++++++++
.../org/apache/crunch/impl/mem/MemPipeline.java | 4 +-
.../apache/crunch/impl/mem/CountersTest.java | 54 ++++++++
3 files changed, 183 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/a7002c11/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java
new file mode 100644
index 0000000..7fe893c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/CountersWrapper.java
@@ -0,0 +1,127 @@
+/**
+ * Copyright (c) 2013, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.crunch.impl.mem;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+
+import javax.annotation.Nullable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A {@link Counters} implementation for in-memory pipelines that hacks around the job
+ * counter limits in Hadoop-2 (CRUNCH-292).
+ *
+ * <p>Counters are created in an "active" delegate {@code Counters} instance. When the
+ * delegate refuses to create another counter, a fresh delegate is started and the counter
+ * is created there instead. Every counter handed out is remembered in a lookup cache so
+ * repeated lookups return the same {@link Counter} regardless of which delegate owns it.
+ *
+ * <p>All methods that touch the shared state synchronize on this instance.
+ */
+class CountersWrapper extends Counters {
+
+  /** The delegate that currently receives newly created counters. */
+  private Counters active;
+
+  /** Every counter handed out so far, keyed by group name and then by counter name. */
+  private final Map<String, Map<String, Counter>> lookupCache = Maps.newHashMap();
+
+  /**
+   * Every delegate created so far, including {@link #active}.
+   *
+   * <p>Identity-based on purpose: the delegates are mutated after insertion, and Hadoop-2's
+   * {@code Counters} appears to define content-based {@code equals}/{@code hashCode}
+   * (NOTE(review): confirm for the Hadoop version in use), which a regular hash set must
+   * not be keyed on.
+   */
+  private final Set<Counters> allCounters = Sets.newIdentityHashSet();
+
+  /** Creates a wrapper that starts with a single empty delegate. */
+  CountersWrapper() {
+    this.active = new Counters();
+    allCounters.add(active);
+  }
+
+  /**
+   * Creates a wrapper whose first delegate is built from the given old-API counters.
+   *
+   * @param counters the {@code org.apache.hadoop.mapred.Counters} to start from
+   */
+  CountersWrapper(org.apache.hadoop.mapred.Counters counters) {
+    this.active = new Counters(counters);
+    allCounters.add(active);
+  }
+
+  /**
+   * Returns the counter with the given group and name, creating it on first use.
+   *
+   * <p>Synchronized like the other accessors because it mutates the lookup cache and may
+   * replace the active delegate.
+   *
+   * @param groupName the counter's group name
+   * @param counterName the counter's name within the group
+   * @return the cached counter, or a newly created one
+   */
+  @Override
+  public synchronized Counter findCounter(String groupName, String counterName) {
+    Map<String, Counter> group = lookupCache.get(groupName);
+    if (group == null) {
+      group = Maps.newHashMap();
+      lookupCache.put(groupName, group);
+    }
+    Counter counter = group.get(counterName);
+    if (counter == null) {
+      try {
+        counter = active.findCounter(groupName, counterName);
+      } catch (Exception e) {
+        // Recover from this by creating a new active instance. The catch is deliberately
+        // broad: presumably the exception thrown when a limit is hit differs between
+        // Hadoop versions -- NOTE(review): confirm before narrowing.
+        active = new Counters();
+        allCounters.add(active);
+        counter = active.findCounter(groupName, counterName);
+      }
+      group.put(counterName, counter);
+    }
+    return counter;
+  }
+
+  /**
+   * Returns the counter for an enum key, using the enum's class name as the group name and
+   * the constant's name as the counter name.
+   */
+  @Override
+  public synchronized Counter findCounter(Enum<?> key) {
+    return findCounter(key.getClass().getName(), key.name());
+  }
+
+  /**
+   * Returns the names of all groups for which a counter has been looked up.
+   *
+   * <p>The result is a live view of the lookup cache's key set, not a snapshot.
+   */
+  @Override
+  public synchronized Collection<String> getGroupNames() {
+    return lookupCache.keySet();
+  }
+
+  /**
+   * Iterates over the counter groups of every delegate, one delegate after another. A group
+   * name may therefore show up more than once when its counters span several delegates.
+   */
+  @Override
+  public Iterator<CounterGroup> iterator() {
+    return Iterators.concat(Iterables.transform(allCounters, new Function<Counters, Iterator<CounterGroup>>() {
+      @Override
+      public Iterator<CounterGroup> apply(@Nullable Counters input) {
+        return input.iterator();
+      }
+    }).iterator());
+  }
+
+  /**
+   * Returns the named group from the active delegate.
+   *
+   * @throws UnsupportedOperationException if more than one delegate exists, because the
+   *     group's counters may then be spread over several delegates
+   */
+  @Override
+  public synchronized CounterGroup getGroup(String groupName) {
+    if (allCounters.size() == 1) {
+      return active.getGroup(groupName);
+    } else {
+      throw new UnsupportedOperationException(
+          "CountersWrapper cannot return CounterGroup when there are too many Counters");
+    }
+  }
+
+  // NOTE(review): write, readFields and incrAllCounters carry no @Override in the original;
+  // presumably their signatures do not override a superclass method on every supported
+  // Hadoop version. Confirm before adding the annotation.
+
+  /**
+   * Serialization is not supported.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  public synchronized void write(DataOutput out) throws IOException {
+    throw new UnsupportedOperationException("CountersWrapper may not be written");
+  }
+
+  /**
+   * Deserialization is not supported.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  public synchronized void readFields(DataInput in) throws IOException {
+    throw new UnsupportedOperationException("CountersWrapper may not be read");
+  }
+
+  /** Returns the total number of counters held by all delegates. */
+  @Override
+  public synchronized int countCounters() {
+    int total = 0;
+    for (Counters delegate : allCounters) {
+      total += delegate.countCounters();
+    }
+    return total;
+  }
+
+  /**
+   * Adds the value of every counter in {@code other} to the counter with the same group
+   * and name in this wrapper, creating counters as needed.
+   *
+   * @param other the counters whose values are added to this wrapper
+   */
+  public synchronized void incrAllCounters(Counters other) {
+    for (CounterGroup cg : other) {
+      for (Counter c : cg) {
+        findCounter(cg.getName(), c.getName()).increment(c.getValue());
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/a7002c11/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index cc9ad69..5c0f6b0 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -67,7 +67,7 @@ import com.google.common.collect.Sets;
public class MemPipeline implements Pipeline {
private static final Log LOG = LogFactory.getLog(MemPipeline.class);
- private static Counters COUNTERS = new Counters();
+ private static Counters COUNTERS = new CountersWrapper();
private static final MemPipeline INSTANCE = new MemPipeline();
private int outputIndex = 0;
@@ -77,7 +77,7 @@ public class MemPipeline implements Pipeline {
}
public static void clearCounters() {
- COUNTERS = new Counters();
+ COUNTERS = new CountersWrapper();
}
public static Pipeline getInstance() {
http://git-wip-us.apache.org/repos/asf/crunch/blob/a7002c11/crunch-core/src/test/java/org/apache/crunch/impl/mem/CountersTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mem/CountersTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mem/CountersTest.java
new file mode 100644
index 0000000..458ecc7
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mem/CountersTest.java
@@ -0,0 +1,54 @@
+/**
+ * Copyright (c) 2013, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.crunch.impl.mem;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mem.collect.MemCollection;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+/**
+ * Verifies that an in-memory pipeline can create and increment a large number of distinct
+ * counters, the scenario that job counter limits in Hadoop-2 used to break (CRUNCH-292).
+ */
+public class CountersTest {
+
+  /** Counter group used by the test. */
+  private static final String GROUP = "testCounter";
+
+  /** Number of distinct counters created by the test. */
+  private static final int NUM_COUNTERS = 200;
+
+  @Test
+  public void counterTest() throws Exception {
+    // MemPipeline keeps its counters in static state; reset it so that counts left behind
+    // by other tests cannot influence the assertions below.
+    MemPipeline.clearCounters();
+    Pipeline pipeline = MemPipeline.getInstance();
+
+    // Single row PCollection.
+    PCollection<String> objects = MemPipeline.collectionOf(Arrays.asList("hello world"));
+
+    // Counter creating Map: every input row bumps each of the NUM_COUNTERS counters once.
+    PCollection<String> objects2 = objects.parallelDo("Create counters",
+        new MapFn<String, String>() {
+          @Override
+          public String map(String input) {
+            for (int i = 0; i < NUM_COUNTERS; ++i) {
+              this.increment(GROUP, String.valueOf(i));
+            }
+            return input;
+          }
+        },
+        Writables.strings()
+    );
+
+    // Run it!
+    pipeline.done();
+
+    // The single input row must pass through the map function unchanged.
+    MemCollection<String> mapped = (MemCollection<String>) objects2;
+    assertEquals(1, mapped.getCollection().size());
+    assertEquals("hello world", mapped.getCollection().iterator().next());
+
+    // Each counter was incremented exactly once, for the one input row.
+    for (int i = 0; i < NUM_COUNTERS; ++i) {
+      assertEquals(1L, MemPipeline.getCounters().findCounter(GROUP, String.valueOf(i)).getValue());
+    }
+  }
+}