You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2009/10/18 12:00:40 UTC
svn commit: r826393 - in /hadoop/mapreduce/branches/branch-0.21: ./
src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/
src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/
Author: cdouglas
Date: Sun Oct 18 10:00:39 2009
New Revision: 826393
URL: http://svn.apache.org/viewvc?rev=826393&view=rev
Log:
MAPREDUCE-1061. Add unit test validating byte specifications for gridmix jobs.
Modified:
hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=826393&r1=826392&r2=826393&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Sun Oct 18 10:00:39 2009
@@ -752,3 +752,6 @@
MAPREDUCE-1104. Initialize RecoveryManager in JobTracker cstr called by
Mumak. (Hong Tang via cdouglas)
+
+ MAPREDUCE-1061. Add unit test validating byte specifications for gridmix
+ jobs. (cdouglas)
Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=826393&r1=826392&r2=826393&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Sun Oct 18 10:00:39 2009
@@ -200,10 +200,7 @@
}
// scan input dir contents
submitter.refreshFilePool();
- } catch (IOException e) {
- LOG.error("Startup failed", e);
- if (factory != null) factory.abort(); // abort pipeline
- } catch (InterruptedException e) {
+ } catch (Throwable e) {
LOG.error("Startup failed", e);
if (factory != null) factory.abort(); // abort pipeline
} finally {
@@ -214,8 +211,10 @@
if (factory != null) {
// wait for input exhaustion
factory.join();
- if (null != factory.error()) {
- throw factory.error();
+ final Throwable badTraceException = factory.error();
+ if (null != badTraceException) {
+ LOG.error("Error in trace", badTraceException);
+ throw new IOException("Error in trace", badTraceException);
}
// wait for pending tasks to be submitted
submitter.shutdown();
Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java?rev=826393&r1=826392&r2=826393&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java Sun Oct 18 10:00:39 2009
@@ -530,6 +530,16 @@
}
}
}
+ /**
+ * Pads this task's output up to the requested byte count: keeps emitting
+ * records keyed by NullWritable until the running total {@code written}
+ * reaches {@code outBytes}. Does nothing if the target was already met.
+ * NOTE(review): {@code written}, {@code outBytes} and {@code val} are
+ * fields of the enclosing class, which is not visible in this hunk.
+ *
+ * @param context task context that receives the padding records
+ * @throws IOException declared by the override; presumably from
+ * context.write -- confirm
+ * @throws InterruptedException declared by the override; presumably from
+ * context.write -- confirm
+ */
+ @Override
+ protected void cleanup(Context context)
+ throws IOException, InterruptedException {
+ while (written < outBytes) {
+ // each padding record is capped at the value buffer's capacity
+ final int len = (int) Math.min(outBytes - written, val.getCapacity());
+ fillBytes(val, len);
+ context.write(NullWritable.get(), val);
+ written += len;
+ }
+ }
}
static class GridmixRecordReader
Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java?rev=826393&r1=826392&r2=826393&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java Sun Oct 18 10:00:39 2009
@@ -112,13 +112,13 @@
+ /**
+ * Builds a mock job from configuration, using the default given beside
+ * each key when that key is unset. This change lowers the defaults for
+ * VAR_BYTES_IN and VAR_BYTES_OUT from 10 &lt;&lt; 20 to 5 &lt;&lt; 20, MAX_MAPS from 6
+ * to 5 and MAX_REDS from 4 to 3.
+ *
+ * @param conf configuration consulted for the size and task-count keys
+ */
public MockJob(Configuration conf) {
this(conf.getInt(MIN_BYTES_IN, 1 << 20),
- conf.getInt(VAR_BYTES_IN, 10 << 20),
+ conf.getInt(VAR_BYTES_IN, 5 << 20),
conf.getInt(MIN_BYTES_OUT, 1 << 20),
- conf.getInt(VAR_BYTES_OUT, 10 << 20),
+ conf.getInt(VAR_BYTES_OUT, 5 << 20),
conf.getInt(MIN_REC_SIZE , 100),
conf.getInt(VAR_REC_SIZE , 1 << 15),
- conf.getInt(MAX_MAPS, 6),
- conf.getInt(MAX_REDS, 4));
+ conf.getInt(MAX_MAPS, 5),
+ conf.getInt(MAX_REDS, 3));
}
public MockJob(int min_bytes_in, int var_bytes_in,
@@ -126,7 +126,7 @@
int min_rec_size, int var_rec_size,
int max_maps, int max_reds) {
final Random r = new Random();
- name = String.format("MOCKJOB%04d", seq.getAndIncrement());
+ name = String.format("MOCKJOB%05d", seq.getAndIncrement());
submitTime = timestamp.addAndGet(TimeUnit.MILLISECONDS.convert(
r.nextInt(10), TimeUnit.SECONDS));
int iMapBTotal = 0, oMapBTotal = 0, iRedBTotal = 0, oRedBTotal = 0;
Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=826393&r1=826392&r2=826393&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Sun Oct 18 10:00:39 2009
@@ -19,6 +19,9 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
@@ -30,12 +33,20 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.TaskInfo;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import static org.apache.hadoop.mapreduce.TaskCounter.*;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -64,9 +75,13 @@
+ /**
+ * Starts the mini DFS and MR clusters shared by the tests in this class.
+ * Job retirement is disabled and job-status persistence is enabled;
+ * presumably this keeps finished jobs queryable so their counters and task
+ * reports can be verified -- confirm against JobTracker behaviour.
+ *
+ * @throws IOException declared; presumably raised when a cluster fails to
+ * start -- confirm
+ */
public static void initCluster() throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(JTConfig.JT_RETIREJOBS, false);
+ conf.setInt(JTConfig.JT_RETIREJOB_CACHE_SIZE, 1000);
+ conf.setBoolean(JTConfig.JT_PERSIST_JOBSTATUS, true);
+ conf.setInt(JTConfig.JT_PERSIST_JOBSTATUS_HOURS, 1);
dfsCluster = new MiniDFSCluster(conf, 3, true, null);
dfs = dfsCluster.getFileSystem();
- mrCluster = new MiniMRCluster(3, dfs.getUri().toString(), 1);
+ // hand the configuration built above to the MR cluster as a JobConf
+ mrCluster = new MiniMRCluster(3, dfs.getUri().toString(), 1, null, null,
+ new JobConf(conf));
}
@AfterClass
@@ -81,6 +96,7 @@
static class TestMonitor extends JobMonitor {
+ static final long SLOPBYTES = 5 * 1024;
private final int expected;
private final BlockingQueue<Job> retiredJobs;
@@ -90,9 +106,142 @@
retiredJobs = new LinkedBlockingQueue<Job>();
}
+ /**
+ * Verifies the retired jobs against the job stories that were submitted.
+ * First checks that the number of retired jobs equals {@code expected}.
+ * The data-generation job (named GRIDMIX_GENDATA) is checked for the
+ * amount of generated data and the number of entries under /gridmix.
+ * Every other job is matched to its spec by replacing "GRIDMIX" with
+ * "MOCKJOB" in the job name; its map and reduce task reports are then
+ * compared with the spec via {@link #check}.
+ *
+ * @param submitted job stories handed to the cluster, keyed here by name
+ * @throws Exception if job status, counters or task reports cannot be read
+ */
- public void verify(ArrayList<JobStory> submitted) {
+ public void verify(ArrayList<JobStory> submitted) throws Exception {
final ArrayList<Job> succeeded = new ArrayList<Job>();
assertEquals("Bad job count", expected, retiredJobs.drainTo(succeeded));
+ final HashMap<String,JobStory> sub = new HashMap<String,JobStory>();
+ for (JobStory spec : submitted) {
+ sub.put(spec.getName(), spec);
+ }
+ for (Job job : succeeded) {
+ final String jobname = job.getJobName();
+ if ("GRIDMIX_GENDATA".equals(jobname)) {
+ final Path in = new Path("foo").makeQualified(dfs);
+ final Path out = new Path("/gridmix").makeQualified(dfs);
+ final ContentSummary generated = dfs.getContentSummary(in);
+ // Both bounds must hold; joined with "||" the condition is true for
+ // any length and the assertion can never fail.
+ assertTrue("Mismatched data gen", // +/- 100k for logs
+ (GENDATA << 20) < generated.getLength() + GENSLOP &&
+ (GENDATA << 20) > generated.getLength() - GENSLOP);
+ FileStatus[] outstat = dfs.listStatus(out);
+ assertEquals("Mismatched job count", NJOBS, outstat.length);
+ continue;
+ }
+ final JobStory spec = sub.get(jobname.replace("GRIDMIX", "MOCKJOB"));
+ assertNotNull("No spec for " + jobname, spec);
+ assertNotNull("No counters for " + jobname, job.getCounters());
+
+ final int nMaps = spec.getNumberMaps();
+ final int nReds = spec.getNumberReduces();
+
+ System.out.println(jobname + ": " + nMaps + "/" + nReds);
+ final TaskReport[] mReports = job.getTaskReports(TaskType.MAP);
+ assertEquals("Mismatched map count", nMaps, mReports.length);
+ // maps: no input-byte slack, one extra input record; output slack
+ // scales with the reduce count (presumably per-partition overhead)
+ check(TaskType.MAP, job, spec, mReports,
+ 0, 1, nReds * SLOPBYTES, nReds + 1);
+
+ final TaskReport[] rReports = job.getTaskReports(TaskType.REDUCE);
+ assertEquals("Mismatched reduce count", nReds, rReports.length);
+ // reduces: input slack scales with the map count; no output-byte slack
+ check(TaskType.REDUCE, job, spec, rReports,
+ nMaps * SLOPBYTES, nMaps + 1, 0, 1);
+ }
+ }
+
+ /**
+ * Compares the byte and record counts reported by the tasks that ran with
+ * the counts in the job spec. For each of the four measures, the run and
+ * spec values are sorted independently and compared position by position,
+ * so tasks are matched by rank rather than by task index. A measure passes
+ * when run minus spec does not exceed the allowance given for it.
+ * NOTE(review): only the overshoot is bounded; a run value far below its
+ * spec also passes. Confirm whether a two-sided check is intended.
+ *
+ * @param type MAP or REDUCE; selects which counters are read
+ * @param job job the reports belong to (not read by this method)
+ * @param spec job story supplying the per-task expected values
+ * @param runTasks task reports of the given type
+ * @param extraInputBytes allowed excess of run input bytes over spec
+ * @param extraInputRecords allowed excess of run input records over spec
+ * @param extraOutputBytes allowed excess of run output bytes over spec
+ * @param extraOutputRecords allowed excess of run output records over spec
+ * @throws Exception if the task counters cannot be read
+ */
+ public void check(final TaskType type, Job job, JobStory spec,
+ final TaskReport[] runTasks,
+ long extraInputBytes, int extraInputRecords,
+ long extraOutputBytes, int extraOutputRecords) throws Exception {
+
+ long[] runInputRecords = new long[runTasks.length];
+ long[] runInputBytes = new long[runTasks.length];
+ long[] runOutputRecords = new long[runTasks.length];
+ long[] runOutputBytes = new long[runTasks.length];
+ long[] specInputRecords = new long[runTasks.length];
+ long[] specInputBytes = new long[runTasks.length];
+ long[] specOutputRecords = new long[runTasks.length];
+ long[] specOutputBytes = new long[runTasks.length];
+
+ for (int i = 0; i < runTasks.length; ++i) {
+ final TaskInfo specInfo;
+ final Counters counters = runTasks[i].getTaskCounters();
+ // Counter values are stored as long without narrowing; the previous
+ // (int) casts would truncate record counts above Integer.MAX_VALUE.
+ switch (type) {
+ case MAP:
+ runInputBytes[i] = counters.findCounter("FileSystemCounters",
+ "HDFS_BYTES_READ").getValue();
+ runInputRecords[i] =
+ counters.findCounter(MAP_INPUT_RECORDS).getValue();
+ runOutputBytes[i] =
+ counters.findCounter(MAP_OUTPUT_BYTES).getValue();
+ runOutputRecords[i] =
+ counters.findCounter(MAP_OUTPUT_RECORDS).getValue();
+
+ specInfo = spec.getTaskInfo(TaskType.MAP, i);
+ break;
+ case REDUCE:
+ runInputBytes[i] =
+ counters.findCounter(REDUCE_SHUFFLE_BYTES).getValue();
+ runInputRecords[i] =
+ counters.findCounter(REDUCE_INPUT_RECORDS).getValue();
+ runOutputBytes[i] =
+ counters.findCounter("FileSystemCounters",
+ "HDFS_BYTES_WRITTEN").getValue();
+ runOutputRecords[i] =
+ counters.findCounter(REDUCE_OUTPUT_RECORDS).getValue();
+
+ specInfo = spec.getTaskInfo(TaskType.REDUCE, i);
+ break;
+ default:
+ // the assignment only satisfies definite assignment of the final local
+ specInfo = null;
+ fail("Unexpected type: " + type);
+ }
+ specInputBytes[i] = specInfo.getInputBytes();
+ specInputRecords[i] = specInfo.getInputRecords();
+ specOutputRecords[i] = specInfo.getOutputRecords();
+ specOutputBytes[i] = specInfo.getOutputBytes();
+ System.out.printf(type + " SPEC: %9d -> %9d :: %5d -> %5d\n",
+ specInputBytes[i], specOutputBytes[i],
+ specInputRecords[i], specOutputRecords[i]);
+ System.out.printf(type + " RUN: %9d -> %9d :: %5d -> %5d\n",
+ runInputBytes[i], runOutputBytes[i],
+ runInputRecords[i], runOutputRecords[i]);
+ }
+
+ // Check input bytes
+ Arrays.sort(specInputBytes);
+ Arrays.sort(runInputBytes);
+ for (int i = 0; i < runTasks.length; ++i) {
+ assertTrue("Mismatched input bytes " +
+ specInputBytes[i] + "/" + runInputBytes[i],
+ runInputBytes[i] - specInputBytes[i] <= extraInputBytes);
+ }
+
+ // Check input records
+ Arrays.sort(specInputRecords);
+ Arrays.sort(runInputRecords);
+ for (int i = 0; i < runTasks.length; ++i) {
+ assertTrue("Mismatched input records " +
+ specInputRecords[i] + "/" + runInputRecords[i],
+ runInputRecords[i] - specInputRecords[i] <= extraInputRecords);
+ }
+
+ // Check output bytes
+ Arrays.sort(specOutputBytes);
+ Arrays.sort(runOutputBytes);
+ for (int i = 0; i < runTasks.length; ++i) {
+ assertTrue("Mismatched output bytes " +
+ specOutputBytes[i] + "/" + runOutputBytes[i],
+ runOutputBytes[i] - specOutputBytes[i] <= extraOutputBytes);
+ }
+
+ // Check output records
+ Arrays.sort(specOutputRecords);
+ Arrays.sort(runOutputRecords);
+ for (int i = 0; i < runTasks.length; ++i) {
+ assertTrue("Mismatched output records " +
+ specOutputRecords[i] + "/" + runOutputRecords[i],
+ runOutputRecords[i] - specOutputRecords[i] <= extraOutputRecords);
+ }
+
}
@Override
@@ -110,7 +259,7 @@
private DebugJobFactory factory;
private TestMonitor monitor;
+ /**
+ * Asks the monitor to verify the job stories recorded by the factory.
+ *
+ * @throws Exception propagated from the monitor's verify call
+ */
- public void checkMonitor() {
+ public void checkMonitor() throws Exception {
monitor.verify(factory.getSubmitted());
}
@@ -146,12 +295,6 @@
int res = ToolRunner.run(conf, client, argv);
assertEquals("Client exited with nonzero status", 0, res);
client.checkMonitor();
- final ContentSummary generated = dfs.getContentSummary(in);
- assertTrue("Mismatched data gen", // +/- 100k for logs
- (GENDATA << 20) < generated.getLength() + GENSLOP ||
- (GENDATA << 20) > generated.getLength() - GENSLOP);
- FileStatus[] outstat = dfs.listStatus(out);
- assertEquals("Mismatched job count", NJOBS, outstat.length);
}
}