You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this copy.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:08:32 UTC
svn commit: r1077373 - in
/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src:
java/org/apache/hadoop/mapred/gridmix/ test/org/apache/hadoop/mapred/gridmix/
Author: omalley
Date: Fri Mar 4 04:08:31 2011
New Revision: 1077373
URL: http://svn.apache.org/viewvc?rev=1077373&view=rev
Log:
commit c07c1e16aaf2316716bc0439f9bf52455c8f0cfe
Author: Rahul Kumar Singh <rk...@yahoo-inc.com>
Date: Wed Apr 7 10:56:55 2010 +0530
MAPREDUCE-1594 from https://issues.apache.org/jira/secure/attachment/12440983/1594-yhadoop-20-1xx-1-5.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1594. Support for SleepJobs in Gridmix (rksingh)
+
Added:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java
Removed:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java?rev=1077373&r1=1077372&r2=1077373&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java Fri Mar 4 04:08:31 2011
@@ -98,7 +98,7 @@ class GenerateData extends GridmixJob {
public Job call() throws IOException, InterruptedException,
ClassNotFoundException {
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
- job = ugi.doAs( new PrivilegedExceptionAction <Job>() {
+ ugi.doAs( new PrivilegedExceptionAction <Job>() {
public Job run() throws IOException, ClassNotFoundException,
InterruptedException {
job.setMapperClass(GenDataMapper.class);
@@ -108,13 +108,13 @@ class GenerateData extends GridmixJob {
job.setInputFormatClass(GenDataFormat.class);
job.setOutputFormatClass(RawBytesOutputFormat.class);
job.setJarByClass(GenerateData.class);
- try {
- FileInputFormat.addInputPath(job, new Path("ignored"));
- } catch (IOException e) {
- LOG.error("Error while adding input path ",e);
- }
- job.submit();
- return job;
+ try {
+ FileInputFormat.addInputPath(job, new Path("ignored"));
+ } catch (IOException e) {
+ LOG.error("Error while adding input path ", e);
+ }
+ job.submit();
+ return job;
}
});
return job;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=1077373&r1=1077372&r2=1077373&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Fri Mar 4 04:08:31 2011
@@ -161,9 +161,8 @@ public class Gridmix extends Configured
LOG.info(" Submission policy is " + policy.name());
statistics = new Statistics(conf, policy.getPollingInterval(), startFlag,userResolver);
monitor = createJobMonitor(statistics);
- int noOfSubmitterThreads = policy.name().equals(
- GridmixJobSubmissionPolicy.SERIAL.name()) ? 1 :
- Runtime.getRuntime().availableProcessors() + 1;
+ int noOfSubmitterThreads = (policy == GridmixJobSubmissionPolicy.SERIAL) ? 1
+ : Runtime.getRuntime().availableProcessors() + 1;
submitter = createJobSubmitter(
monitor, conf.getInt(
@@ -173,7 +172,7 @@ public class Gridmix extends Configured
factory = createJobFactory(
submitter, traceIn, scratchDir, conf, startFlag, userResolver);
- if (policy.name().equals(GridmixJobSubmissionPolicy.SERIAL.name())) {
+ if (policy==GridmixJobSubmissionPolicy.SERIAL) {
statistics.addJobStatsListeners(factory);
} else {
statistics.addClusterStatsObservers(factory);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java?rev=1077373&r1=1077372&r2=1077373&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java Fri Mar 4 04:08:31 2011
@@ -18,44 +18,32 @@
package org.apache.hadoop.mapred.gridmix;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Formatter;
import java.util.List;
-import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.security.PrivilegedExceptionAction;
-import javax.security.auth.login.LoginException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.tools.rumen.JobStory;
-import org.apache.hadoop.tools.rumen.TaskInfo;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -63,7 +51,7 @@ import org.apache.commons.logging.LogFac
/**
* Synthetic job generated from a trace description.
*/
-class GridmixJob implements Callable<Job>, Delayed {
+abstract class GridmixJob implements Callable<Job>, Delayed {
public static final String JOBNAME = "GRIDMIX";
public static final String ORIGNAME = "gridmix.job.name.original";
@@ -79,46 +67,41 @@ class GridmixJob implements Callable<Job
}
};
- private final int seq;
- private final Path outdir;
- protected Job job;
- private final JobStory jobdesc;
- private final UserGroupInformation ugi;
- private final long submissionTimeNanos;
+ protected final int seq;
+ protected final Path outdir;
+ protected final Job job;
+ protected final JobStory jobdesc;
+ protected final UserGroupInformation ugi;
+ protected final long submissionTimeNanos;
+ private static final ConcurrentHashMap<Integer,List<InputSplit>> descCache =
+ new ConcurrentHashMap<Integer,List<InputSplit>>();
public GridmixJob(
final Configuration conf, long submissionMillis, final JobStory jobdesc,
Path outRoot, UserGroupInformation ugi, final int seq) throws IOException {
this.ugi = ugi;
+ this.jobdesc = jobdesc;
+ this.seq = seq;
+
((StringBuilder)nameFormat.get().out()).setLength(JOBNAME.length());
try {
job = this.ugi.doAs(new PrivilegedExceptionAction<Job>() {
- public Job run(){
- try {
- return new Job(
- conf, nameFormat.get().format(
- "%05d", seq).toString());
- } catch (IOException e) {
- LOG.error(" Could not run job submitted " + jobdesc.getName());
- return null;
- }
+ public Job run() throws IOException {
+ Job ret = new Job(conf, nameFormat.get().format("%05d", seq)
+ .toString());
+ ret.getConfiguration().setInt("gridmix.job.seq", seq);
+ ret.getConfiguration().set(ORIGNAME,
+ null == jobdesc.getJobID() ? "<unknown>" : jobdesc.getJobID()
+ .toString());
+ return ret;
}
});
} catch (InterruptedException e) {
throw new IOException(e);
- } catch (IOException e) {
- throw e;
}
- if(job == null) {
- throw new IOException(
- " Could not create Job instance for job " + jobdesc.getName());
- }
-
submissionTimeNanos = TimeUnit.NANOSECONDS.convert(
submissionMillis, TimeUnit.MILLISECONDS);
- this.jobdesc = jobdesc;
- this.seq = seq;
outdir = new Path(outRoot, "" + seq);
}
@@ -134,17 +117,14 @@ class GridmixJob implements Callable<Job
try {
job = this.ugi.doAs(new PrivilegedExceptionAction<Job>() {
- public Job run(){
- try {
- return new Job(conf,name);
- } catch (IOException e) {
- LOG.error(" Could not run job submitted " + name);
- return null;
- }
- }
- });
+ public Job run() throws IOException {
+ Job ret = new Job(conf, name);
+ ret.getConfiguration().setInt("gridmix.job.seq", seq);
+ return ret;
+ }
+ });
} catch (InterruptedException e) {
- LOG.error(" Error while creating new job " , e);
+ throw new IOException(e);
}
}
@@ -161,6 +141,41 @@ class GridmixJob implements Callable<Job
TimeUnit.NANOSECONDS);
}
+ int id() { // seq given at construction; the constructor also writes it to the job conf as gridmix.job.seq
+ return seq;
+ }
+ // Returns the Job instance created (under ugi) when this GridmixJob was constructed.
+ Job getJob() {
+ return job;
+ }
+ // Returns the trace description (JobStory) this synthetic job was generated from.
+ JobStory getJobDesc() {
+ return jobdesc;
+ }
+ // Registers the input splits for job id seq; throws IllegalArgumentException if splits are already registered for that id.
+ static void pushDescription(int seq, List<InputSplit> splits) {
+ if (null != descCache.putIfAbsent(seq, splits)) {
+ throw new IllegalArgumentException("Description exists for id " + seq);
+ }
+ }
+ // Removes and returns the splits for the job whose conf carries gridmix.job.seq (looked up as -1 when unset); null if none are registered.
+ static List<InputSplit> pullDescription(JobContext jobCtxt) {
+ return pullDescription(jobCtxt.getConfiguration().getInt(
+ "gridmix.job.seq", -1));
+ }
+ // Removes and returns the splits registered for seq, or null if none; one-shot, so a second pull for the same seq returns null.
+ static List<InputSplit> pullDescription(int seq) {
+ return descCache.remove(seq);
+ }
+ // Discards every registered split list in the shared description cache.
+ static void clearAll() {
+ descCache.clear();
+ }
+ // Hook for building and registering this job's input splits; this default implementation does nothing.
+ void buildSplits(FilePool inputDir) throws IOException {
+ // NOTE(review): presumably overridden by subclasses that need splits (registering them via pushDescription(id(), ...)); confirm in LoadJob/SleepJob.
+ }
+
@Override
public int compareTo(Delayed other) {
if (this == other) {
@@ -181,6 +196,7 @@ class GridmixJob implements Callable<Job
return 0 == diff ? 0 : (diff > 0 ? 1 : -1);
}
+
@Override
public boolean equals(Object other) {
if (this == other) {
@@ -195,66 +211,6 @@ class GridmixJob implements Callable<Job
return id();
}
- int id() {
- return seq;
- }
-
- Job getJob() {
- return job;
- }
-
- JobStory getJobDesc() {
- return jobdesc;
- }
-
- public Job call() throws IOException, InterruptedException,
- ClassNotFoundException {
- job = ugi.doAs(
- new PrivilegedExceptionAction<Job>() {
- public Job run() {
- job.setMapperClass(GridmixMapper.class);
- job.setReducerClass(GridmixReducer.class);
- job.setNumReduceTasks(jobdesc.getNumberReduces());
- job.setMapOutputKeyClass(GridmixKey.class);
- job.setMapOutputValueClass(GridmixRecord.class);
- job.setSortComparatorClass(GridmixKey.Comparator.class);
- job.setGroupingComparatorClass(SpecGroupingComparator.class);
- job.setInputFormatClass(GridmixInputFormat.class);
- job.setOutputFormatClass(RawBytesOutputFormat.class);
- job.setPartitionerClass(DraftPartitioner.class);
- job.setJarByClass(GridmixJob.class);
- job.getConfiguration().setInt("gridmix.job.seq", seq);
- job.getConfiguration().set(
- ORIGNAME, null == jobdesc.getJobID() ? "<unknown>" :
- jobdesc.getJobID().toString());
- job.getConfiguration().setBoolean(
- "mapred.used.genericoptionsparser", true);
- try {
- FileInputFormat.addInputPath(job, new Path("ignored"));
- } catch (IOException e) {
- LOG.error(" Exception while addingInpuPath job " , e);
- return null;
- }
- FileOutputFormat.setOutputPath(job, outdir);
- try {
- job.submit();
- } catch (IOException e) {
- LOG.error(" Exception while submitting job " , e);
- return null;
- } catch (InterruptedException e) {
- LOG.error(" Exception while submitting job " , e);
- return null;
- } catch (ClassNotFoundException e) {
- LOG.error(" Exception while submitting job " , e);
- return null;
- }
- return job;
- }
- });
-
- return job;
- }
-
public static class DraftPartitioner<V> extends Partitioner<GridmixKey,V> {
public int getPartition(GridmixKey key, V value, int numReduceTasks) {
return key.getPartition();
@@ -304,204 +260,6 @@ class GridmixJob implements Callable<Job
}
}
- public static class GridmixMapper
- extends Mapper<NullWritable,GridmixRecord,GridmixKey,GridmixRecord> {
-
- private double acc;
- private double ratio;
- private final ArrayList<RecordFactory> reduces =
- new ArrayList<RecordFactory>();
- private final Random r = new Random();
-
- private final GridmixKey key = new GridmixKey();
- private final GridmixRecord val = new GridmixRecord();
-
- @Override
- protected void setup(Context ctxt)
- throws IOException, InterruptedException {
- final Configuration conf = ctxt.getConfiguration();
- final GridmixSplit split = (GridmixSplit) ctxt.getInputSplit();
- final int maps = split.getMapCount();
- final long[] reduceBytes = split.getOutputBytes();
- final long[] reduceRecords = split.getOutputRecords();
-
- long totalRecords = 0L;
- final int nReduces = ctxt.getNumReduceTasks();
- if (nReduces > 0) {
- int idx = 0;
- int id = split.getId();
- for (int i = 0; i < nReduces; ++i) {
- final GridmixKey.Spec spec = new GridmixKey.Spec();
- if (i == id) {
- spec.bytes_out = split.getReduceBytes(idx);
- spec.rec_out = split.getReduceRecords(idx);
- ++idx;
- id += maps;
- }
- reduces.add(new IntermediateRecordFactory(
- new AvgRecordFactory(reduceBytes[i], reduceRecords[i], conf),
- i, reduceRecords[i], spec, conf));
- totalRecords += reduceRecords[i];
- }
- } else {
- reduces.add(new AvgRecordFactory(reduceBytes[0], reduceRecords[0],
- conf));
- totalRecords = reduceRecords[0];
- }
- final long splitRecords = split.getInputRecords();
- final long inputRecords = splitRecords <= 0 && split.getLength() >= 0
- ? Math.max(1,
- split.getLength() / conf.getInt("gridmix.missing.rec.size", 64*1024))
- : splitRecords;
- ratio = totalRecords / (1.0 * inputRecords);
- acc = 0.0;
- }
-
- @Override
- public void map(NullWritable ignored, GridmixRecord rec,
- Context context) throws IOException, InterruptedException {
- acc += ratio;
- while (acc >= 1.0 && !reduces.isEmpty()) {
- key.setSeed(r.nextLong());
- val.setSeed(r.nextLong());
- final int idx = r.nextInt(reduces.size());
- final RecordFactory f = reduces.get(idx);
- if (!f.next(key, val)) {
- reduces.remove(idx);
- continue;
- }
- context.write(key, val);
- acc -= 1.0;
- }
- }
-
- @Override
- public void cleanup(Context context)
- throws IOException, InterruptedException {
- for (RecordFactory factory : reduces) {
- key.setSeed(r.nextLong());
- while (factory.next(key, val)) {
- context.write(key, val);
- key.setSeed(r.nextLong());
- }
- }
- }
- }
-
- public static class GridmixReducer
- extends Reducer<GridmixKey,GridmixRecord,NullWritable,GridmixRecord> {
-
- private final Random r = new Random();
- private final GridmixRecord val = new GridmixRecord();
-
- private double acc;
- private double ratio;
- private RecordFactory factory;
-
- @Override
- protected void setup(Context context)
- throws IOException, InterruptedException {
- if (!context.nextKey() ||
- context.getCurrentKey().getType() != GridmixKey.REDUCE_SPEC) {
- throw new IOException("Missing reduce spec");
- }
- long outBytes = 0L;
- long outRecords = 0L;
- long inRecords = 0L;
- for (GridmixRecord ignored : context.getValues()) {
- final GridmixKey spec = context.getCurrentKey();
- inRecords += spec.getReduceInputRecords();
- outBytes += spec.getReduceOutputBytes();
- outRecords += spec.getReduceOutputRecords();
- }
- if (0 == outRecords && inRecords > 0) {
- LOG.info("Spec output bytes w/o records. Using input record count");
- outRecords = inRecords;
- }
- factory =
- new AvgRecordFactory(outBytes, outRecords, context.getConfiguration());
- ratio = outRecords / (1.0 * inRecords);
- acc = 0.0;
- }
- @Override
- protected void reduce(GridmixKey key, Iterable<GridmixRecord> values,
- Context context) throws IOException, InterruptedException {
- for (GridmixRecord ignored : values) {
- acc += ratio;
- while (acc >= 1.0 && factory.next(null, val)) {
- context.write(NullWritable.get(), val);
- acc -= 1.0;
- }
- }
- }
- @Override
- protected void cleanup(Context context)
- throws IOException, InterruptedException {
- val.setSeed(r.nextLong());
- while (factory.next(null, val)) {
- context.write(NullWritable.get(), val);
- val.setSeed(r.nextLong());
- }
- }
- }
-
- static class GridmixRecordReader
- extends RecordReader<NullWritable,GridmixRecord> {
-
- private RecordFactory factory;
- private final Random r = new Random();
- private final GridmixRecord val = new GridmixRecord();
-
- public GridmixRecordReader() { }
-
- @Override
- public void initialize(InputSplit genericSplit, TaskAttemptContext ctxt)
- throws IOException, InterruptedException {
- final GridmixSplit split = (GridmixSplit)genericSplit;
- final Configuration conf = ctxt.getConfiguration();
- factory = new ReadRecordFactory(split.getLength(),
- split.getInputRecords(), new FileQueue(split, conf), conf);
- }
-
- @Override
- public boolean nextKeyValue() throws IOException {
- val.setSeed(r.nextLong());
- return factory.next(null, val);
- }
- @Override
- public float getProgress() throws IOException {
- return factory.getProgress();
- }
- @Override
- public NullWritable getCurrentKey() {
- return NullWritable.get();
- }
- @Override
- public GridmixRecord getCurrentValue() {
- return val;
- }
- @Override
- public void close() throws IOException {
- factory.close();
- }
- }
-
- static class GridmixInputFormat
- extends InputFormat<NullWritable,GridmixRecord> {
-
- @Override
- public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
- return pullDescription(jobCtxt.getConfiguration().getInt(
- "gridmix.job.seq", -1));
- }
- @Override
- public RecordReader<NullWritable,GridmixRecord> createRecordReader(
- InputSplit split, final TaskAttemptContext taskContext)
- throws IOException {
- return new GridmixRecordReader();
- }
- }
-
static class RawBytesOutputFormat<K>
extends FileOutputFormat<K,GridmixRecord> {
@@ -525,74 +283,4 @@ class GridmixJob implements Callable<Job
};
}
}
-
- // TODO replace with ThreadLocal submitter?
- private static final ConcurrentHashMap<Integer,List<InputSplit>> descCache =
- new ConcurrentHashMap<Integer,List<InputSplit>>();
-
- static void pushDescription(int seq, List<InputSplit> splits) {
- if (null != descCache.putIfAbsent(seq, splits)) {
- throw new IllegalArgumentException("Description exists for id " + seq);
- }
- }
-
- static List<InputSplit> pullDescription(int seq) {
- return descCache.remove(seq);
- }
-
- // not nesc when TL
- static void clearAll() {
- descCache.clear();
- }
-
- void buildSplits(FilePool inputDir) throws IOException {
- long mapInputBytesTotal = 0L;
- long mapOutputBytesTotal = 0L;
- long mapOutputRecordsTotal = 0L;
- final JobStory jobdesc = getJobDesc();
- if (null == jobdesc) {
- return;
- }
- final int maps = jobdesc.getNumberMaps();
- final int reds = jobdesc.getNumberReduces();
- for (int i = 0; i < maps; ++i) {
- final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
- mapInputBytesTotal += info.getInputBytes();
- mapOutputBytesTotal += info.getOutputBytes();
- mapOutputRecordsTotal += info.getOutputRecords();
- }
- final double[] reduceRecordRatio = new double[reds];
- final double[] reduceByteRatio = new double[reds];
- for (int i = 0; i < reds; ++i) {
- final TaskInfo info = jobdesc.getTaskInfo(TaskType.REDUCE, i);
- reduceByteRatio[i] = info.getInputBytes() / (1.0 * mapOutputBytesTotal);
- reduceRecordRatio[i] =
- info.getInputRecords() / (1.0 * mapOutputRecordsTotal);
- }
- final InputStriper striper = new InputStriper(inputDir, mapInputBytesTotal);
- final List<InputSplit> splits = new ArrayList<InputSplit>();
- for (int i = 0; i < maps; ++i) {
- final int nSpec = reds / maps + ((reds % maps) > i ? 1 : 0);
- final long[] specBytes = new long[nSpec];
- final long[] specRecords = new long[nSpec];
- for (int j = 0; j < nSpec; ++j) {
- final TaskInfo info =
- jobdesc.getTaskInfo(TaskType.REDUCE, i + j * maps);
- specBytes[j] = info.getOutputBytes();
- specRecords[j] = info.getOutputRecords();
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i,
- i + j * maps, info.getOutputRecords(), info.getOutputBytes()));
- }
- }
- final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
- splits.add(new GridmixSplit(striper.splitFor(inputDir,
- info.getInputBytes(), 3), maps, i,
- info.getInputBytes(), info.getInputRecords(),
- info.getOutputBytes(), info.getOutputRecords(),
- reduceByteRatio, reduceRecordRatio, specBytes, specRecords));
- }
- pushDescription(id(), splits);
- }
-
}
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java?rev=1077373&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java Fri Mar 4 04:08:31 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.rumen.JobStory;
+
+import java.io.IOException;
+
+public enum JobCreator {
+ // Factory for the GridmixJob built per trace entry; the constant is chosen via getPolicy(). LOADJOB builds a LoadJob.
+ LOADJOB("LOADJOB") {
+ @Override
+ public GridmixJob createGridmixJob(
+ Configuration conf, long submissionMillis, JobStory jobdesc, Path outRoot,
+ UserGroupInformation ugi, int seq) throws IOException {
+ return new LoadJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq);
+ }},
+ // SLEEPJOB builds a SleepJob from the same arguments.
+ SLEEPJOB("SLEEPJOB") {
+ @Override
+ public GridmixJob createGridmixJob(
+ Configuration conf, long submissionMillis, JobStory jobdesc, Path outRoot,
+ UserGroupInformation ugi, int seq) throws IOException {
+ return new SleepJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq);
+ }};
+ // Configuration key naming the JobCreator constant to use (read by getPolicy).
+ public static final String GRIDMIX_JOB_TYPE = "gridmix.job.type";
+
+ // NOTE(review): stored but never read in this enum; it always equals name() for both constants, so it looks redundant.
+ private final String name;
+
+ JobCreator(String name) {
+ this.name = name;
+ }
+ // Builds the job for one trace entry; each constant forwards these arguments unchanged to its subclass constructor.
+ public abstract GridmixJob createGridmixJob(
+ final Configuration conf, long submissionMillis, final JobStory jobdesc,
+ Path outRoot, UserGroupInformation ugi, final int seq) throws IOException;
+ // Returns the constant named by gridmix.job.type, or defaultPolicy when unset (presumably throws on an unknown name; confirm Configuration.getEnum).
+ public static JobCreator getPolicy(
+ Configuration conf, JobCreator defaultPolicy) {
+ return conf.getEnum(GRIDMIX_JOB_TYPE, defaultPolicy);
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java?rev=1077373&r1=1077372&r2=1077373&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java Fri Mar 4 04:08:31 2011
@@ -22,7 +22,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskType;
@@ -63,6 +62,7 @@ abstract class JobFactory<T> implements
protected volatile IOException error = null;
protected final JobStoryProducer jobProducer;
protected final ReentrantLock lock = new ReentrantLock(true);
+ protected final JobCreator jobCreator;
/**
* Creating a new instance does not start the thread.
@@ -104,6 +104,7 @@ abstract class JobFactory<T> implements
LOG.debug(" The submission thread name is " + rThread.getName());
}
this.userResolver = userResolver;
+ this.jobCreator = JobCreator.getPolicy(conf,JobCreator.LOADJOB);
}
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java?rev=1077373&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java Fri Mar 4 04:08:31 2011
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Synthetic job generated from a trace description.
+ */
+class LoadJob extends GridmixJob {
+
+ public static final Log LOG = LogFactory.getLog(LoadJob.class);
+
+ public LoadJob(
+ final Configuration conf, long submissionMillis, final JobStory jobdesc,
+ Path outRoot, UserGroupInformation ugi, final int seq) throws IOException {
+ super(conf,submissionMillis,jobdesc,outRoot,ugi,seq);
+ }
+
+ public Job call() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ ugi.doAs(
+ new PrivilegedExceptionAction<Job>() {
+ public Job run() throws IOException, ClassNotFoundException,
+ InterruptedException {
+ job.setMapperClass(LoadMapper.class);
+ job.setReducerClass(LoadReducer.class);
+ job.setNumReduceTasks(jobdesc.getNumberReduces());
+ job.setMapOutputKeyClass(GridmixKey.class);
+ job.setMapOutputValueClass(GridmixRecord.class);
+ job.setSortComparatorClass(GridmixKey.Comparator.class);
+ job.setGroupingComparatorClass(SpecGroupingComparator.class);
+ job.setInputFormatClass(LoadInputFormat.class);
+ job.setOutputFormatClass(RawBytesOutputFormat.class);
+ job.setPartitionerClass(DraftPartitioner.class);
+ job.setJarByClass(LoadJob.class);
+ job.getConfiguration().setBoolean(
+ "mapred.used.genericoptionsparser", true);
+ FileOutputFormat.setOutputPath(job, outdir);
+ job.submit();
+ return job;
+ }
+ });
+
+ return job;
+ }
+
+ public static class LoadMapper
+ extends Mapper<NullWritable,GridmixRecord,GridmixKey,GridmixRecord> {
+
+ private double acc;
+ private double ratio;
+ private final ArrayList<RecordFactory> reduces =
+ new ArrayList<RecordFactory>();
+ private final Random r = new Random();
+
+ private final GridmixKey key = new GridmixKey();
+ private final GridmixRecord val = new GridmixRecord();
+
+ @Override
+ protected void setup(Context ctxt)
+ throws IOException, InterruptedException {
+ final Configuration conf = ctxt.getConfiguration();
+ final LoadSplit split = (LoadSplit) ctxt.getInputSplit();
+ final int maps = split.getMapCount();
+ final long[] reduceBytes = split.getOutputBytes();
+ final long[] reduceRecords = split.getOutputRecords();
+
+ long totalRecords = 0L;
+ final int nReduces = ctxt.getNumReduceTasks();
+ if (nReduces > 0) {
+ int idx = 0;
+ int id = split.getId();
+ for (int i = 0; i < nReduces; ++i) {
+ final GridmixKey.Spec spec = new GridmixKey.Spec();
+ if (i == id) {
+ spec.bytes_out = split.getReduceBytes(idx);
+ spec.rec_out = split.getReduceRecords(idx);
+ ++idx;
+ id += maps;
+ }
+ reduces.add(new IntermediateRecordFactory(
+ new AvgRecordFactory(reduceBytes[i], reduceRecords[i], conf),
+ i, reduceRecords[i], spec, conf));
+ totalRecords += reduceRecords[i];
+ }
+ } else {
+ reduces.add(new AvgRecordFactory(reduceBytes[0], reduceRecords[0],
+ conf));
+ totalRecords = reduceRecords[0];
+ }
+ final long splitRecords = split.getInputRecords();
+ final long inputRecords = splitRecords <= 0 && split.getLength() >= 0
+ ? Math.max(1,
+ split.getLength() / conf.getInt("gridmix.missing.rec.size", 64*1024))
+ : splitRecords;
+ ratio = totalRecords / (1.0 * inputRecords);
+ acc = 0.0;
+ }
+
+ @Override
+ public void map(NullWritable ignored, GridmixRecord rec,
+ Context context) throws IOException, InterruptedException {
+ acc += ratio;
+ while (acc >= 1.0 && !reduces.isEmpty()) {
+ key.setSeed(r.nextLong());
+ val.setSeed(r.nextLong());
+ final int idx = r.nextInt(reduces.size());
+ final RecordFactory f = reduces.get(idx);
+ if (!f.next(key, val)) {
+ reduces.remove(idx);
+ continue;
+ }
+ context.write(key, val);
+ acc -= 1.0;
+ }
+ }
+
+ @Override
+ public void cleanup(Context context)
+ throws IOException, InterruptedException {
+ for (RecordFactory factory : reduces) {
+ key.setSeed(r.nextLong());
+ while (factory.next(key, val)) {
+ context.write(key, val);
+ key.setSeed(r.nextLong());
+ }
+ }
+ }
+ }
+
+ public static class LoadReducer
+ extends Reducer<GridmixKey,GridmixRecord,NullWritable,GridmixRecord> {
+
+ private final Random r = new Random();
+ private final GridmixRecord val = new GridmixRecord();
+
+ private double acc;
+ private double ratio;
+ private RecordFactory factory;
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ if (!context.nextKey() ||
+ context.getCurrentKey().getType() != GridmixKey.REDUCE_SPEC) {
+ throw new IOException("Missing reduce spec");
+ }
+ long outBytes = 0L;
+ long outRecords = 0L;
+ long inRecords = 0L;
+ for (GridmixRecord ignored : context.getValues()) {
+ final GridmixKey spec = context.getCurrentKey();
+ inRecords += spec.getReduceInputRecords();
+ outBytes += spec.getReduceOutputBytes();
+ outRecords += spec.getReduceOutputRecords();
+ }
+ if (0 == outRecords && inRecords > 0) {
+ LOG.info("Spec output bytes w/o records. Using input record count");
+ outRecords = inRecords;
+ }
+ factory =
+ new AvgRecordFactory(outBytes, outRecords, context.getConfiguration());
+ ratio = outRecords / (1.0 * inRecords);
+ acc = 0.0;
+ }
+ @Override
+ protected void reduce(GridmixKey key, Iterable<GridmixRecord> values,
+ Context context) throws IOException, InterruptedException {
+ for (GridmixRecord ignored : values) {
+ acc += ratio;
+ while (acc >= 1.0 && factory.next(null, val)) {
+ context.write(NullWritable.get(), val);
+ acc -= 1.0;
+ }
+ }
+ }
+ @Override
+ protected void cleanup(Context context)
+ throws IOException, InterruptedException {
+ val.setSeed(r.nextLong());
+ while (factory.next(null, val)) {
+ context.write(NullWritable.get(), val);
+ val.setSeed(r.nextLong());
+ }
+ }
+ }
+
+ static class LoadRecordReader
+ extends RecordReader<NullWritable,GridmixRecord> {
+
+ private RecordFactory factory;
+ private final Random r = new Random();
+ private final GridmixRecord val = new GridmixRecord();
+
+ public LoadRecordReader() { }
+
+ @Override
+ public void initialize(InputSplit genericSplit, TaskAttemptContext ctxt)
+ throws IOException, InterruptedException {
+ final LoadSplit split = (LoadSplit)genericSplit;
+ final Configuration conf = ctxt.getConfiguration();
+ factory = new ReadRecordFactory(split.getLength(),
+ split.getInputRecords(), new FileQueue(split, conf), conf);
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException {
+ val.setSeed(r.nextLong());
+ return factory.next(null, val);
+ }
+ @Override
+ public float getProgress() throws IOException {
+ return factory.getProgress();
+ }
+ @Override
+ public NullWritable getCurrentKey() {
+ return NullWritable.get();
+ }
+ @Override
+ public GridmixRecord getCurrentValue() {
+ return val;
+ }
+ @Override
+ public void close() throws IOException {
+ factory.close();
+ }
+ }
+
+ static class LoadInputFormat
+ extends InputFormat<NullWritable,GridmixRecord> {
+
+ @Override
+ public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
+ return pullDescription(jobCtxt);
+ }
+ @Override
+ public RecordReader<NullWritable,GridmixRecord> createRecordReader(
+ InputSplit split, final TaskAttemptContext taskContext)
+ throws IOException {
+ return new LoadRecordReader();
+ }
+ }
+
+ @Override
+ void buildSplits(FilePool inputDir) throws IOException {
+ long mapInputBytesTotal = 0L;
+ long mapOutputBytesTotal = 0L;
+ long mapOutputRecordsTotal = 0L;
+ final JobStory jobdesc = getJobDesc();
+ if (null == jobdesc) {
+ return;
+ }
+ final int maps = jobdesc.getNumberMaps();
+ final int reds = jobdesc.getNumberReduces();
+ for (int i = 0; i < maps; ++i) {
+ final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
+ mapInputBytesTotal += info.getInputBytes();
+ mapOutputBytesTotal += info.getOutputBytes();
+ mapOutputRecordsTotal += info.getOutputRecords();
+ }
+ final double[] reduceRecordRatio = new double[reds];
+ final double[] reduceByteRatio = new double[reds];
+ for (int i = 0; i < reds; ++i) {
+ final TaskInfo info = jobdesc.getTaskInfo(TaskType.REDUCE, i);
+ reduceByteRatio[i] = info.getInputBytes() / (1.0 * mapOutputBytesTotal);
+ reduceRecordRatio[i] =
+ info.getInputRecords() / (1.0 * mapOutputRecordsTotal);
+ }
+ final InputStriper striper = new InputStriper(inputDir, mapInputBytesTotal);
+ final List<InputSplit> splits = new ArrayList<InputSplit>();
+ for (int i = 0; i < maps; ++i) {
+ final int nSpec = reds / maps + ((reds % maps) > i ? 1 : 0);
+ final long[] specBytes = new long[nSpec];
+ final long[] specRecords = new long[nSpec];
+ for (int j = 0; j < nSpec; ++j) {
+ final TaskInfo info =
+ jobdesc.getTaskInfo(TaskType.REDUCE, i + j * maps);
+ specBytes[j] = info.getOutputBytes();
+ specRecords[j] = info.getOutputRecords();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i,
+ i + j * maps, info.getOutputRecords(), info.getOutputBytes()));
+ }
+ }
+ final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
+ splits.add(new LoadSplit(striper.splitFor(inputDir,
+ info.getInputBytes(), 3), maps, i,
+ info.getInputBytes(), info.getInputRecords(),
+ info.getOutputBytes(), info.getOutputRecords(),
+ reduceByteRatio, reduceRecordRatio, specBytes, specRecords));
+ }
+ pushDescription(id(), splits);
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java?rev=1077373&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java Fri Mar 4 04:08:31 2011
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableUtils;
+
+class LoadSplit extends CombineFileSplit {
+ private int id;
+ private int nSpec;
+ private int maps;
+ private int reduces;
+ private long inputRecords;
+ private long outputBytes;
+ private long outputRecords;
+ private long maxMemory;
+ private double[] reduceBytes = new double[0];
+ private double[] reduceRecords = new double[0];
+
+ // Spec for reduces id mod this
+ private long[] reduceOutputBytes = new long[0];
+ private long[] reduceOutputRecords = new long[0];
+
+ LoadSplit() {
+ super();
+ }
+
+ public LoadSplit(CombineFileSplit cfsplit, int maps, int id,
+ long inputBytes, long inputRecords, long outputBytes,
+ long outputRecords, double[] reduceBytes, double[] reduceRecords,
+ long[] reduceOutputBytes, long[] reduceOutputRecords)
+ throws IOException {
+ super(cfsplit);
+ this.id = id;
+ this.maps = maps;
+ reduces = reduceBytes.length;
+ this.inputRecords = inputRecords;
+ this.outputBytes = outputBytes;
+ this.outputRecords = outputRecords;
+ this.reduceBytes = reduceBytes;
+ this.reduceRecords = reduceRecords;
+ nSpec = reduceOutputBytes.length;
+ this.reduceOutputBytes = reduceOutputBytes;
+ this.reduceOutputRecords = reduceOutputRecords;
+ }
+
+ public int getId() {
+ return id;
+ }
+ public int getMapCount() {
+ return maps;
+ }
+ public long getInputRecords() {
+ return inputRecords;
+ }
+ public long[] getOutputBytes() {
+ if (0 == reduces) {
+ return new long[] { outputBytes };
+ }
+ final long[] ret = new long[reduces];
+ for (int i = 0; i < reduces; ++i) {
+ ret[i] = Math.round(outputBytes * reduceBytes[i]);
+ }
+ return ret;
+ }
+ public long[] getOutputRecords() {
+ if (0 == reduces) {
+ return new long[] { outputRecords };
+ }
+ final long[] ret = new long[reduces];
+ for (int i = 0; i < reduces; ++i) {
+ ret[i] = Math.round(outputRecords * reduceRecords[i]);
+ }
+ return ret;
+ }
+ public long getReduceBytes(int i) {
+ return reduceOutputBytes[i];
+ }
+ public long getReduceRecords(int i) {
+ return reduceOutputRecords[i];
+ }
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ WritableUtils.writeVInt(out, id);
+ WritableUtils.writeVInt(out, maps);
+ WritableUtils.writeVLong(out, inputRecords);
+ WritableUtils.writeVLong(out, outputBytes);
+ WritableUtils.writeVLong(out, outputRecords);
+ WritableUtils.writeVLong(out, maxMemory);
+ WritableUtils.writeVInt(out, reduces);
+ for (int i = 0; i < reduces; ++i) {
+ out.writeDouble(reduceBytes[i]);
+ out.writeDouble(reduceRecords[i]);
+ }
+ WritableUtils.writeVInt(out, nSpec);
+ for (int i = 0; i < nSpec; ++i) {
+ WritableUtils.writeVLong(out, reduceOutputBytes[i]);
+ WritableUtils.writeVLong(out, reduceOutputRecords[i]);
+ }
+ }
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ id = WritableUtils.readVInt(in);
+ maps = WritableUtils.readVInt(in);
+ inputRecords = WritableUtils.readVLong(in);
+ outputBytes = WritableUtils.readVLong(in);
+ outputRecords = WritableUtils.readVLong(in);
+ maxMemory = WritableUtils.readVLong(in);
+ reduces = WritableUtils.readVInt(in);
+ if (reduceBytes.length < reduces) {
+ reduceBytes = new double[reduces];
+ reduceRecords = new double[reduces];
+ }
+ for (int i = 0; i < reduces; ++i) {
+ reduceBytes[i] = in.readDouble();
+ reduceRecords[i] = in.readDouble();
+ }
+ nSpec = WritableUtils.readVInt(in);
+ if (reduceOutputBytes.length < nSpec) {
+ reduceOutputBytes = new long[nSpec];
+ reduceOutputRecords = new long[nSpec];
+ }
+ for (int i = 0; i < nSpec; ++i) {
+ reduceOutputBytes[i] = WritableUtils.readVLong(in);
+ reduceOutputRecords[i] = WritableUtils.readVLong(in);
+ }
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java?rev=1077373&r1=1077372&r2=1077373&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java Fri Mar 4 04:08:31 2011
@@ -98,11 +98,12 @@ import java.util.concurrent.TimeUnit;
}
last = current;
submitter.add(
- new GridmixJob(
- conf, initTime + Math.round(rateFactor * (current - first)),
- job, scratch, userResolver.getTargetUgi(
- UserGroupInformation.createRemoteUser(job.getUser())),
- sequence.getAndIncrement()));
+ jobCreator.createGridmixJob(
+ conf, initTime + Math.round(
+ rateFactor * (current - first)), job, scratch,
+ userResolver.getTargetUgi(
+ UserGroupInformation.createRemoteUser(
+ job.getUser())), sequence.getAndIncrement()));
} catch (IOException e) {
error = e;
return;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java?rev=1077373&r1=1077372&r2=1077373&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java Fri Mar 4 04:08:31 2011
@@ -97,7 +97,7 @@ public class SerialJobFactory extends Jo
LOG.debug(
"Serial mode submitting job " + job.getName());
}
- prevJob = new GridmixJob(
+ prevJob = jobCreator.createGridmixJob(
conf, 0L, job, scratch, userResolver.getTargetUgi(
UserGroupInformation.createRemoteUser(job.getUser())),
sequence.getAndIncrement());
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java?rev=1077373&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java Fri Mar 4 04:08:31 2011
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.ReduceTaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class SleepJob extends GridmixJob {
+ public static final Log LOG = LogFactory.getLog(SleepJob.class);
+
+ /**
+ * Interval at which to report progress, in seconds.
+ */
+ public static final String GRIDMIX_SLEEP_INTERVAL = "gridmix.sleep.interval";
+
+ public SleepJob(
+ Configuration conf, long submissionMillis, JobStory jobdesc, Path outRoot,
+ UserGroupInformation ugi, int seq) throws IOException {
+ super(conf, submissionMillis, jobdesc, outRoot, ugi, seq);
+ }
+
+ @Override
+ public Job call()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ ugi.doAs(
+ new PrivilegedExceptionAction<Job>() {
+ public Job run()
+ throws IOException, ClassNotFoundException, InterruptedException {
+ job.setMapperClass(SleepMapper.class);
+ job.setReducerClass(SleepReducer.class);
+ job.setNumReduceTasks(jobdesc.getNumberReduces());
+ job.setMapOutputKeyClass(GridmixKey.class);
+ job.setMapOutputValueClass(NullWritable.class);
+ job.setSortComparatorClass(GridmixKey.Comparator.class);
+ job.setGroupingComparatorClass(SpecGroupingComparator.class);
+ job.setInputFormatClass(SleepInputFormat.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+ job.setPartitionerClass(DraftPartitioner.class);
+ job.setJarByClass(SleepJob.class);
+ job.getConfiguration().setBoolean(
+ "mapred.used.genericoptionsparser", true);
+ job.submit();
+ return job;
+
+ }
+ });
+
+ return job;
+ }
+
+ public static class SleepMapper
+ extends Mapper<LongWritable, LongWritable, GridmixKey, NullWritable> {
+
+ @Override
+ public void map(LongWritable key, LongWritable value, Context context)
+ throws IOException, InterruptedException {
+ context.setStatus("Sleeping... " + value.get() + " ms left");
+ long now = System.currentTimeMillis();
+ if (now < key.get()) {
+ TimeUnit.MILLISECONDS.sleep(key.get() - now);
+ }
+ }
+
+ @Override
+ public void cleanup(Context context)
+ throws IOException, InterruptedException {
+ final int nReds = context.getNumReduceTasks();
+ if (nReds > 0) {
+ final SleepSplit split = (SleepSplit) context.getInputSplit();
+ int id = split.getId();
+ final int nMaps = split.getNumMaps();
+ //This is a hack to pass the sleep duration via Gridmix key
+ //TODO: We need to come up with better solution for this.
+
+ final GridmixKey key = new GridmixKey(GridmixKey.REDUCE_SPEC, 0, 0L);
+ for (int i = id, idx = 0; i < nReds; i += nMaps) {
+ key.setPartition(i);
+ key.setReduceOutputBytes(split.getReduceDurations(idx++));
+ id += nReds;
+ context.write(key, NullWritable.get());
+ }
+ }
+ }
+
+ }
+
+ public static class SleepReducer
+ extends Reducer<GridmixKey, NullWritable, NullWritable, NullWritable> {
+
+ private long duration = 0L;
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ if (!context.nextKey() ||
+ context.getCurrentKey().getType() != GridmixKey.REDUCE_SPEC) {
+ throw new IOException("Missing reduce spec");
+ }
+ for (NullWritable ignored : context.getValues()) {
+ final GridmixKey spec = context.getCurrentKey();
+ duration += spec.getReduceOutputBytes();
+ }
+ final long RINTERVAL = TimeUnit.MILLISECONDS.convert(
+ context.getConfiguration().getLong(
+ GRIDMIX_SLEEP_INTERVAL, 5), TimeUnit.SECONDS);
+ //This is to stop accumulating deviation from expected sleep time
+ //over a period of time.
+ long start = System.currentTimeMillis();
+ long slept = 0L;
+ long sleep = 0L;
+ while (slept < duration) {
+ final long rem = duration - slept;
+ sleep = Math.min(rem, RINTERVAL);
+ context.setStatus("Sleeping... " + rem + " ms left");
+ TimeUnit.MILLISECONDS.sleep(sleep);
+ slept = System.currentTimeMillis() - start;
+ }
+ }
+
+ @Override
+ protected void cleanup(Context context)
+ throws IOException, InterruptedException {
+ final String msg = "Slept for " + duration;
+ LOG.info(msg);
+ context.setStatus(msg);
+ }
+ }
+
+ public static class SleepInputFormat
+ extends InputFormat<LongWritable, LongWritable> {
+
+ @Override
+ public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
+ return pullDescription(jobCtxt);
+ }
+
+ @Override
+ public RecordReader<LongWritable, LongWritable> createRecordReader(
+ InputSplit split, final TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ final long duration = split.getLength();
+ final long RINTERVAL = TimeUnit.MILLISECONDS.convert(
+ context.getConfiguration().getLong(
+ GRIDMIX_SLEEP_INTERVAL, 5), TimeUnit.SECONDS);
+ if (RINTERVAL <= 0) {
+ throw new IOException(
+ "Invalid " + GRIDMIX_SLEEP_INTERVAL + ": " + RINTERVAL);
+ }
+ return new RecordReader<LongWritable, LongWritable>() {
+ long start = -1;
+ long slept = 0L;
+ long sleep = 0L;
+ final LongWritable key = new LongWritable();
+ final LongWritable val = new LongWritable();
+
+ @Override
+ public boolean nextKeyValue() throws IOException {
+ if (start == -1) {
+ start = System.currentTimeMillis();
+ }
+ slept += sleep;
+ sleep = Math.min(duration - slept, RINTERVAL);
+ key.set(slept + sleep + start);
+ val.set(duration - slept);
+ return slept < duration;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return slept / ((float) duration);
+ }
+
+ @Override
+ public LongWritable getCurrentKey() {
+ return key;
+ }
+
+ @Override
+ public LongWritable getCurrentValue() {
+ return val;
+ }
+
+ @Override
+ public void close() throws IOException {
+ final String msg = "Slept for " + duration;
+ LOG.info(msg);
+ }
+
+ public void initialize(InputSplit split, TaskAttemptContext ctxt) {
+ }
+ };
+ }
+ }
+
+ public static class SleepSplit extends InputSplit implements Writable {
+ private int id;
+ private int nSpec;
+ private int nMaps;
+ private long sleepDuration;
+ private long[] reduceDurations = new long[0];
+ private String[] locations = new String[0];
+
+ public SleepSplit() {
+ }
+
+ public SleepSplit(
+ int id, long sleepDuration, long[] reduceDurations, int nMaps,
+ String[] locations) {
+ this.id = id;
+ this.sleepDuration = sleepDuration;
+ nSpec = reduceDurations.length;
+ this.reduceDurations = reduceDurations;
+ this.nMaps = nMaps;
+ this.locations = locations;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeVInt(out, id);
+ WritableUtils.writeVLong(out, sleepDuration);
+ WritableUtils.writeVInt(out, nMaps);
+ WritableUtils.writeVInt(out, nSpec);
+ for (int i = 0; i < nSpec; ++i) {
+ WritableUtils.writeVLong(out, reduceDurations[i]);
+ }
+ WritableUtils.writeVInt(out, locations.length);
+ for (int i = 0; i < locations.length; ++i) {
+ Text.writeString(out, locations[i]);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ id = WritableUtils.readVInt(in);
+ sleepDuration = WritableUtils.readVLong(in);
+ nMaps = WritableUtils.readVInt(in);
+ nSpec = WritableUtils.readVInt(in);
+ if (reduceDurations.length < nSpec) {
+ reduceDurations = new long[nSpec];
+ }
+ for (int i = 0; i < nSpec; ++i) {
+ reduceDurations[i] = WritableUtils.readVLong(in);
+ }
+ final int nLoc = WritableUtils.readVInt(in);
+ if (nLoc != locations.length) {
+ locations = new String[nLoc];
+ }
+ for (int i = 0; i < nLoc; ++i) {
+ locations[i] = Text.readString(in);
+ }
+ }
+
+ @Override
+ public long getLength() {
+ return sleepDuration;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public int getNumMaps() {
+ return nMaps;
+ }
+
+ public long getReduceDurations(int i) {
+ return reduceDurations[i];
+ }
+
+ @Override
+ public String[] getLocations() {
+ return locations;
+ }
+ }
+
+ private TaskAttemptInfo getSuccessfulAttemptInfo(TaskType type, int task) {
+ TaskAttemptInfo ret;
+ for (int i = 0; true; ++i) {
+ // Rumen should make up an attempt if it's missing. Or this won't work
+ // at all. It's hard to discern what is happening in there.
+ ret = jobdesc.getTaskAttemptInfo(type, task, i);
+ if (ret.getRunState() == TaskStatus.State.SUCCEEDED) {
+ break;
+ }
+ }
+ if(ret.getRunState() != TaskStatus.State.SUCCEEDED) {
+ LOG.warn("No sucessful attempts tasktype " + type +" task "+ task);
+ }
+
+ return ret;
+ }
+
+ @Override
+ void buildSplits(FilePool inputDir) throws IOException {
+ final List<InputSplit> splits = new ArrayList<InputSplit>();
+ final int reds = jobdesc.getNumberReduces();
+ final int maps = jobdesc.getNumberMaps();
+ for (int i = 0; i < maps; ++i) {
+ final int nSpec = reds / maps + ((reds % maps) > i ? 1 : 0);
+ final long[] redDurations = new long[nSpec];
+ for (int j = 0; j < nSpec; ++j) {
+ final ReduceTaskAttemptInfo info =
+ (ReduceTaskAttemptInfo) getSuccessfulAttemptInfo(
+ TaskType.REDUCE, i + j * maps);
+ // Include only merge/reduce time
+ redDurations[j] = info.getMergeRuntime() + info.getReduceRuntime();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ String.format(
+ "SPEC(%d) %d -> %d %d/%d", id(), i, i + j * maps, redDurations[j],
+ info.getRuntime()));
+ }
+ }
+ final TaskAttemptInfo info = getSuccessfulAttemptInfo(TaskType.MAP, i);
+ splits.add(
+ new SleepSplit(
+ i, info.getRuntime(), redDurations, maps, new String[0]));
+ }
+ pushDescription(id(), splits);
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java?rev=1077373&r1=1077372&r2=1077373&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java Fri Mar 4 04:08:31 2011
@@ -227,7 +227,8 @@ public class Statistics implements Compo
private void getJobReports(List<JobStatus> jobs) throws IOException {
for (final JobStatus job : jobs) {
- UserGroupInformation user = userResolver.getTargetUgi(
+
+ final UserGroupInformation user = userResolver.getTargetUgi(
UserGroupInformation.createRemoteUser(job.getUsername()));
try {
user.doAs(
@@ -241,7 +242,8 @@ public class Statistics implements Compo
org.apache.hadoop.mapred.JobID.downgrade(id)));
} catch (IOException e) {
LOG.error(
- " Couldnt get the MapTaskResports for "+ job.getJobId());
+ " Couldnt get the MapTaskResports for " + job.getJobId() +
+ " job username "+ job.getUsername() +" cause " + user);
}
}
return null;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java?rev=1077373&r1=1077372&r2=1077373&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java Fri Mar 4 04:08:31 2011
@@ -128,7 +128,7 @@ public class StressJobFactory extends Jo
i += job.getNumberMaps();
submitter.add(
- new GridmixJob(
+ jobCreator.createGridmixJob(
conf, 0L, job, scratch, userResolver.getTargetUgi(
UserGroupInformation.createRemoteUser(
job.getUser())), sequence.getAndIncrement()));
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java?rev=1077373&r1=1077372&r2=1077373&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java Fri Mar 4 04:08:31 2011
@@ -39,13 +39,13 @@ class DebugJobFactory {
CountDownLatch startFlag,UserResolver resolver) throws IOException {
GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(
conf, GridmixJobSubmissionPolicy.STRESS);
- if (policy.name().equalsIgnoreCase("REPLAY")) {
+ if (policy==GridmixJobSubmissionPolicy.REPLAY) {
return new DebugReplayJobFactory(
submitter, scratch, numJobs, conf, startFlag,resolver);
- } else if (policy.name().equalsIgnoreCase("STRESS")) {
+ } else if (policy==GridmixJobSubmissionPolicy.STRESS) {
return new DebugStressJobFactory(
submitter, scratch, numJobs, conf, startFlag,resolver);
- } else if (policy.name().equalsIgnoreCase("SERIAL")) {
+ } else if (policy==GridmixJobSubmissionPolicy.SERIAL) {
return new DebugSerialJobFactory(
submitter, scratch, numJobs, conf, startFlag,resolver);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java?rev=1077373&r1=1077372&r2=1077373&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java Fri Mar 4 04:08:31 2011
@@ -17,8 +17,11 @@
*/
package org.apache.hadoop.mapred.gridmix;
+import org.apache.hadoop.mapred.TaskStatus.State;
import org.apache.hadoop.tools.rumen.JobStoryProducer;
import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.MapTaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.ReduceTaskAttemptInfo;
import org.apache.hadoop.tools.rumen.TaskInfo;
import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
@@ -257,6 +260,19 @@ public class DebugJobProducer implements
@Override
public TaskAttemptInfo getTaskAttemptInfo(
TaskType taskType, int taskNumber, int taskAttemptNumber) {
+ switch (taskType) { // build a SUCCEEDED attempt from the per-task debug arrays; taskAttemptNumber is not used
+ case MAP:
+ return new MapTaskAttemptInfo(
+ State.SUCCEEDED, new TaskInfo(
+ m_bytesIn[taskNumber], m_recsIn[taskNumber],
+ m_bytesOut[taskNumber], m_recsOut[taskNumber], -1),100); // NOTE(review): 100 presumably a fixed attempt runtime; meaning of -1 unconfirmed
+
+ case REDUCE:
+ return new ReduceTaskAttemptInfo(
+ State.SUCCEEDED, new TaskInfo(
+ r_bytesIn[taskNumber], r_recsIn[taskNumber],
+ r_bytesOut[taskNumber], r_recsOut[taskNumber], -1),100,100,100); // NOTE(review): presumably fixed per-phase times (shuffle/merge/reduce) -- confirm against ReduceTaskAttemptInfo
+ } // any other task type falls through to the exception below
throw new UnsupportedOperationException();
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=1077373&r1=1077372&r2=1077373&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Fri Mar 4 04:08:31 2011
@@ -17,19 +17,13 @@
*/
package org.apache.hadoop.mapred.gridmix;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.Counters;
@@ -43,19 +37,26 @@ import org.apache.hadoop.mapreduce.TaskT
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.TaskInfo;
import org.apache.hadoop.util.ToolRunner;
-import static org.apache.hadoop.mapred.Task.Counter.*;
-
-import org.junit.Test;
-import org.junit.BeforeClass;
+import org.apache.log4j.Level;
import org.junit.AfterClass;
-import static org.junit.Assert.*;
+import org.junit.BeforeClass;
+import org.junit.Test;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.log4j.Level;
-import org.apache.hadoop.security.UserGroupInformation;
-import java.security.PrivilegedExceptionAction;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_RECORDS;
+import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_BYTES;
+import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_RECORDS;
+import static org.apache.hadoop.mapred.Task.Counter.REDUCE_INPUT_RECORDS;
+import static org.apache.hadoop.mapred.Task.Counter.REDUCE_OUTPUT_RECORDS;
+import static org.apache.hadoop.mapred.Task.Counter.REDUCE_SHUFFLE_BYTES;
+import static org.junit.Assert.*;
public class TestGridmixSubmission {
static GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.REPLAY;
@@ -105,7 +106,8 @@ public class TestGridmixSubmission {
for (JobStory spec : submitted) {
sub.put(spec.getName(), spec);
}
- final JobClient client = new JobClient(GridmixTestUtils.mrCluster.createJobConf());
+ final JobClient client = new JobClient(
+ GridmixTestUtils.mrCluster.createJobConf());
for (Job job : succeeded) {
final String jobname = job.getJobName();
if ("GRIDMIX_GENDATA".equals(jobname)) {
@@ -293,8 +295,6 @@ public class TestGridmixSubmission {
@Override
protected JobMonitor createJobMonitor(Statistics stats) {
- Configuration conf = new Configuration();
- conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy.name());
monitor = new TestMonitor(NJOBS + 1, stats);
return monitor;
}
@@ -350,7 +350,7 @@ public class TestGridmixSubmission {
};
DebugGridmix client = new DebugGridmix();
conf = new Configuration();
- conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY,policy.name());
+ conf.setEnum(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY,policy);
conf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
// GridmixTestUtils.createHomeAndStagingDirectory((JobConf)conf);
// allow synthetic users to create home directories
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java?rev=1077373&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java Fri Mar 4 04:08:31 2011
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.Assert.*;
+
+public class TestSleepJob {
+
+  public static final Log LOG = LogFactory.getLog(TestSleepJob.class);
+
+  // Static so it runs once per class load, not once per JUnit test instance.
+  static {
+    ((Log4JLogger) LogFactory.getLog("org.apache.hadoop.mapred.gridmix"))
+        .getLogger().setLevel(Level.DEBUG);
+  }
+
+  static GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.REPLAY;
+  private static final int NJOBS = 2;
+  private static final long GENDATA = 50; // in megabytes
+
+  @BeforeClass
+  public static void init() throws IOException {
+    GridmixTestUtils.initCluster();
+  }
+
+  @AfterClass
+  public static void shutDown() throws IOException {
+    GridmixTestUtils.shutdownCluster();
+  }
+
+  /** Records retired jobs; fails the test as soon as any job fails. */
+  static class TestMonitor extends JobMonitor {
+    private final BlockingQueue<Job> retiredJobs;
+    private final int expected;
+
+    /** @param expected number of jobs that must succeed */
+    public TestMonitor(int expected, Statistics stats) {
+      super(stats);
+      this.expected = expected;
+      retiredJobs = new LinkedBlockingQueue<Job>();
+    }
+
+    @Override
+    protected void onSuccess(Job job) {
+      LOG.info(" Job Sucess " + job);
+      retiredJobs.add(job);
+    }
+
+    @Override
+    protected void onFailure(Job job) {
+      fail("Job failure: " + job);
+    }
+
+    /** Checks only the number of successful jobs, not their contents. */
+    public void verify(ArrayList<JobStory> submitted) throws Exception {
+      assertEquals("Bad job count", expected, retiredJobs.size());
+    }
+  }
+
+  /** Gridmix wired to the debug job factory and the counting monitor. */
+  static class DebugGridmix extends Gridmix {
+
+    private JobFactory factory;
+    private TestMonitor monitor;
+
+    @Override
+    protected JobMonitor createJobMonitor(Statistics stats) {
+      // NJOBS synthetic jobs plus the GRIDMIX_GENDATA job run by -generate
+      monitor = new TestMonitor(NJOBS + 1, stats);
+      return monitor;
+    }
+
+    @Override
+    protected JobFactory createJobFactory(
+        JobSubmitter submitter, String traceIn, Path scratchDir,
+        Configuration conf, CountDownLatch startFlag, UserResolver userResolver)
+        throws IOException {
+      factory = DebugJobFactory.getFactory(
+          submitter, scratchDir, NJOBS, conf, startFlag, userResolver);
+      return factory;
+    }
+
+    public void checkMonitor() throws Exception {
+      monitor.verify(((DebugJobFactory.Debuggable) factory).getSubmitted());
+    }
+  }
+
+  @Test
+  public void testReplaySubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.REPLAY;
+    LOG.info(" Replay started at " + System.currentTimeMillis());
+    doSubmission();
+    LOG.info(" Replay ended at " + System.currentTimeMillis());
+  }
+
+  @Test
+  public void testStressSubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.STRESS;
+    LOG.info(" Stress started at " + System.currentTimeMillis());
+    doSubmission();
+    LOG.info(" Stress ended at " + System.currentTimeMillis());
+  }
+
+  @Test
+  public void testSerialSubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.SERIAL;
+    LOG.info("Serial started at " + System.currentTimeMillis());
+    doSubmission();
+    LOG.info("Serial ended at " + System.currentTimeMillis());
+  }
+
+  /** Runs Gridmix sleep jobs under {@link #policy}; any error fails the test. */
+  private void doSubmission() throws Exception {
+    final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs);
+    final Path out = GridmixTestUtils.DEST.makeQualified(GridmixTestUtils.dfs);
+    final Path root = new Path("/user");
+    // built before the try so cleanup in finally always has a usable conf
+    Configuration conf = new Configuration();
+    conf.setEnum(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy);
+    conf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
+    final String[] argv = {"-D" + FilePool.GRIDMIX_MIN_FILE + "=0",
+        "-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out,
+        "-D" + Gridmix.GRIDMIX_USR_RSV + "=" + EchoUserResolver.class.getName(),
+        "-D" + JobCreator.GRIDMIX_JOB_TYPE + "=" + JobCreator.SLEEPJOB.name(),
+        "-D" + SleepJob.GRIDMIX_SLEEP_INTERVAL + "=" + "10",
+        "-generate", String.valueOf(GENDATA) + "m", in.toString(),
+        "-" // trace path; ignored by DebugGridmix
+    };
+    final DebugGridmix client = new DebugGridmix();
+    try {
+      // allow synthetic users to create home directories
+      GridmixTestUtils.dfs.mkdirs(root, new FsPermission((short) 0777));
+      GridmixTestUtils.dfs.setPermission(root, new FsPermission((short) 0777));
+      final int res = ToolRunner.run(conf, client, argv);
+      assertEquals("Client exited with nonzero status", 0, res);
+      client.checkMonitor();
+    } finally {
+      in.getFileSystem(conf).delete(in, true);
+      out.getFileSystem(conf).delete(out, true);
+      root.getFileSystem(conf).delete(root, true);
+    }
+  }
+
+}