You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 06:54:44 UTC
svn commit: r1079197 [2/2] - in /hadoop/mapreduce/branches/yahoo-merge/src:
java/ java/org/apache/hadoop/mapred/ java/org/apache/hadoop/mapreduce/
java/org/apache/hadoop/mapreduce/counters/
java/org/apache/hadoop/mapreduce/jobhistory/ java/org/apache/h...
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java?rev=1079197&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java Tue Mar 8 05:54:43 2011
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.counters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.util.ResourceBundles;
+
+/**
+ * An abstract class to provide common implementation of the
+ * generic counter group in both mapred and mapreduce package.
+ *
+ * @param <T> type of the counter for the group
+ */
+@InterfaceAudience.Private
+public abstract class AbstractCounterGroup<T extends Counter>
+ implements CounterGroupBase<T> {
+
+ private final String name;
+ private String displayName;
+ private final Map<String, T> counters = Maps.newTreeMap();
+ private final Limits limits;
+
+ public AbstractCounterGroup(String name, String displayName,
+ Limits limits) {
+ this.name = name;
+ this.displayName = displayName;
+ this.limits = limits;
+ }
+
+ @Override
+ public synchronized String getName() {
+ return name;
+ }
+
+ @Override
+ public synchronized String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public synchronized void setDisplayName(String displayName) {
+ this.displayName = displayName;
+ }
+
+ @Override
+ public synchronized void addCounter(T counter) {
+ counters.put(counter.getName(), counter);
+ limits.incrCounters();
+ }
+
+ @Override
+ public synchronized T addCounter(String counterName, String displayName,
+ long value) {
+ String saveName = limits.filterCounterName(counterName);
+ T counter = findCounterImpl(saveName, false);
+ if (counter == null) {
+ return addCounterImpl(saveName, displayName, value);
+ }
+ counter.setValue(value);
+ return counter;
+ }
+
+ private T addCounterImpl(String name, String displayName, long value) {
+ T counter = newCounter(name, displayName, value);
+ addCounter(counter);
+ return counter;
+ }
+
+ @Override
+ public T findCounter(String counterName, String displayName) {
+ String saveName = limits.filterCounterName(counterName);
+ T counter = findCounterImpl(saveName, false);
+ if (counter == null) {
+ return addCounterImpl(saveName, displayName, 0);
+ }
+ return counter;
+ }
+
+ @Override
+ public synchronized T findCounter(String counterName, boolean create) {
+ return findCounterImpl(limits.filterCounterName(counterName), create);
+ }
+
+ private T findCounterImpl(String counterName, boolean create) {
+ T counter = counters.get(counterName);
+ if (counter == null && create) {
+ String localized =
+ ResourceBundles.getCounterName(getName(), counterName, counterName);
+ return addCounterImpl(counterName, localized, 0);
+ }
+ return counter;
+ }
+
+ @Override
+ public T findCounter(String counterName) {
+ return findCounter(counterName, true);
+ }
+
+ /**
+ * Abstract factory method to create a new counter of type T
+ * @param counterName of the counter
+ * @param displayName of the counter
+ * @param value of the counter
+ * @return a new counter
+ */
+ protected abstract T newCounter(String counterName, String displayName,
+ long value);
+
+ /**
+ * Abstract factory method to create a new counter of type T
+ * @return a new counter object
+ */
+ protected abstract T newCounter();
+
+ @Override
+ public synchronized Iterator<T> iterator() {
+ return counters.values().iterator();
+ }
+
+ /**
+ * GenericGroup ::= displayName #counter counter*
+ */
+ @Override
+ public synchronized void write(DataOutput out) throws IOException {
+ Text.writeString(out, displayName);
+ WritableUtils.writeVInt(out, counters.size());
+ for(Counter counter: counters.values()) {
+ counter.write(out);
+ }
+ }
+
+ @Override
+ public synchronized void readFields(DataInput in) throws IOException {
+ displayName = Text.readString(in);
+ counters.clear();
+ int size = WritableUtils.readVInt(in);
+ for (int i = 0; i < size; i++) {
+ T counter = newCounter();
+ counter.readFields(in);
+ counters.put(counter.getName(), counter);
+ limits.incrCounters();
+ }
+ }
+
+ @Override
+ public synchronized int size() {
+ return counters.size();
+ }
+
+ @Override
+ public synchronized boolean equals(Object genericRight) {
+ if (genericRight instanceof CounterGroupBase<?>) {
+ @SuppressWarnings("unchecked")
+ CounterGroupBase<T> right = (CounterGroupBase<T>) genericRight;
+ return Iterators.elementsEqual(iterator(), right.iterator());
+ }
+ return false;
+ }
+
+ @Override
+ public synchronized int hashCode() {
+ return counters.hashCode();
+ }
+
+ @Override
+ public void incrAllCounters(CounterGroupBase<T> rightGroup) {
+ try {
+ for (Counter right : rightGroup) {
+ Counter left = findCounter(right.getName(), right.getDisplayName());
+ left.increment(right.getValue());
+ }
+ } catch (LimitExceededException e) {
+ counters.clear();
+ throw e;
+ }
+ }
+}
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java?rev=1079197&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java Tue Mar 8 05:54:43 2011
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.counters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import static org.apache.hadoop.mapreduce.counters.CounterGroupFactory.*;
+
+/**
+ * An abstract class to provide common implementation for the Counters
+ * container in both mapred and mapreduce packages.
+ *
+ * @param <C> type of counter inside the counters
+ * @param <G> type of group inside the counters
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public abstract class AbstractCounters<C extends Counter,
+ G extends CounterGroupBase<C>>
+ implements Writable, Iterable<G> {
+
+ protected static final Log LOG = LogFactory.getLog("mapreduce.Counters");
+
+ /**
+ * A cache from enum values to the associated counter.
+ */
+ private Map<Enum<?>, C> cache = Maps.newIdentityHashMap();
+ private Map<String, G> fgroups = Maps.newTreeMap(); // framework & fs groups
+ private Map<String, G> groups = Maps.newTreeMap(); // other groups
+ private final CounterGroupFactory<C, G> groupFactory;
+
+ // For framework counter serialization without strings
+ enum GroupType { FRAMEWORK, FILESYSTEM };
+
+ // Writes only framework and fs counters if false.
+ private boolean writeAllCounters = true;
+
+ private static final Map<String, String> legacyMap = Maps.newHashMap();
+ static {
+ legacyMap.put("org.apache.hadoop.mapred.Task$Counter",
+ TaskCounter.class.getName());
+ legacyMap.put("org.apache.hadoop.mapred.JobInProgress$Counter",
+ JobCounter.class.getName());
+ }
+
+ @InterfaceAudience.Private
+ public AbstractCounters(CounterGroupFactory<C, G> gf) {
+ groupFactory = gf;
+ }
+
+ /**
+ * Construct from another counters object.
+ * @param <C1> type of the other counter
+ * @param <G1> type of the other counter group
+ * @param counters the counters object to copy
+ * @param groupFactory the factory for new groups
+ */
+ @InterfaceAudience.Private
+ public <C1 extends Counter, G1 extends CounterGroupBase<C1>>
+ AbstractCounters(AbstractCounters<C1, G1> counters,
+ CounterGroupFactory<C, G> groupFactory) {
+ this.groupFactory = groupFactory;
+ for(G1 group: counters) {
+ String name = group.getName();
+ G newGroup = groupFactory.newGroup(name, group.getDisplayName());
+ (isFrameworkGroup(name) ? fgroups : groups).put(name, newGroup);
+ for(Counter counter: group) {
+ newGroup.addCounter(counter.getName(), counter.getDisplayName(),
+ counter.getValue());
+ }
+ }
+ }
+
+ /** Add a group.
+ * @param group object to add
+ * @return the group
+ */
+ @InterfaceAudience.Private
+ public synchronized G addGroup(G group) {
+ if (isFrameworkGroup(group)) {
+ fgroups.put(group.getName(), group);
+ } else {
+ groupFactory.limits().checkGroups(groups.size() + 1);
+ groups.put(group.getName(), group);
+ }
+ return group;
+ }
+
+ /**
+ * Add a new group
+ * @param name of the group
+ * @param displayName of the group
+ * @return the group
+ */
+ @InterfaceAudience.Private
+ public G addGroup(String name, String displayName) {
+ return addGroup(groupFactory.newGroup(name, displayName));
+ }
+
+ /**
+ * Find a counter, create one if necessary
+ * @param groupName of the counter
+ * @param counterName name of the counter
+ * @return the matching counter
+ */
+ public C findCounter(String groupName, String counterName) {
+ G grp = getGroup(groupName);
+ return grp.findCounter(counterName);
+ }
+
+ /**
+ * Find the counter for the given enum. The same enum will always return the
+ * same counter.
+ * @param key the counter key
+ * @return the matching counter object
+ */
+ public synchronized C findCounter(Enum<?> key) {
+ C counter = cache.get(key);
+ if (counter == null) {
+ counter = findCounter(key.getDeclaringClass().getName(), key.name());
+ cache.put(key, counter);
+ }
+ return counter;
+ }
+
+ /**
+ * Find the file system counter for the given scheme and enum.
+ * @param scheme of the file system
+ * @param key the enum of the counter
+ * @return the file system counter
+ */
+ @InterfaceAudience.Private
+ public synchronized C findCounter(String scheme, FileSystemCounter key) {
+ return ((FileSystemCounterGroup<C>) getGroup(
+ FileSystemCounter.class.getName())).findCounter(scheme, key);
+ }
+
+ /**
+ * Returns the names of all counter classes.
+ * @return Set of counter names.
+ */
+ public synchronized Iterable<String> getGroupNames() {
+ return Iterables.concat(fgroups.keySet(), groups.keySet());
+ }
+
+ @Override
+ public Iterator<G> iterator() {
+ return Iterators.concat(fgroups.values().iterator(),
+ groups.values().iterator());
+ }
+
+ /**
+ * Returns the named counter group, or an empty group if there is none
+ * with the specified name.
+ * @param groupName name of the group
+ * @return the group
+ */
+ public synchronized G getGroup(String groupName) {
+ boolean isFGroup = isFrameworkGroup(groupName);
+ G group = isFGroup ? fgroups.get(groupName) : groups.get(groupName);
+ if (group == null) {
+ group = groupFactory.newGroup(filterGroupName(groupName));
+ if (isFGroup) {
+ fgroups.put(groupName, group);
+ } else {
+ groupFactory.limits().checkGroups(groups.size() + 1);
+ groups.put(groupName, group);
+ }
+ }
+ return group;
+ }
+
+ private String filterGroupName(String oldName) {
+ String newName = legacyMap.get(oldName);
+ if (newName == null) {
+ return groupFactory.limits().filterGroupName(oldName);
+ }
+ LOG.warn("Group "+ oldName +" is deprecated. Use "+ newName +" instead");
+ return newName;
+ }
+
+ /**
+ * Returns the total number of counters, by summing the number of counters
+ * in each group.
+ * @return the total number of counters
+ */
+ public synchronized int countCounters() {
+ int result = 0;
+ for (G group : this) {
+ result += group.size();
+ }
+ return result;
+ }
+
+ /**
+ * Write the set of groups.
+ * Counters ::= version #fgroups (groupId, group)* #groups (group)*
+ */
+ @Override
+ public synchronized void write(DataOutput out) throws IOException {
+ WritableUtils.writeVInt(out, groupFactory.version());
+ WritableUtils.writeVInt(out, fgroups.size()); // framework groups first
+ for (G group : fgroups.values()) {
+ if (group instanceof FrameworkCounterGroup<?, ?>) {
+ WritableUtils.writeVInt(out, GroupType.FRAMEWORK.ordinal());
+ WritableUtils.writeVInt(out, getFrameworkGroupId(group.getName()));
+ group.write(out);
+ } else if (group instanceof FileSystemCounterGroup<?>) {
+ WritableUtils.writeVInt(out, GroupType.FILESYSTEM.ordinal());
+ group.write(out);
+ }
+ }
+ if (writeAllCounters) {
+ WritableUtils.writeVInt(out, groups.size());
+ for (G group : groups.values()) {
+ Text.writeString(out, group.getName());
+ group.write(out);
+ }
+ } else {
+ WritableUtils.writeVInt(out, 0);
+ }
+ }
+
+ @Override
+ public synchronized void readFields(DataInput in) throws IOException {
+ int version = WritableUtils.readVInt(in);
+ if (version != groupFactory.version()) {
+ throw new IOException("Counters version mismatch, expected "+
+ groupFactory.version() +" got "+ version);
+ }
+ int numFGroups = WritableUtils.readVInt(in);
+ fgroups.clear();
+ GroupType[] groupTypes = GroupType.values();
+ while (numFGroups-- > 0) {
+ GroupType groupType = groupTypes[WritableUtils.readVInt(in)];
+ G group;
+ switch (groupType) {
+ case FILESYSTEM: // with nothing
+ group = groupFactory.newFileSystemGroup();
+ break;
+ case FRAMEWORK: // with group id
+ group = groupFactory.newFrameworkGroup(WritableUtils.readVInt(in));
+ break;
+ default: // Silence dumb compiler, as it would've thrown earlier
+ throw new IOException("Unexpected counter group type: "+ groupType);
+ }
+ group.readFields(in);
+ fgroups.put(group.getName(), group);
+ }
+ int numGroups = WritableUtils.readVInt(in);
+ while (numGroups-- > 0) {
+ groupFactory.limits().checkGroups(groups.size() + 1);
+ G group = groupFactory.newGenericGroup(Text.readString(in), null,
+ groupFactory.limits());
+ group.readFields(in);
+ groups.put(group.getName(), group);
+ }
+ }
+
+ /**
+ * Return textual representation of the counter values.
+ * @return the string
+ */
+ @Override
+ public synchronized String toString() {
+ StringBuilder sb = new StringBuilder("Counters: " + countCounters());
+ for (G group: this) {
+ sb.append("\n\t").append(group.getDisplayName());
+ for (Counter counter: group) {
+ sb.append("\n\t\t").append(counter.getDisplayName()).append("=")
+ .append(counter.getValue());
+ }
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Increments multiple counters by their amounts in another Counters
+ * instance.
+ * @param other the other Counters instance
+ */
+ public synchronized void incrAllCounters(AbstractCounters<C, G> other) {
+ for(G right : other) {
+ G left = groups.get(right.getName());
+ if (left == null) {
+ groupFactory.limits().checkGroups(groups.size() + 1);
+ left = groupFactory.newGroup(right.getName(), right.getDisplayName());
+ groups.put(right.getName(), left);
+ }
+ left.incrAllCounters(right);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public boolean equals(Object genericRight) {
+ if (genericRight instanceof AbstractCounters<?, ?>) {
+ return Iterators.elementsEqual(iterator(),
+ ((AbstractCounters<C, G>)genericRight).iterator());
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return groups.hashCode();
+ }
+
+ /**
+ * Set the "writeAllCounters" option to true or false
+ * @param send if true all counters would be serialized, otherwise only
+ * framework counters would be serialized in
+ * {@link #write(DataOutput)}
+ */
+ @InterfaceAudience.Private
+ public void setWriteAllCounters(boolean send) {
+ writeAllCounters = send;
+ }
+
+ /**
+ * Get the "writeAllCounters" option
+ * @return true of all counters would serialized
+ */
+ @InterfaceAudience.Private
+ public boolean getWriteAllCounters() {
+ return writeAllCounters;
+ }
+
+ @InterfaceAudience.Private
+ public Limits limits() {
+ return groupFactory.limits();
+ }
+}
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java?rev=1079197&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/CounterGroupBase.java Tue Mar 8 05:54:43 2011
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.counters;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Counter;
+
+/**
+ * The common counter group interface.
+ *
+ * @param <T> type of the counter for the group
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface CounterGroupBase<T extends Counter>
+ extends Writable, Iterable<T> {
+
+ /**
+ * Get the internal name of the group
+ * @return the internal name
+ */
+ String getName();
+
+ /**
+ * Get the display name of the group.
+ * @return the human readable name
+ */
+ String getDisplayName();
+
+ /**
+ * Set the display name of the group
+ * @param displayName of the group
+ */
+ void setDisplayName(String displayName);
+
+ /** Add a counter to this group.
+ * @param counter to add
+ */
+ void addCounter(T counter);
+
+ /**
+ * Add a counter to this group
+ * @param name of the counter
+ * @param displayName of the counter
+ * @param value of the counter
+ * @return the counter
+ */
+ T addCounter(String name, String displayName, long value);
+
+ /**
+ * Find a counter in the group.
+ * @param counterName the name of the counter
+ * @param displayName the display name of the counter
+ * @return the counter that was found or added
+ */
+ T findCounter(String counterName, String displayName);
+
+ /**
+ * Find a counter in the group
+ * @param counterName the name of the counter
+ * @param create create the counter if not found if true
+ * @return the counter that was found or added or null if create is false
+ */
+ T findCounter(String counterName, boolean create);
+
+ /**
+ * Find a counter in the group.
+ * @param counterName the name of the counter
+ * @return the counter that was found or added
+ */
+ T findCounter(String counterName);
+
+ /**
+ * @return the number of counters in this group.
+ */
+ int size();
+
+ /**
+ * Increment all counters by a group of counters
+ * @param rightGroup the group to be added to this group
+ */
+ void incrAllCounters(CounterGroupBase<T> rightGroup);
+}
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/CounterGroupFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/CounterGroupFactory.java?rev=1079197&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/CounterGroupFactory.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/CounterGroupFactory.java Tue Mar 8 05:54:43 2011
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.counters;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.util.ResourceBundles;
+
+/**
+ * An abstract class to provide common implementation of the
+ * group factory in both mapred and mapreduce packages.
+ *
+ * @param <C> type of the counter
+ * @param <G> type of the group
+ */
+@InterfaceAudience.Private
+public abstract class CounterGroupFactory<C extends Counter,
+ G extends CounterGroupBase<C>> {
+
+ public interface FrameworkGroupFactory<F> {
+ F newGroup(String name);
+ }
+
+ // Integer mapping (for serialization) for framework groups
+ private static final Map<String, Integer> s2i = Maps.newHashMap();
+ private static final List<String> i2s = Lists.newArrayList();
+ private static final int VERSION = 1;
+
+ private final Limits limits = new Limits();
+
+ private final Map<String, FrameworkGroupFactory<G>> fmap = Maps.newHashMap();
+ {
+ // Add builtin counter class here and the version when changed.
+ addFrameworkGroup(TaskCounter.class);
+ addFrameworkGroup(JobCounter.class);
+ }
+
+ // Initialize the framework counter group mapping
+ private synchronized <T extends Enum<T>>
+ void addFrameworkGroup(final Class<T> cls) {
+ updateFrameworkGroupMapping(cls);
+ fmap.put(cls.getName(), newFrameworkGroupFactory(cls));
+ }
+
+ // Update static mappings (c2i, i2s) of framework groups
+ private static synchronized void updateFrameworkGroupMapping(Class<?> cls) {
+ String name = cls.getName();
+ Integer i = s2i.get(name);
+ if (i != null) return;
+ i2s.add(name);
+ s2i.put(name, i2s.size() - 1);
+ }
+
+ /**
+ * Required override to return a new framework group factory
+ * @param <T> type of the counter enum class
+ * @param cls the counter enum class
+ * @return a new framework group factory
+ */
+ protected abstract <T extends Enum<T>>
+ FrameworkGroupFactory<G> newFrameworkGroupFactory(Class<T> cls);
+
+ /**
+ * Create a new counter group
+ * @param name of the group
+ * @return a new counter group
+ */
+ public G newGroup(String name) {
+ return newGroup(name, ResourceBundles.getCounterGroupName(name, name));
+ }
+
+ /**
+ * Create a new counter group
+ * @param name of the group
+ * @param displayName of the group
+ * @return a new counter group
+ */
+ public G newGroup(String name, String displayName) {
+ FrameworkGroupFactory<G> gf = fmap.get(name);
+ if (gf != null) return gf.newGroup(name);
+ if (name.equals(FileSystemCounter.class.getName())) {
+ return newFileSystemGroup();
+ }
+ return newGenericGroup(name, displayName, limits);
+ }
+
+ /**
+ * Create a new framework group
+ * @param id of the group
+ * @return a new framework group
+ */
+ public G newFrameworkGroup(int id) {
+ String name;
+ synchronized(CounterGroupFactory.class) {
+ if (id < 0 || id >= i2s.size()) throwBadFrameGroupIdException(id);
+ name = i2s.get(id); // should not throw here.
+ }
+ FrameworkGroupFactory<G> gf = fmap.get(name);
+ if (gf == null) throwBadFrameGroupIdException(id);
+ return gf.newGroup(name);
+ }
+
+ /**
+ * Get the id of a framework group
+ * @param name of the group
+ * @return the framework group id
+ */
+ public static synchronized int getFrameworkGroupId(String name) {
+ Integer i = s2i.get(name);
+ if (i == null) throwBadFrameworkGroupNameException(name);
+ return i;
+ }
+
+ /**
+ * @return the counter factory version
+ */
+ public int version() {
+ return VERSION;
+ }
+
+ /**
+ * Check whether a group name is a name of a framework group
+ * @param name to check
+ * @return true for framework group names
+ */
+ public static synchronized boolean isFrameworkGroup(String name) {
+ return s2i.get(name) != null;
+ }
+
+ /**
+ * Check whether a group object is name of a framework group
+ * @param group object to check
+ * @return true for framework groups
+ */
+ public static boolean isFrameworkGroup(CounterGroupBase<?> group) {
+ return isFrameworkGroup(group.getName());
+ }
+
+ private static void throwBadFrameGroupIdException(int id) {
+ throw new IllegalArgumentException("bad framework group id: "+ id);
+ }
+
+ private static void throwBadFrameworkGroupNameException(String name) {
+ throw new IllegalArgumentException("bad framework group name: "+ name);
+ }
+
+ /**
+ * Abstract factory method to create a generic (vs framework) counter group
+ * @param name of the group
+ * @param displayName of the group
+ * @param limits limits of the counters
+ * @return a new generic counter group
+ */
+ protected abstract G newGenericGroup(String name, String displayName,
+ Limits limits);
+
+ /**
+ * Abstract factory method to create a file system counter group
+ * @return a new file system counter group
+ */
+ protected abstract G newFileSystemGroup();
+
+ @InterfaceAudience.Private
+ public Limits limits() {
+ return limits;
+ }
+}
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java?rev=1079197&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java Tue Mar 8 05:54:43 2011
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.counters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentMap;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+
+import com.google.common.base.Joiner;
+import static com.google.common.base.Preconditions.*;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
+import org.apache.hadoop.mapreduce.util.ResourceBundles;
+
+/**
+ * An abstract class to provide common implementation of the filesystem
+ * counter group in both mapred and mapreduce packages.
+ *
+ * @param <C> the type of the Counter for the group
+ */
+@InterfaceAudience.Private
+public abstract class FileSystemCounterGroup<C extends Counter>
+ implements CounterGroupBase<C> {
+
+ static final int MAX_NUM_SCHEMES = 100; // intern/sanity check
+ static final ConcurrentMap<String, String> schemes = Maps.newConcurrentMap();
+
+ // C[] would need Array.newInstance which requires a Class<C> reference.
+ // Just a few local casts probably worth not having to carry it around.
+ private final Map<String, Object[]> map = Maps.newTreeMap();
+ private String displayName;
+
+ @InterfaceAudience.Private
+ public class FSCounter extends AbstractCounter {
+ final String scheme;
+ final FileSystemCounter key;
+ private long value;
+
+ public FSCounter(String scheme, FileSystemCounter ref) {
+ this.scheme = scheme;
+ key = ref;
+ }
+
+ @Override
+ public String getName() {
+ return Joiner.on('_').join(scheme, key.name());
+ }
+
+ @Override
+ public String getDisplayName() {
+ return Joiner.on(": ").join(scheme, localizeCounterName(key.name()));
+ }
+
+ protected String localizeCounterName(String counterName) {
+ return ResourceBundles.getCounterName(FileSystemCounter.class.getName(),
+ counterName, counterName);
+ }
+
+ @Override
+ public long getValue() {
+ return value;
+ }
+
+ @Override
+ public void setValue(long value) {
+ this.value = value;
+ }
+
+ @Override
+ public void increment(long incr) {
+ value += incr;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ assert false : "shouldn't be called";
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ assert false : "shouldn't be called";
+ }
+ }
+
+ @Override
+ public String getName() {
+ return FileSystemCounter.class.getName();
+ }
+
+ @Override
+ public String getDisplayName() {
+ if (displayName == null) {
+ displayName = ResourceBundles.getCounterGroupName(getName(),
+ "File System Counters");
+ }
+ return displayName;
+ }
+
+ @Override
+ public void setDisplayName(String displayName) {
+ this.displayName = displayName;
+ }
+
+ @Override
+ public void addCounter(C counter) {
+ C ours;
+ if (counter instanceof FileSystemCounterGroup<?>.FSCounter) {
+ @SuppressWarnings("unchecked")
+ FSCounter c = (FSCounter) counter;
+ ours = findCounter(c.scheme, c.key);
+ }
+ else {
+ ours = findCounter(counter.getName());
+ }
+ ours.setValue(counter.getValue());
+ }
+
+ @Override
+ public C addCounter(String name, String displayName, long value) {
+ C counter = findCounter(name);
+ counter.setValue(value);
+ return counter;
+ }
+
+ // Parse generic counter name into [scheme, key]
+ private String[] parseCounterName(String counterName) {
+ int schemeEnd = counterName.indexOf('_');
+ if (schemeEnd < 0) {
+ throw new IllegalArgumentException("bad fs counter name");
+ }
+ return new String[]{counterName.substring(0, schemeEnd - 1),
+ counterName.substring(schemeEnd + 1)};
+ }
+
+ @Override
+ public C findCounter(String counterName, String displayName) {
+ return findCounter(counterName);
+ }
+
+ @Override
+ public C findCounter(String counterName, boolean create) {
+ try {
+ String[] pair = parseCounterName(counterName);
+ return findCounter(pair[0], FileSystemCounter.valueOf(pair[1]));
+ }
+ catch (Exception e) {
+ if (create) throw new IllegalArgumentException(e);
+ return null;
+ }
+ }
+
+ @Override
+ public C findCounter(String counterName) {
+ return findCounter(counterName, true);
+ }
+
+ @SuppressWarnings("unchecked")
+ public synchronized C findCounter(String scheme, FileSystemCounter key) {
+ final String canonicalScheme = checkScheme(scheme);
+ Object[] counters = map.get(canonicalScheme);
+ int ord = key.ordinal();
+ if (counters == null) {
+ counters = new Object[FileSystemCounter.values().length];
+ map.put(canonicalScheme, counters);
+ counters[ord] = newCounter(canonicalScheme, key);
+ }
+ else if (counters[ord] == null) {
+ counters[ord] = newCounter(canonicalScheme, key);
+ }
+ return (C) counters[ord];
+ }
+
+ private String checkScheme(String scheme) {
+ String fixed = scheme.toUpperCase(Locale.US);
+ String interned = schemes.putIfAbsent(fixed, fixed);
+ if (schemes.size() > MAX_NUM_SCHEMES) {
+ // mistakes or abuses
+ throw new IllegalArgumentException("too many schemes? "+ schemes.size() +
+ " when process scheme: "+ scheme);
+ }
+ return interned == null ? fixed : interned;
+ }
+
+ /**
+ * Abstract factory method to create a file system counter
+ * @param scheme of the file system
+ * @param key the enum of the file system counter
+ * @return a new file system counter
+ */
+ protected abstract C newCounter(String scheme, FileSystemCounter key);
+
+ @Override
+ public int size() {
+ // It's used for reserve space anyway.
+ return map.size() * FileSystemCounter.values().length;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void incrAllCounters(CounterGroupBase<C> other) {
+ if (checkNotNull(other, "other group")
+ instanceof FileSystemCounterGroup<?>) {
+ for (Counter counter : other) {
+ FSCounter c = (FSCounter) counter;
+ findCounter(c.scheme, c.key) .increment(counter.getValue());
+ }
+ }
+ }
+
+ /**
+ * FileSystemGroup ::= #scheme (scheme #counter (key value)*)*
+ */
+ @Override
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeVInt(out, map.size()); // #scheme
+ for (Map.Entry<String, Object[]> entry : map.entrySet()) {
+ WritableUtils.writeString(out, entry.getKey()); // scheme
+ // #counter for the above scheme
+ WritableUtils.writeVInt(out, numSetCounters(entry.getValue()));
+ for (Object counter : entry.getValue()) {
+ if (counter == null) continue;
+ @SuppressWarnings("unchecked")
+ FSCounter c = (FSCounter) counter;
+ WritableUtils.writeVInt(out, c.key.ordinal()); // key
+ WritableUtils.writeVLong(out, c.getValue()); // value
+ }
+ }
+ }
+
+ private int numSetCounters(Object[] counters) {
+ int n = 0;
+ for (Object counter : counters) if (counter != null) ++n;
+ return n;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int numSchemes = WritableUtils.readVInt(in); // #scheme
+ FileSystemCounter[] enums = FileSystemCounter.values();
+ for (int i = 0; i < numSchemes; ++i) {
+ String scheme = WritableUtils.readString(in); // scheme
+ int numCounters = WritableUtils.readVInt(in); // #counter
+ for (int j = 0; j < numCounters; ++j) {
+ findCounter(scheme, enums[WritableUtils.readVInt(in)]) // key
+ .setValue(WritableUtils.readVLong(in)); // value
+ }
+ }
+ }
+
+ @Override
+ public Iterator<C> iterator() {
+ return new AbstractIterator<C>() {
+ Iterator<Object[]> it = map.values().iterator();
+ Object[] counters = it.hasNext() ? it.next() : null;
+ int i = 0;
+ @Override
+ protected C computeNext() {
+ while (counters != null) {
+ while (i < counters.length) {
+ @SuppressWarnings("unchecked")
+ C counter = (C) counters[i++];
+ if (counter != null) return counter;
+ }
+ i = 0;
+ counters = it.hasNext() ? it.next() : null;
+ }
+ return endOfData();
+ }
+ };
+ }
+
+ @Override
+ public synchronized boolean equals(Object genericRight) {
+ if (genericRight instanceof CounterGroupBase<?>) {
+ @SuppressWarnings("unchecked")
+ CounterGroupBase<C> right = (CounterGroupBase<C>) genericRight;
+ return Iterators.elementsEqual(iterator(), right.iterator());
+ }
+ return false;
+ }
+
+ @Override
+ public synchronized int hashCode() {
+ // need to be deep as counters is an array
+ int hash = FileSystemCounter.class.hashCode();
+ for (Object[] counters : map.values()) {
+ if (counters != null) hash ^= Arrays.hashCode(counters);
+ }
+ return hash;
+ }
+}
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java?rev=1079197&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java Tue Mar 8 05:54:43 2011
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.counters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import static com.google.common.base.Preconditions.*;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.util.ResourceBundles;
+
+/**
+ * An abstract class to provide common implementation for the framework
+ * counter group in both mapred and mapreduce packages.
+ *
+ * @param <T> type of the counter enum class
+ * @param <C> type of the counter
+ */
+@InterfaceAudience.Private
+public abstract class FrameworkCounterGroup<T extends Enum<T>,
+ C extends Counter> implements CounterGroupBase<C> {
+
+ private final Class<T> enumClass; // for Enum.valueOf
+ private final Object[] counters; // local casts are OK and save a class ref
+ private String displayName = null;
+
+ /**
+ * A counter facade for framework counters.
+ * Use old (which extends new) interface to make compatibility easier.
+ */
+ @InterfaceAudience.Private
+ public class FrameworkCounter extends AbstractCounter {
+ final T key;
+ private long value;
+
+ public FrameworkCounter(T ref) {
+ key = ref;
+ }
+
+ @Override
+ public String getName() {
+ return key.name();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return localizeCounterName(getName());
+ }
+
+ @Override
+ public long getValue() {
+ return value;
+ }
+
+ @Override
+ public void setValue(long value) {
+ this.value = value;
+ }
+
+ @Override
+ public void increment(long incr) {
+ value += incr;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ assert false : "shouldn't be called";
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ assert false : "shouldn't be called";
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public FrameworkCounterGroup(Class<T> enumClass) {
+ this.enumClass = enumClass;
+ T[] enums = enumClass.getEnumConstants();
+ counters = new Object[enums.length];
+ }
+
+ @Override
+ public String getName() {
+ return enumClass.getName();
+ }
+
+ @Override
+ public String getDisplayName() {
+ if (displayName == null) {
+ displayName = ResourceBundles.getCounterGroupName(getName(), getName());
+ }
+ return displayName;
+ }
+
+ @Override
+ public void setDisplayName(String displayName) {
+ this.displayName = displayName;
+ }
+
+ private String localizeCounterName(String counterName) {
+ return ResourceBundles.getCounterName(getName(), counterName, counterName);
+ }
+
+ private T valueOf(String name) {
+ return Enum.valueOf(enumClass, name);
+ }
+
+ @Override
+ public void addCounter(C counter) {
+ C ours = findCounter(counter.getName());
+ ours.setValue(counter.getValue());
+ }
+
+ @Override
+ public C addCounter(String name, String displayName, long value) {
+ C counter = findCounter(name);
+ counter.setValue(value);
+ return counter;
+ }
+
+ @Override
+ public C findCounter(String counterName, String displayName) {
+ return findCounter(counterName);
+ }
+
+ @Override
+ public C findCounter(String counterName, boolean create) {
+ try {
+ return findCounter(valueOf(counterName));
+ }
+ catch (Exception e) {
+ if (create) throw new IllegalArgumentException(e);
+ return null;
+ }
+ }
+
+ @Override
+ public C findCounter(String counterName) {
+ return findCounter(valueOf(counterName));
+ }
+
+ @SuppressWarnings("unchecked")
+ private C findCounter(T key) {
+ int i = key.ordinal();
+ if (counters[i] == null) {
+ counters[i] = newCounter(key);
+ }
+ return (C) counters[i];
+ }
+
+ /**
+ * Abstract factory method for new framework counter
+ * @param key for the enum value of a counter
+ * @return a new counter for the key
+ */
+ protected abstract C newCounter(T key);
+
+ @Override
+ public int size() {
+ // It's used for reserve space anyway.
+ return counters.length;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void incrAllCounters(CounterGroupBase<C> other) {
+ if (checkNotNull(other, "other counter group")
+ instanceof FrameworkCounterGroup<?, ?>) {
+ for (Counter counter : other) {
+ findCounter(((FrameworkCounter) counter).key)
+ .increment(counter.getValue());
+ }
+ }
+ }
+
+ /**
+ * FrameworkGroup ::= #counter (key value)*
+ */
+ @Override
+ @SuppressWarnings("unchecked")
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeVInt(out, numSetCounters());
+ for (int i = 0; i < counters.length; ++i) {
+ Counter counter = (C) counters[i];
+ if (counter != null) {
+ WritableUtils.writeVInt(out, i);
+ WritableUtils.writeVLong(out, counter.getValue());
+ }
+ }
+ }
+
+ private int numSetCounters() {
+ int n = 0;
+ for (int i = 0; i < counters.length; ++i) {
+ if (counters[i] != null) ++n;
+ }
+ return n;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ clear();
+ int len = WritableUtils.readVInt(in);
+ T[] enums = enumClass.getEnumConstants();
+ for (int i = 0; i < len; ++i) {
+ int ord = WritableUtils.readVInt(in);
+ Counter counter = newCounter(enums[ord]);
+ counter.setValue(WritableUtils.readVLong(in));
+ counters[ord] = counter;
+ }
+ }
+
+ private void clear() {
+ for (int i = 0; i < counters.length; ++i) {
+ counters[i] = null;
+ }
+ }
+
+ @Override
+ public Iterator<C> iterator() {
+ return new AbstractIterator<C>() {
+ int i = 0;
+ @Override
+ protected C computeNext() {
+ while (i < counters.length) {
+ @SuppressWarnings("unchecked")
+ C counter = (C) counters[i++];
+ if (counter != null) return counter;
+ }
+ return endOfData();
+ }
+ };
+ }
+
+ @Override
+ public synchronized boolean equals(Object genericRight) {
+ if (genericRight instanceof CounterGroupBase<?>) {
+ @SuppressWarnings("unchecked")
+ CounterGroupBase<C> right = (CounterGroupBase<C>) genericRight;
+ return Iterators.elementsEqual(iterator(), right.iterator());
+ }
+ return false;
+ }
+
+ @Override
+ public synchronized int hashCode() {
+ // need to be deep as counters is an array
+ return Arrays.deepHashCode(new Object[]{enumClass, counters, displayName});
+ }
+}
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/GenericCounter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/GenericCounter.java?rev=1079197&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/GenericCounter.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/GenericCounter.java Tue Mar 8 05:54:43 2011
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.counters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * A generic counter implementation
+ */
+@InterfaceAudience.Private
+public class GenericCounter extends AbstractCounter {
+
+ private String name;
+ private String displayName;
+ private long value = 0;
+
+ public GenericCounter() {
+ // mostly for readFields
+ }
+
+ public GenericCounter(String name, String displayName) {
+ this.name = name;
+ this.displayName = displayName;
+ }
+
+ public GenericCounter(String name, String displayName, long value) {
+ this.name = name;
+ this.displayName = displayName;
+ this.value = value;
+ }
+
+ @Override @Deprecated
+ public synchronized void setDisplayName(String displayName) {
+ this.displayName = displayName;
+ }
+
+ @Override
+ public synchronized void readFields(DataInput in) throws IOException {
+ name = Text.readString(in);
+ displayName = in.readBoolean() ? Text.readString(in) : name;
+ value = WritableUtils.readVLong(in);
+ }
+
+ /**
+ * GenericCounter ::= keyName isDistinctDisplayName [displayName] value
+ */
+ @Override
+ public synchronized void write(DataOutput out) throws IOException {
+ Text.writeString(out, name);
+ boolean distinctDisplayName = ! name.equals(displayName);
+ out.writeBoolean(distinctDisplayName);
+ if (distinctDisplayName) {
+ Text.writeString(out, displayName);
+ }
+ WritableUtils.writeVLong(out, value);
+ }
+
+ @Override
+ public synchronized String getName() {
+ return name;
+ }
+
+ @Override
+ public synchronized String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public synchronized long getValue() {
+ return value;
+ }
+
+ @Override
+ public synchronized void setValue(long value) {
+ this.value = value;
+ }
+
+ @Override
+ public synchronized void increment(long incr) {
+ value += incr;
+ }
+}
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/LimitExceededException.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/LimitExceededException.java?rev=1079197&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/LimitExceededException.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/LimitExceededException.java Tue Mar 8 05:54:43 2011
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.counters;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class LimitExceededException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ public LimitExceededException(String msg) {
+ super(msg);
+ }
+
+ // Only allows chaining of related exceptions
+ public LimitExceededException(LimitExceededException cause) {
+ super(cause);
+ }
+}
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/Limits.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/Limits.java?rev=1079197&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/Limits.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/Limits.java Tue Mar 8 05:54:43 2011
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.counters;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.mapreduce.MRJobConfig.*;
+
+@InterfaceAudience.Private
+public class Limits {
+
+ static final Configuration conf = new Configuration();
+ static final int GROUP_NAME_MAX = conf.getInt(COUNTER_GROUP_NAME_MAX_KEY,
+ COUNTER_GROUP_NAME_MAX_DEFAULT);
+ static final int COUNTER_NAME_MAX = conf.getInt(COUNTER_NAME_MAX_KEY,
+ COUNTER_NAME_MAX_DEFAULT);
+ static final int GROUPS_MAX = conf.getInt(COUNTER_GROUPS_MAX_KEY,
+ COUNTER_GROUPS_MAX_DEFAULT);
+ static final int COUNTERS_MAX = conf.getInt(COUNTERS_MAX_KEY,
+ COUNTERS_MAX_DEFAULT);
+
+ private int totalCounters;
+ private LimitExceededException firstViolation;
+
+ public static String filterName(String name, int maxLen) {
+ return name.length() > maxLen ? name.substring(0, maxLen - 1) : name;
+ }
+
+ public String filterCounterName(String name) {
+ return filterName(name, COUNTER_NAME_MAX);
+ }
+
+ public String filterGroupName(String name) {
+ return filterName(name, GROUP_NAME_MAX);
+ }
+
+ public synchronized void checkCounters(int size) {
+ if (firstViolation != null) {
+ throw new LimitExceededException(firstViolation);
+ }
+ if (size > COUNTERS_MAX) {
+ firstViolation = new LimitExceededException("Too many counters: "+ size +
+ " max="+ COUNTERS_MAX);
+ throw firstViolation;
+ }
+ }
+
+ public synchronized void incrCounters() {
+ checkCounters(totalCounters + 1);
+ ++totalCounters;
+ }
+
+ public synchronized void checkGroups(int size) {
+ if (firstViolation != null) {
+ throw new LimitExceededException(firstViolation);
+ }
+ if (size > GROUPS_MAX) {
+ firstViolation = new LimitExceededException("Too many counter groups: "+
+ size +" max="+ GROUPS_MAX);
+ }
+ }
+
+ public synchronized LimitExceededException violation() {
+ return firstViolation;
+ }
+}
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/package-info.java?rev=1079197&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/package-info.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/counters/package-info.java Tue Mar 8 05:54:43 2011
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the implementations of different types of
+ * map-reduce counters.
+ *
+ * cf. MAPREDUCE-901 for rationales.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+package org.apache.hadoop.mapreduce.counters;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java?rev=1079197&r1=1079196&r2=1079197&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java Tue Mar 8 05:54:43 2011
@@ -22,18 +22,15 @@ import java.io.Closeable;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.EOFException;
-import java.io.StringBufferInputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.avro.Schema;
-import org.apache.avro.AvroRuntimeException;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.DatumReader;
@@ -171,13 +168,10 @@ public class EventReader implements Clos
Counters result = new Counters();
for (JhCounterGroup g : counters.groups) {
CounterGroup group =
- new CounterGroup(g.name.toString(), g.displayName.toString());
+ result.addGroup(g.name.toString(), g.displayName.toString());
for (JhCounter c : g.counts) {
- group.addCounter(new Counter(c.name.toString(),
- c.displayName.toString(),
- c.value));
+ group.addCounter(c.name.toString(), c.displayName.toString(), c.value);
}
- result.addGroup(group);
}
return result;
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=1079197&r1=1079196&r2=1079197&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java Tue Mar 8 05:54:43 2011
@@ -111,8 +111,10 @@ public interface ClientProtocol extends
* Version 34: Modified submitJob to use Credentials instead of TokenStorage.
* Version 35: Added the method getQueueAdmins(queueName) as part of
* MAPREDUCE-1664.
+ * Version 36: More efficient serialization format for framework counters
+ * (MAPREDUCE-901)
*/
- public static final long versionID = 35L;
+ public static final long versionID = 36L;
/**
* Allocate a name for the job.
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/CountersStrings.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/CountersStrings.java?rev=1079197&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/CountersStrings.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/CountersStrings.java Tue Mar 8 05:54:43 2011
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.util;
+
+import java.text.ParseException;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.counters.AbstractCounters;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * String conversion utilities for counters.
+ * Candidate for deprecation since we start to use JSON in 0.21+
+ */
+@InterfaceAudience.Private
+public class CountersStrings {
+ private static final char GROUP_OPEN = '{';
+ private static final char GROUP_CLOSE = '}';
+ private static final char COUNTER_OPEN = '[';
+ private static final char COUNTER_CLOSE = ']';
+ private static final char UNIT_OPEN = '(';
+ private static final char UNIT_CLOSE = ')';
+ private static char[] charsToEscape = {GROUP_OPEN, GROUP_CLOSE,
+ COUNTER_OPEN, COUNTER_CLOSE,
+ UNIT_OPEN, UNIT_CLOSE};
+ /**
+ * Make the pre 0.21 counter string (for e.g. old job history files)
+ * [(actual-name)(display-name)(value)]
+ * @param counter to stringify
+ * @return the stringified result
+ */
+ public static String toEscapedCompactString(Counter counter) {
+
+ // First up, obtain the strings that need escaping. This will help us
+ // determine the buffer length apriori.
+ String escapedName, escapedDispName;
+ long currentValue;
+ synchronized(counter) {
+ escapedName = escape(counter.getName());
+ escapedDispName = escape(counter.getDisplayName());
+ currentValue = counter.getValue();
+ }
+ int length = escapedName.length() + escapedDispName.length() + 4;
+
+
+ length += 8; // For the following delimiting characters
+ StringBuilder builder = new StringBuilder(length);
+ builder.append(COUNTER_OPEN);
+
+ // Add the counter name
+ builder.append(UNIT_OPEN);
+ builder.append(escapedName);
+ builder.append(UNIT_CLOSE);
+
+ // Add the display name
+ builder.append(UNIT_OPEN);
+ builder.append(escapedDispName);
+ builder.append(UNIT_CLOSE);
+
+ // Add the value
+ builder.append(UNIT_OPEN);
+ builder.append(currentValue);
+ builder.append(UNIT_CLOSE);
+
+ builder.append(COUNTER_CLOSE);
+
+ return builder.toString();
+ }
+
+ /**
+ * Make the 0.21 counter group string.
+ * format: {(actual-name)(display-name)(value)[][][]}
+ * where [] are compact strings for the counters within.
+ * @param <G> type of the group
+ * @param group to stringify
+ * @return the stringified result
+ */
+ public static <G extends CounterGroupBase<?>>
+ String toEscapedCompactString(G group) {
+ List<String> escapedStrs = Lists.newArrayList();
+ int length;
+ String escapedName, escapedDispName;
+ synchronized(group) {
+ // First up, obtain the strings that need escaping. This will help us
+ // determine the buffer length apriori.
+ escapedName = escape(group.getName());
+ escapedDispName = escape(group.getDisplayName());
+ int i = 0;
+ length = escapedName.length() + escapedDispName.length();
+ for (Counter counter : group) {
+ String escapedStr = toEscapedCompactString(counter);
+ escapedStrs.add(escapedStr);
+ length += escapedStr.length();
+ }
+ }
+ length += 6; // for all the delimiting characters below
+ StringBuilder builder = new StringBuilder(length);
+ builder.append(GROUP_OPEN); // group start
+
+ // Add the group name
+ builder.append(UNIT_OPEN);
+ builder.append(escapedName);
+ builder.append(UNIT_CLOSE);
+
+ // Add the display name
+ builder.append(UNIT_OPEN);
+ builder.append(escapedDispName);
+ builder.append(UNIT_CLOSE);
+
+ // write the value
+ for(String escaped : escapedStrs) {
+ builder.append(escaped);
+ }
+
+ builder.append(GROUP_CLOSE); // group end
+ return builder.toString();
+ }
+
+ /**
+ * Make the pre 0.21 counters string
+ * @param <C> type of the counter
+ * @param <G> type of the counter group
+ * @param <T> type of the counters object
+ * @param counters the object to stringify
+ * @return the string in the following format
+ * {(groupName)(group-displayName)[(counterName)(displayName)(value)]*}*
+ */
+ public static <C extends Counter, G extends CounterGroupBase<C>,
+ T extends AbstractCounters<C, G>>
+ String toEscapedCompactString(T counters) {
+ String[] groupsArray;
+ int length = 0;
+ synchronized(counters) {
+ groupsArray = new String[counters.countCounters()];
+ int i = 0;
+ // First up, obtain the escaped string for each group so that we can
+ // determine the buffer length apriori.
+ for (G group : counters) {
+ String escapedString = toEscapedCompactString(group);
+ groupsArray[i++] = escapedString;
+ length += escapedString.length();
+ }
+ }
+
+ // Now construct the buffer
+ StringBuilder builder = new StringBuilder(length);
+ for (String group : groupsArray) {
+ builder.append(group);
+ }
+ return builder.toString();
+ }
+
+ // Escapes all the delimiters for counters i.e {,[,(,),],}
+ private static String escape(String string) {
+ return StringUtils.escapeString(string, StringUtils.ESCAPE_CHAR,
+ charsToEscape);
+ }
+
+ // Unescapes all the delimiters for counters i.e {,[,(,),],}
+ private static String unescape(String string) {
+ return StringUtils.unEscapeString(string, StringUtils.ESCAPE_CHAR,
+ charsToEscape);
+ }
+
+ // Extracts a block (data enclosed within delimeters) ignoring escape
+ // sequences. Throws ParseException if an incomplete block is found else
+ // returns null.
+ private static String getBlock(String str, char open, char close,
+ IntWritable index) throws ParseException {
+ StringBuilder split = new StringBuilder();
+ int next = StringUtils.findNext(str, open, StringUtils.ESCAPE_CHAR,
+ index.get(), split);
+ split.setLength(0); // clear the buffer
+ if (next >= 0) {
+ ++next; // move over '('
+
+ next = StringUtils.findNext(str, close, StringUtils.ESCAPE_CHAR,
+ next, split);
+ if (next >= 0) {
+ ++next; // move over ')'
+ index.set(next);
+ return split.toString(); // found a block
+ } else {
+ throw new ParseException("Unexpected end of block", next);
+ }
+ }
+ return null; // found nothing
+ }
+
+ /**
+ * Parse a pre 0.21 counters string into a counter object.
+ * @param <C> type of the counter
+ * @param <G> type of the counter group
+ * @param <T> type of the counters object
+ * @param compactString to parse
+ * @param counters an empty counters object to hold the result
+ * @return the counters object holding the result
+ * @throws ParseException
+ */
+ @SuppressWarnings("deprecation")
+ public static <C extends Counter, G extends CounterGroupBase<C>,
+ T extends AbstractCounters<C, G>>
+ T parseEscapedCompactString(String compactString, T counters)
+ throws ParseException {
+ IntWritable index = new IntWritable(0);
+
+ // Get the group to work on
+ String groupString =
+ getBlock(compactString, GROUP_OPEN, GROUP_CLOSE, index);
+
+ while (groupString != null) {
+ IntWritable groupIndex = new IntWritable(0);
+
+ // Get the actual name
+ String groupName =
+ getBlock(groupString, UNIT_OPEN, UNIT_CLOSE, groupIndex);
+ groupName = unescape(groupName);
+
+ // Get the display name
+ String groupDisplayName =
+ getBlock(groupString, UNIT_OPEN, UNIT_CLOSE, groupIndex);
+ groupDisplayName = unescape(groupDisplayName);
+
+ // Get the counters
+ G group = counters.getGroup(groupName);
+ group.setDisplayName(groupDisplayName);
+
+ String counterString =
+ getBlock(groupString, COUNTER_OPEN, COUNTER_CLOSE, groupIndex);
+
+ while (counterString != null) {
+ IntWritable counterIndex = new IntWritable(0);
+
+ // Get the actual name
+ String counterName =
+ getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, counterIndex);
+ counterName = unescape(counterName);
+
+ // Get the display name
+ String counterDisplayName =
+ getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, counterIndex);
+ counterDisplayName = unescape(counterDisplayName);
+
+ // Get the value
+ long value =
+ Long.parseLong(getBlock(counterString, UNIT_OPEN, UNIT_CLOSE,
+ counterIndex));
+
+ // Add the counter
+ Counter counter = group.findCounter(counterName);
+ counter.setDisplayName(counterDisplayName);
+ counter.increment(value);
+
+ // Get the next counter
+ counterString =
+ getBlock(groupString, COUNTER_OPEN, COUNTER_CLOSE, groupIndex);
+ }
+
+ groupString = getBlock(compactString, GROUP_OPEN, GROUP_CLOSE, index);
+ }
+ return counters;
+ }
+}
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/ResourceBundles.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/ResourceBundles.java?rev=1079197&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/ResourceBundles.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/ResourceBundles.java Tue Mar 8 05:54:43 2011
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.util;
+
+import java.util.ResourceBundle;
+import java.util.MissingResourceException;
+
+/**
+ * Helper class to handle resource bundles in a saner way
+ */
+public class ResourceBundles {
+
+ /**
+ * Get a resource bundle
+ * @param bundleName of the resource
+ * @return the resource bundle
+ * @throws MissingResourceException
+ */
+ public static ResourceBundle getBundle(String bundleName) {
+ return ResourceBundle.getBundle(bundleName.replace('$', '_'));
+ }
+
+ /**
+ * Get a resource given bundle name and key
+ * @param <T> type of the resource
+ * @param bundleName name of the resource bundle
+ * @param key to lookup the resource
+ * @param suffix for the key to lookup
+ * @param defaultValue of the resource
+ * @return the resource or the defaultValue
+ * @throws ClassCastException if the resource found doesn't match T
+ */
+ @SuppressWarnings("unchecked")
+ public static synchronized <T> T getValue(String bundleName, String key,
+ String suffix, T defaultValue) {
+ T value;
+ try {
+ ResourceBundle bundle = getBundle(bundleName);
+ value = (T) bundle.getObject(getLookupKey(key, suffix));
+ }
+ catch (Exception e) {
+ return defaultValue;
+ }
+ return value == null ? defaultValue : value;
+ }
+
+ private static String getLookupKey(String key, String suffix) {
+ if (suffix == null || suffix.isEmpty()) return key;
+ return key + suffix;
+ }
+
+ /**
+ * Get the counter group display name
+ * @param group the group name to lookup
+ * @param defaultValue of the group
+ * @return the group display name
+ */
+ public static String getCounterGroupName(String group, String defaultValue) {
+ return getValue(group, "CounterGroupName", "", defaultValue);
+ }
+
+ /**
+ * Get the counter display name
+ * @param group the counter group name for the counter
+ * @param counter the counter name to lookup
+ * @param defaultValue of the counter
+ * @return the counter display name
+ */
+ public static String getCounterName(String group, String counter,
+ String defaultValue) {
+ return getValue(group, counter, ".name", defaultValue);
+ }
+}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=1079197&r1=1079196&r2=1079197&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Tue Mar 8 05:54:43 2011
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.TaskCounter;
@@ -244,12 +245,10 @@ public class TestMiniMRWithDFS extends T
result = launchWordCount(jobConf, inDir, outDir, input, 0, 1);
assertEquals("is\t1\noom\t1\nowen\t1\n", result.output);
Counters counters = result.job.getCounters();
- long hdfsRead =
- counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
- Task.getFileSystemCounterNames("hdfs")[0]).getCounter();
- long hdfsWrite =
- counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
- Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
+ long hdfsRead = counters.findCounter("HDFS",
+ FileSystemCounter.BYTES_READ).getValue();
+ long hdfsWrite = counters.findCounter("HDFS",
+ FileSystemCounter.BYTES_WRITTEN).getValue();
long rawSplitBytesRead =
counters.findCounter(TaskCounter.SPLIT_RAW_BYTES).getCounter();
assertEquals(result.output.length(), hdfsWrite);
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java?rev=1079197&r1=1079196&r2=1079197&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java Tue Mar 8 05:54:43 2011
@@ -279,7 +279,7 @@ public class TestSeveral extends TestCas
ByteArrayOutputStream out = new ByteArrayOutputStream();
int exitCode = TestJobClient.runTool(conf, new JobClient(),
new String[] { "-counter", jobId.toString(),
- "org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS" },
+ "org.apache.hadoop.mapreduce.TaskCounter", "MAP_INPUT_RECORDS" },
out);
assertEquals(0, exitCode);
assertEquals(numReduces, Integer.parseInt(out.toString().trim()));
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/TestCounters.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/TestCounters.java?rev=1079197&r1=1079196&r2=1079197&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/TestCounters.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/TestCounters.java Tue Mar 8 05:54:43 2011
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.mapreduce;
-import java.io.IOException;
import java.util.Random;
import org.junit.Test;
@@ -39,7 +38,8 @@ public class TestCounters {
for (int i = 0; i < NUMBER_TESTS; i++) {
long initValue = rand.nextInt();
long expectedValue = initValue;
- Counter counter = new Counter("foo", "bar", expectedValue);
+ Counter counter = new Counters().findCounter("test", "foo");
+ counter.setValue(initValue);
assertEquals("Counter value is not initialized correctly",
expectedValue, counter.getValue());
for (int j = 0; j < NUMBER_INC; j++) {