You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:30:07 UTC
svn commit: r1077569 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
mapred/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapreduce/split/
test/org/apache/hadoop/mapred/
Author: omalley
Date: Fri Mar 4 04:30:06 2011
New Revision: 1077569
URL: http://svn.apache.org/viewvc?rev=1077569&view=rev
Log:
commit 864ffc38bdc730e24656262c2168080d8a61f945
Author: Mahadev Konar <ma...@cdev6022.inktomisearch.com>
Date: Fri Jul 16 20:51:55 2010 +0000
MAPREDUCE-1943. From https://issues.apache.org/jira/secure/attachment/12449985/MAPREDUCE-1943-yahoo-hadoop-0.20S.patch (mahadev)
Added:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestBlockLimits.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Counters.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplit.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Counters.java?rev=1077569&r1=1077568&r2=1077569&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Counters.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Counters.java Fri Mar 4 04:30:06 2011
@@ -61,6 +61,17 @@ public class Counters implements Writabl
private static char[] charsToEscape = {GROUP_OPEN, GROUP_CLOSE,
COUNTER_OPEN, COUNTER_CLOSE,
UNIT_OPEN, UNIT_CLOSE};
+ /** limit on the size of the name of the group **/
+ private static final int GROUP_NAME_LIMIT = 128;
+ /** limit on the size of the counter name **/
+ private static final int COUNTER_NAME_LIMIT = 64;
+ /** the max number of counters **/
+ static final int MAX_COUNTER_LIMIT = 80;
+ /** the max groups allowed **/
+ static final int MAX_GROUP_LIMIT = 50;
+
+ /** the number of current counters**/
+ private int numCounters = 0;
//private static Log log = LogFactory.getLog("Counters.class");
@@ -141,7 +152,7 @@ public class Counters implements Writabl
* <p><code>Group</code>handles localization of the class name and the
* counter names.</p>
*/
- public static class Group implements Writable, Iterable<Counter> {
+ public class Group implements Writable, Iterable<Counter> {
private String groupName;
private String displayName;
private Map<String, Counter> subcounters = new HashMap<String, Counter>();
@@ -162,16 +173,7 @@ public class Counters implements Writabl
(bundle == null ? "nothing" : "bundle"));
}
}
-
- /**
- * Returns the specified resource bundle, or throws an exception.
- * @throws MissingResourceException if the bundle isn't found
- */
- private static ResourceBundle getResourceBundle(String enumClassName) {
- String bundleName = enumClassName.replace('$','_');
- return ResourceBundle.getBundle(bundleName);
- }
-
+
/**
* Returns raw name of the group. This is the name of the enum class
* for this group of counters.
@@ -298,13 +300,20 @@ public class Counters implements Writabl
* @return the counter
*/
public synchronized Counter getCounterForName(String name) {
- Counter result = subcounters.get(name);
+ String shortName = getShortName(name, COUNTER_NAME_LIMIT);
+ Counter result = subcounters.get(shortName);
if (result == null) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Adding " + name);
+ LOG.debug("Adding " + shortName);
}
- result = new Counter(name, localize(name + ".name", name), 0L);
- subcounters.put(name, result);
+ numCounters = (numCounters == 0) ? Counters.this.size(): numCounters;
+ if (numCounters >= MAX_COUNTER_LIMIT) {
+ throw new RuntimeException("Exceeded the number of counters: "
+ + "Num = " + numCounters + " " + "Limit: " + MAX_COUNTER_LIMIT);
+ }
+ result = new Counter(shortName, localize(shortName + ".name", shortName), 0L);
+ subcounters.put(shortName, result);
+ numCounters++;
}
return result;
}
@@ -365,7 +374,16 @@ public class Counters implements Writabl
* typical usage.
*/
private Map<Enum, Counter> cache = new IdentityHashMap<Enum, Counter>();
-
+
+ /**
+ * Returns the specified resource bundle, or throws an exception.
+ * @throws MissingResourceException if the bundle isn't found
+ */
+ private static ResourceBundle getResourceBundle(String enumClassName) {
+ String bundleName = enumClassName.replace('$','_');
+ return ResourceBundle.getBundle(bundleName);
+ }
+
/**
* Returns the names of all counter classes.
* @return Set of counter names.
@@ -383,13 +401,20 @@ public class Counters implements Writabl
* with the specified name.
*/
public synchronized Group getGroup(String groupName) {
- Group result = counters.get(groupName);
+ String shortGroupName = getShortName(groupName, GROUP_NAME_LIMIT);
+ Group result = counters.get(shortGroupName);
if (result == null) {
- result = new Group(groupName);
- counters.put(groupName, result);
+ /** check if we have exceeded the max number on groups **/
+ if (counters.size() > MAX_GROUP_LIMIT) {
+ throw new RuntimeException("Number of groups in Counters" +
+ " exceeded: limit "
+ + MAX_GROUP_LIMIT);
+ }
+ result = new Group(shortGroupName);
+ counters.put(shortGroupName, result);
}
return result;
- }
+ }
/**
* Find the counter for the given enum. The same enum will always return the
@@ -401,8 +426,10 @@ public class Counters implements Writabl
Counter counter = cache.get(key);
if (counter == null) {
Group group = getGroup(key.getDeclaringClass().getName());
- counter = group.getCounterForName(key.toString());
- cache.put(key, counter);
+ if (group != null) {
+ counter = group.getCounterForName(key.toString());
+ if (counter != null) cache.put(key, counter);
+ }
}
return counter;
}
@@ -414,10 +441,11 @@ public class Counters implements Writabl
* @return the counter for that name
*/
public synchronized Counter findCounter(String group, String name) {
- return getGroup(group).getCounterForName(name);
+ Group retGroup = getGroup(group);
+ return (retGroup == null) ? null: retGroup.getCounterForName(name);
}
- /**
+ /**
* Find a counter by using strings
* @param group the name of the group
* @param id the id of the counter within the group (0 to N-1)
@@ -427,7 +455,8 @@ public class Counters implements Writabl
*/
@Deprecated
public synchronized Counter findCounter(String group, int id, String name) {
- return getGroup(group).getCounterForName(name);
+ Group retGroup = getGroup(group);
+ return (retGroup == null) ? null: retGroup.getCounterForName(name);
}
/**
@@ -448,7 +477,13 @@ public class Counters implements Writabl
* @param amount amount by which counter is to be incremented
*/
public synchronized void incrCounter(String group, String counter, long amount) {
- getGroup(group).getCounterForName(counter).increment(amount);
+ Group retGroup = getGroup(group);
+ if (retGroup != null) {
+ Counter retCounter = retGroup.getCounterForName(counter);
+ if (retCounter != null ) {
+ retCounter.increment(amount);
+ }
+ }
}
/**
@@ -456,7 +491,8 @@ public class Counters implements Writabl
* does not exist.
*/
public synchronized long getCounter(Enum key) {
- return findCounter(key).getValue();
+ Counter retCounter = findCounter(key);
+ return (retCounter == null) ? 0 : retCounter.getValue();
}
/**
@@ -467,9 +503,15 @@ public class Counters implements Writabl
public synchronized void incrAllCounters(Counters other) {
for (Group otherGroup: other) {
Group group = getGroup(otherGroup.getName());
+ if (group == null) {
+ continue;
+ }
group.displayName = otherGroup.displayName;
for (Counter otherCounter : otherGroup) {
Counter counter = group.getCounterForName(otherCounter.getName());
+ if (counter == null) {
+ continue;
+ }
counter.setDisplayName(otherCounter.getDisplayName());
counter.increment(otherCounter.getValue());
}
@@ -614,6 +656,18 @@ public class Counters implements Writabl
}
return builder.toString();
}
+
+ /**
+ * return the short name of a counter/group name
+ * truncates from beginning.
+ * @param name the name of a group or counter
+ * @param limit the limit of characters
+ * @return the short name
+ */
+ static String getShortName(String name, int limit) {
+ return (name.length() > limit ?
+ name.substring(name.length() - limit, name.length()): name);
+ }
// Extracts a block (data enclosed within delimeters) ignoring escape
// sequences. Throws ParseException if an incomplete block is found else
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1077569&r1=1077568&r2=1077569&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar 4 04:30:06 2011
@@ -507,6 +507,8 @@ abstract public class Task implements Wr
private InputSplit split = null;
private Progress taskProgress;
private Thread pingThread = null;
+ private static final int PROGRESS_STATUS_LEN_LIMIT = 512;
+
/**
* flag that indicates whether progress update needs to be sent to parent.
* If true, it has been set. If false, it has been reset.
@@ -527,6 +529,12 @@ abstract public class Task implements Wr
return progressFlag.getAndSet(false);
}
public void setStatus(String status) {
+ //Check to see if the status string
+ // is too long and just concatenate it
+ // to progress limit characters.
+ if (status.length() > PROGRESS_STATUS_LEN_LIMIT) {
+ status = status.substring(0, PROGRESS_STATUS_LEN_LIMIT);
+ }
taskProgress.setStatus(status);
// indicate that progress update needs to be sent
setProgressFlag();
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077569&r1=1077568&r2=1077569&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 04:30:06 2011
@@ -2505,6 +2505,19 @@ public class TaskTracker
return;
}
+ /** check for counter limits and fail the task in case limits are exceeded **/
+ if (taskStatus.getCounters().size() > Counters.MAX_COUNTER_LIMIT ||
+ taskStatus.getCounters().getGroupNames().size() > Counters.MAX_GROUP_LIMIT) {
+ LOG.warn("Killing task " + task.getTaskID() + " :Exceeded limit on counters.");
+ try {
+ reportDiagnosticInfo("Error: Exceeded counter limits of counter:"
+ + Counters.MAX_COUNTER_LIMIT + " Group:" + Counters.MAX_GROUP_LIMIT);
+ kill(true);
+ } catch(IOException ie) {
+ LOG.error("Error killing task " + task.getTaskID(), ie);
+ }
+ }
+
this.taskStatus.statusUpdate(taskStatus);
this.lastProgressReport = System.currentTimeMillis();
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplit.java?rev=1077569&r1=1077568&r2=1077569&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplit.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplit.java Fri Mar 4 04:30:06 2011
@@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.Input
public class JobSplit {
static final int META_SPLIT_VERSION = 1;
static final byte[] META_SPLIT_FILE_HEADER;
+
static {
try {
META_SPLIT_FILE_HEADER = "META-SPL".getBytes("UTF-8");
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java?rev=1077569&r1=1077568&r2=1077569&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java Fri Mar 4 04:30:06 2011
@@ -32,7 +32,6 @@ import org.apache.hadoop.io.WritableUtil
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
@@ -44,6 +43,8 @@ public class JobSplitWriter {
private static final int splitVersion = JobSplit.META_SPLIT_VERSION;
private static final byte[] SPLIT_FILE_HEADER;
+ private static final int MAX_BLOCK_LOCATIONS = 100;
+
static {
try {
SPLIT_FILE_HEADER = "SPL".getBytes("UTF-8");
@@ -73,12 +74,12 @@ public class JobSplitWriter {
}
public static void createSplitFiles(Path jobSubmitDir,
- Configuration conf, FileSystem fs,
+ Configuration conf, FileSystem fs,
org.apache.hadoop.mapred.InputSplit[] splits)
throws IOException {
FSDataOutputStream out = createFile(fs,
JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
- SplitMetaInfo[] info = writeOldSplits(splits, out);
+ SplitMetaInfo[] info = writeOldSplits(splits, out, conf);
out.close();
writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
@@ -119,9 +120,15 @@ public class JobSplitWriter {
serializer.open(out);
serializer.serialize(split);
int currCount = out.size();
+ String[] locations = split.getLocations();
+ if (locations.length > MAX_BLOCK_LOCATIONS) {
+ throw new IOException("Max block location exceeded for split: "
+ + split + " splitsize: " + locations.length +
+ " maxsize: " + MAX_BLOCK_LOCATIONS);
+ }
info[i++] =
new JobSplit.SplitMetaInfo(
- split.getLocations(), offset,
+ locations, offset,
split.getLength());
offset += currCount - prevCount;
}
@@ -131,7 +138,7 @@ public class JobSplitWriter {
private static SplitMetaInfo[] writeOldSplits(
org.apache.hadoop.mapred.InputSplit[] splits,
- FSDataOutputStream out) throws IOException {
+ FSDataOutputStream out, Configuration conf) throws IOException {
SplitMetaInfo[] info = new SplitMetaInfo[splits.length];
if (splits.length != 0) {
int i = 0;
@@ -141,8 +148,14 @@ public class JobSplitWriter {
Text.writeString(out, split.getClass().getName());
split.write(out);
int currLen = out.size();
+ String[] locations = split.getLocations();
+ if (locations.length > MAX_BLOCK_LOCATIONS) {
+ throw new IOException("Max block location exceeded for split: "
+ + split + " splitsize: " + locations.length +
+ " maxsize: " + MAX_BLOCK_LOCATIONS);
+ }
info[i++] = new JobSplit.SplitMetaInfo(
- split.getLocations(), offset,
+ locations, offset,
split.getLength());
offset += currLen - prevLen;
}
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestBlockLimits.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestBlockLimits.java?rev=1077569&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestBlockLimits.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestBlockLimits.java Fri Mar 4 04:30:06 2011
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * A JUnit test to test limits on block locations
+ */
+public class TestBlockLimits extends TestCase {
+ private static String TEST_ROOT_DIR =
+ new File(System.getProperty("test.build.data","/tmp"))
+ .toURI().toString().replace(' ', '+');
+
+ public void testWithLimits()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ MiniMRCluster mr = null;
+ try {
+ mr = new MiniMRCluster(2, "file:///", 3);
+ runCustomFormat(mr);
+ } finally {
+ if (mr != null) { mr.shutdown(); }
+ }
+ }
+
+ private void runCustomFormat(MiniMRCluster mr) throws IOException {
+ JobConf job = mr.createJobConf();
+ FileSystem fileSys = FileSystem.get(job);
+ Path testDir = new Path(TEST_ROOT_DIR + "/test_mini_mr_local");
+ Path outDir = new Path(testDir, "out");
+ System.out.println("testDir= " + testDir);
+ fileSys.delete(testDir, true);
+ job.setInputFormat(MyInputFormat.class);
+ job.setOutputFormat(MyOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+
+ job.setMapperClass(MyMapper.class);
+ job.setReducerClass(MyReducer.class);
+ job.setNumMapTasks(100);
+ job.setNumReduceTasks(1);
+ job.set("non.std.out", outDir.toString());
+ try {
+ JobClient.runJob(job);
+ assertTrue(false);
+ } catch(IOException ie) {
+ System.out.println("Failed job " + StringUtils.stringifyException(ie));
+ } finally {
+ fileSys.delete(testDir, true);
+ }
+
+ }
+
+ static class MyMapper extends MapReduceBase
+ implements Mapper<WritableComparable, Writable,
+ WritableComparable, Writable> {
+
+ public void map(WritableComparable key, Writable value,
+ OutputCollector<WritableComparable, Writable> out,
+ Reporter reporter) throws IOException {
+ }
+ }
+
+ static class MyReducer extends MapReduceBase
+ implements Reducer<WritableComparable, Writable,
+ WritableComparable, Writable> {
+ public void reduce(WritableComparable key, Iterator<Writable> values,
+ OutputCollector<WritableComparable, Writable> output,
+ Reporter reporter) throws IOException {
+ }
+ }
+
+ private static class MyInputFormat
+ implements InputFormat<IntWritable, Text> {
+
+ private static class MySplit implements InputSplit {
+ int first;
+ int length;
+
+ public MySplit() { }
+
+ public MySplit(int first, int length) {
+ this.first = first;
+ this.length = length;
+ }
+
+ public String[] getLocations() {
+ return new String[200];
+ }
+
+ public long getLength() {
+ return length;
+ }
+
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeVInt(out, first);
+ WritableUtils.writeVInt(out, length);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ first = WritableUtils.readVInt(in);
+ length = WritableUtils.readVInt(in);
+ }
+ }
+
+ public InputSplit[] getSplits(JobConf job,
+ int numSplits) throws IOException {
+ return new MySplit[]{new MySplit(0, 1), new MySplit(1, 3),
+ new MySplit(4, 2)};
+ }
+
+ public RecordReader<IntWritable, Text> getRecordReader(InputSplit split,
+ JobConf job,
+ Reporter reporter)
+ throws IOException {
+ MySplit sp = (MySplit) split;
+ return null;
+ }
+
+ }
+
+
+ static class MyOutputFormat implements OutputFormat {
+ static class MyRecordWriter implements RecordWriter<Object, Object> {
+ private DataOutputStream out;
+
+ public MyRecordWriter(Path outputFile, JobConf job) throws IOException {
+ }
+
+ public void write(Object key, Object value) throws IOException {
+ return;
+ }
+
+ public void close(Reporter reporter) throws IOException {
+ }
+ }
+
+ public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
+ String name,
+ Progressable progress
+ ) throws IOException {
+ return new MyRecordWriter(new Path(job.get("non.std.out")), job);
+ }
+
+ public void checkOutputSpecs(FileSystem ignored,
+ JobConf job) throws IOException {
+ }
+ }
+
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java?rev=1077569&r1=1077568&r2=1077569&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java Fri Mar 4 04:30:06 2011
@@ -24,13 +24,17 @@ import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
+import java.util.Iterator;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.StringUtils;
+import org.hsqldb.lib.StringUtil;
public class TestUserDefinedCounters extends ClusterMapReduceTestCase {
@@ -44,8 +48,10 @@ public class TestUserDefinedCounters ext
output.collect(key, value);
reporter.incrCounter(EnumCounter.MAP_RECORDS, 1);
reporter.incrCounter("StringCounter", "MapRecords", 1);
+ for (int i =0; i < 50; i++) {
+ reporter.incrCounter("StringCounter", "countername" + i, 1);
+ }
}
-
}
public void testMapReduceJob() throws Exception {
@@ -94,12 +100,26 @@ public class TestUserDefinedCounters ext
reader.close();
assertEquals(4, counter);
}
-
+
assertEquals(4,
runningJob.getCounters().getCounter(EnumCounter.MAP_RECORDS));
+ Counters counters = runningJob.getCounters();
assertEquals(4,
runningJob.getCounters().getGroup("StringCounter")
.getCounter("MapRecords"));
+ assertTrue(counters.getGroupNames().size() <= 51);
+ int i = 0;
+ while (counters.size() < Counters.MAX_COUNTER_LIMIT) {
+ counters.incrCounter("IncrCounter", "limit " + i, 2);
+ i++;
+ }
+ try {
+ counters.incrCounter("IncrCountertest", "test", 2);
+ assertTrue(false);
+ } catch(RuntimeException re) {
+ System.out.println("Exceeded counter " +
+ StringUtils.stringifyException(re));
+ }
}
}