Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:08:32 UTC

svn commit: r1077373 - in /hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src: java/org/apache/hadoop/mapred/gridmix/ test/org/apache/hadoop/mapred/gridmix/

Author: omalley
Date: Fri Mar  4 04:08:31 2011
New Revision: 1077373

URL: http://svn.apache.org/viewvc?rev=1077373&view=rev
Log:
commit c07c1e16aaf2316716bc0439f9bf52455c8f0cfe
Author: Rahul Kumar Singh <rk...@yahoo-inc.com>
Date:   Wed Apr 7 10:56:55 2010 +0530

    MAPREDUCE:1594 from https://issues.apache.org/jira/secure/attachment/12440983/1594-yhadoop-20-1xx-1-5.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1594. Support for SleepJobs in Gridmix (rksingh)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java
Removed:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java?rev=1077373&r1=1077372&r2=1077373&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java Fri Mar  4 04:08:31 2011
@@ -98,7 +98,7 @@ class GenerateData extends GridmixJob {
   public Job call() throws IOException, InterruptedException,
                            ClassNotFoundException {
     UserGroupInformation ugi = UserGroupInformation.getLoginUser();
-    job = ugi.doAs( new PrivilegedExceptionAction <Job>() {
+    ugi.doAs( new PrivilegedExceptionAction <Job>() {
        public Job run() throws IOException, ClassNotFoundException,
                                InterruptedException {
         job.setMapperClass(GenDataMapper.class);
@@ -108,13 +108,13 @@ class GenerateData extends GridmixJob {
         job.setInputFormatClass(GenDataFormat.class);
         job.setOutputFormatClass(RawBytesOutputFormat.class);
         job.setJarByClass(GenerateData.class);
-         try {
-           FileInputFormat.addInputPath(job, new Path("ignored"));
-         } catch (IOException e) {
-           LOG.error("Error  while adding input path ",e);
-         }
-         job.submit();
-         return job;
+        try {
+          FileInputFormat.addInputPath(job, new Path("ignored"));
+        } catch (IOException e) {
+          LOG.error("Error  while adding input path ", e);
+        }
+        job.submit();
+        return job;
       }
     });
     return job;
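
A note on the GenerateData hunk above: the privileged action now only configures and submits the already-constructed job field, so its return value no longer needs to be reassigned. A minimal sketch of that doAs pattern (class and method names here are illustrative, not from the patch):

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.security.UserGroupInformation;

    // Illustrative sketch, not part of the patch: the privileged action
    // works by side effect on the captured job field, so the caller can
    // ignore its return value and return the field directly.
    class DoAsSubmitSketch {
      private Job job;  // assumed fully constructed before submitAs()

      Job submitAs(UserGroupInformation ugi)
          throws IOException, InterruptedException {
        ugi.doAs(new PrivilegedExceptionAction<Job>() {
          public Job run() throws Exception {
            job.submit();  // mutate and submit the captured field
            return job;    // value unused by the caller
          }
        });
        return job;
      }
    }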

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=1077373&r1=1077372&r2=1077373&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Fri Mar  4 04:08:31 2011
@@ -161,9 +161,8 @@ public class Gridmix extends Configured 
       LOG.info(" Submission policy is " + policy.name());
       statistics = new Statistics(conf, policy.getPollingInterval(), startFlag,userResolver);
       monitor = createJobMonitor(statistics);
-      int noOfSubmitterThreads = policy.name().equals(
-        GridmixJobSubmissionPolicy.SERIAL.name()) ? 1 :
-        Runtime.getRuntime().availableProcessors() + 1;
+      int noOfSubmitterThreads = (policy == GridmixJobSubmissionPolicy.SERIAL) ? 1
+          : Runtime.getRuntime().availableProcessors() + 1;
 
       submitter = createJobSubmitter(
         monitor, conf.getInt(
@@ -173,7 +172,7 @@ public class Gridmix extends Configured 
       
       factory = createJobFactory(
         submitter, traceIn, scratchDir, conf, startFlag, userResolver);
-      if (policy.name().equals(GridmixJobSubmissionPolicy.SERIAL.name())) {
+      if (policy==GridmixJobSubmissionPolicy.SERIAL) {
         statistics.addJobStatsListeners(factory);
       } else {
         statistics.addClusterStatsObservers(factory);
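
The Gridmix.java change above swaps string comparison of policy names for enum identity. Since enum constants are singletons, == is equivalent to comparing name() values and reads more directly. A hypothetical stand-in enum illustrating the same test:

    // Hypothetical stand-in for GridmixJobSubmissionPolicy, illustrating
    // why the patch compares enum constants directly.
    class PolicyComparisonSketch {
      enum Policy { SERIAL, REPLAY, STRESS }

      static int submitterThreads(Policy policy) {
        // Enum constants are singletons, so == is equivalent to
        // policy.name().equals(Policy.SERIAL.name()) without the string hop.
        return (policy == Policy.SERIAL)
            ? 1
            : Runtime.getRuntime().availableProcessors() + 1;
      }
    }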

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java?rev=1077373&r1=1077372&r2=1077373&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java Fri Mar  4 04:08:31 2011
@@ -18,44 +18,32 @@
 package org.apache.hadoop.mapred.gridmix;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Formatter;
 import java.util.List;
-import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
 import java.security.PrivilegedExceptionAction;
-import javax.security.auth.login.LoginException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.tools.rumen.JobStory;
-import org.apache.hadoop.tools.rumen.TaskInfo;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -63,7 +51,7 @@ import org.apache.commons.logging.LogFac
 /**
  * Synthetic job generated from a trace description.
  */
-class GridmixJob implements Callable<Job>, Delayed {
+abstract class GridmixJob implements Callable<Job>, Delayed {
 
   public static final String JOBNAME = "GRIDMIX";
   public static final String ORIGNAME = "gridmix.job.name.original";
@@ -79,46 +67,41 @@ class GridmixJob implements Callable<Job
       }
     };
 
-  private final int seq;
-  private final Path outdir;
-  protected Job job;
-  private final JobStory jobdesc;
-  private final UserGroupInformation ugi;
-  private final long submissionTimeNanos;
+  protected final int seq;
+  protected final Path outdir;
+  protected final Job job;
+  protected final JobStory jobdesc;
+  protected final UserGroupInformation ugi;
+  protected final long submissionTimeNanos;
+  private static final ConcurrentHashMap<Integer,List<InputSplit>> descCache =
+     new ConcurrentHashMap<Integer,List<InputSplit>>();
 
   public GridmixJob(
     final Configuration conf, long submissionMillis, final JobStory jobdesc,
     Path outRoot, UserGroupInformation ugi, final int seq) throws IOException {
     this.ugi = ugi;
+    this.jobdesc = jobdesc;
+    this.seq = seq;
+
     ((StringBuilder)nameFormat.get().out()).setLength(JOBNAME.length());
     try {
       job = this.ugi.doAs(new PrivilegedExceptionAction<Job>() {
-        public Job run(){
-          try {
-            return new Job(
-              conf, nameFormat.get().format(
-                "%05d", seq).toString());
-          } catch (IOException e) {
-            LOG.error(" Could not run job submitted " + jobdesc.getName());
-            return null;
-          }
+        public Job run() throws IOException {
+          Job ret = new Job(conf, nameFormat.get().format("%05d", seq)
+              .toString());
+          ret.getConfiguration().setInt("gridmix.job.seq", seq);
+          ret.getConfiguration().set(ORIGNAME,
+              null == jobdesc.getJobID() ? "<unknown>" : jobdesc.getJobID()
+                  .toString());
+          return ret;
         }
       });
     } catch (InterruptedException e) {
       throw new IOException(e);
-    } catch (IOException e) {
-      throw e;
     }
 
-    if(job == null) {
-      throw new IOException(
-        " Could not create Job instance for job " + jobdesc.getName());
-    }
-    
     submissionTimeNanos = TimeUnit.NANOSECONDS.convert(
         submissionMillis, TimeUnit.MILLISECONDS);
-    this.jobdesc = jobdesc;
-    this.seq = seq;
     outdir = new Path(outRoot, "" + seq);
   }
 
@@ -134,17 +117,14 @@ class GridmixJob implements Callable<Job
 
     try {
       job = this.ugi.doAs(new PrivilegedExceptionAction<Job>() {
-          public Job run(){
-            try {
-              return new Job(conf,name);
-            } catch (IOException e) {
-              LOG.error(" Could not run job submitted " + name);
-              return null;
-            }
-          }
-        });
+        public Job run() throws IOException {
+          Job ret = new Job(conf, name);
+          ret.getConfiguration().setInt("gridmix.job.seq", seq);
+          return ret;
+        }
+      });
     } catch (InterruptedException e) {
-      LOG.error(" Error while creating new job " , e);
+      throw new IOException(e);
     }
   }
 
@@ -161,6 +141,41 @@ class GridmixJob implements Callable<Job
         TimeUnit.NANOSECONDS);
   }
 
+  int id() {
+    return seq;
+  }
+
+  Job getJob() {
+    return job;
+  }
+
+  JobStory getJobDesc() {
+    return jobdesc;
+  }
+
+  static void pushDescription(int seq, List<InputSplit> splits) {
+    if (null != descCache.putIfAbsent(seq, splits)) {
+      throw new IllegalArgumentException("Description exists for id " + seq);
+    }
+  }
+
+  static List<InputSplit> pullDescription(JobContext jobCtxt) {
+    return pullDescription(jobCtxt.getConfiguration().getInt(
+        "gridmix.job.seq", -1));
+  }
+  
+  static List<InputSplit> pullDescription(int seq) {
+    return descCache.remove(seq);
+  }
+
+  static void clearAll() {
+    descCache.clear();
+  }
+
+  void buildSplits(FilePool inputDir) throws IOException {
+
+  }
+
   @Override
   public int compareTo(Delayed other) {
     if (this == other) {
@@ -181,6 +196,7 @@ class GridmixJob implements Callable<Job
     return 0 == diff ? 0 : (diff > 0 ? 1 : -1);
   }
 
+
   @Override
   public boolean equals(Object other) {
     if (this == other) {
@@ -195,66 +211,6 @@ class GridmixJob implements Callable<Job
     return id();
   }
 
-  int id() {
-    return seq;
-  }
-
-  Job getJob() {
-    return job;
-  }
-
-  JobStory getJobDesc() {
-    return jobdesc;
-  }
-
-  public Job call() throws IOException, InterruptedException,
-                           ClassNotFoundException {
-    job = ugi.doAs(
-      new PrivilegedExceptionAction<Job>() {
-        public Job run() {
-          job.setMapperClass(GridmixMapper.class);
-          job.setReducerClass(GridmixReducer.class);
-          job.setNumReduceTasks(jobdesc.getNumberReduces());
-          job.setMapOutputKeyClass(GridmixKey.class);
-          job.setMapOutputValueClass(GridmixRecord.class);
-          job.setSortComparatorClass(GridmixKey.Comparator.class);
-          job.setGroupingComparatorClass(SpecGroupingComparator.class);
-          job.setInputFormatClass(GridmixInputFormat.class);
-          job.setOutputFormatClass(RawBytesOutputFormat.class);
-          job.setPartitionerClass(DraftPartitioner.class);
-          job.setJarByClass(GridmixJob.class);
-          job.getConfiguration().setInt("gridmix.job.seq", seq);
-          job.getConfiguration().set(
-            ORIGNAME, null == jobdesc.getJobID() ? "<unknown>" :
-              jobdesc.getJobID().toString());
-          job.getConfiguration().setBoolean(
-            "mapred.used.genericoptionsparser", true);
-          try {
-            FileInputFormat.addInputPath(job, new Path("ignored"));
-          } catch (IOException e) {
-            LOG.error(" Exception while addingInpuPath job " , e);
-            return null;
-          }
-          FileOutputFormat.setOutputPath(job, outdir);
-          try {
-            job.submit();
-          } catch (IOException e) {
-            LOG.error(" Exception while submitting job " , e);
-            return null;
-          } catch (InterruptedException e) {
-            LOG.error(" Exception while submitting job " , e);
-            return null;
-          } catch (ClassNotFoundException e) {
-            LOG.error(" Exception while submitting job " , e);
-            return null;
-          }
-          return job;
-        }
-      });
-
-    return job;
-  }
-
   public static class DraftPartitioner<V> extends Partitioner<GridmixKey,V> {
     public int getPartition(GridmixKey key, V value, int numReduceTasks) {
       return key.getPartition();
@@ -304,204 +260,6 @@ class GridmixJob implements Callable<Job
     }
   }
 
-  public static class GridmixMapper
-      extends Mapper<NullWritable,GridmixRecord,GridmixKey,GridmixRecord> {
-
-    private double acc;
-    private double ratio;
-    private final ArrayList<RecordFactory> reduces =
-      new ArrayList<RecordFactory>();
-    private final Random r = new Random();
-
-    private final GridmixKey key = new GridmixKey();
-    private final GridmixRecord val = new GridmixRecord();
-
-    @Override
-    protected void setup(Context ctxt)
-        throws IOException, InterruptedException {
-      final Configuration conf = ctxt.getConfiguration();
-      final GridmixSplit split = (GridmixSplit) ctxt.getInputSplit();
-      final int maps = split.getMapCount();
-      final long[] reduceBytes = split.getOutputBytes();
-      final long[] reduceRecords = split.getOutputRecords();
-
-      long totalRecords = 0L;
-      final int nReduces = ctxt.getNumReduceTasks();
-      if (nReduces > 0) {
-        int idx = 0;
-        int id = split.getId();
-        for (int i = 0; i < nReduces; ++i) {
-          final GridmixKey.Spec spec = new GridmixKey.Spec();
-          if (i == id) {
-            spec.bytes_out = split.getReduceBytes(idx);
-            spec.rec_out = split.getReduceRecords(idx);
-            ++idx;
-            id += maps;
-          }
-          reduces.add(new IntermediateRecordFactory(
-              new AvgRecordFactory(reduceBytes[i], reduceRecords[i], conf),
-              i, reduceRecords[i], spec, conf));
-          totalRecords += reduceRecords[i];
-        }
-      } else {
-        reduces.add(new AvgRecordFactory(reduceBytes[0], reduceRecords[0],
-              conf));
-        totalRecords = reduceRecords[0];
-      }
-      final long splitRecords = split.getInputRecords();
-      final long inputRecords = splitRecords <= 0 && split.getLength() >= 0
-        ? Math.max(1,
-          split.getLength() / conf.getInt("gridmix.missing.rec.size", 64*1024))
-        : splitRecords;
-      ratio = totalRecords / (1.0 * inputRecords);
-      acc = 0.0;
-    }
-
-    @Override
-    public void map(NullWritable ignored, GridmixRecord rec,
-        Context context) throws IOException, InterruptedException {
-      acc += ratio;
-      while (acc >= 1.0 && !reduces.isEmpty()) {
-        key.setSeed(r.nextLong());
-        val.setSeed(r.nextLong());
-        final int idx = r.nextInt(reduces.size());
-        final RecordFactory f = reduces.get(idx);
-        if (!f.next(key, val)) {
-          reduces.remove(idx);
-          continue;
-        }
-        context.write(key, val);
-        acc -= 1.0;
-      }
-    }
-
-    @Override
-    public void cleanup(Context context)
-        throws IOException, InterruptedException {
-      for (RecordFactory factory : reduces) {
-        key.setSeed(r.nextLong());
-        while (factory.next(key, val)) {
-          context.write(key, val);
-          key.setSeed(r.nextLong());
-        }
-      }
-    }
-  }
-
-  public static class GridmixReducer
-      extends Reducer<GridmixKey,GridmixRecord,NullWritable,GridmixRecord> {
-
-    private final Random r = new Random();
-    private final GridmixRecord val = new GridmixRecord();
-
-    private double acc;
-    private double ratio;
-    private RecordFactory factory;
-
-    @Override
-    protected void setup(Context context)
-        throws IOException, InterruptedException {
-      if (!context.nextKey() ||
-           context.getCurrentKey().getType() != GridmixKey.REDUCE_SPEC) {
-        throw new IOException("Missing reduce spec");
-      }
-      long outBytes = 0L;
-      long outRecords = 0L;
-      long inRecords = 0L;
-      for (GridmixRecord ignored : context.getValues()) {
-        final GridmixKey spec = context.getCurrentKey();
-        inRecords += spec.getReduceInputRecords();
-        outBytes += spec.getReduceOutputBytes();
-        outRecords += spec.getReduceOutputRecords();
-      }
-      if (0 == outRecords && inRecords > 0) {
-        LOG.info("Spec output bytes w/o records. Using input record count");
-        outRecords = inRecords;
-      }
-      factory =
-        new AvgRecordFactory(outBytes, outRecords, context.getConfiguration());
-      ratio = outRecords / (1.0 * inRecords);
-      acc = 0.0;
-    }
-    @Override
-    protected void reduce(GridmixKey key, Iterable<GridmixRecord> values,
-        Context context) throws IOException, InterruptedException {
-      for (GridmixRecord ignored : values) {
-        acc += ratio;
-        while (acc >= 1.0 && factory.next(null, val)) {
-          context.write(NullWritable.get(), val);
-          acc -= 1.0;
-        }
-      }
-    }
-    @Override
-    protected void cleanup(Context context)
-        throws IOException, InterruptedException {
-      val.setSeed(r.nextLong());
-      while (factory.next(null, val)) {
-        context.write(NullWritable.get(), val);
-        val.setSeed(r.nextLong());
-      }
-    }
-  }
-
-  static class GridmixRecordReader
-      extends RecordReader<NullWritable,GridmixRecord> {
-
-    private RecordFactory factory;
-    private final Random r = new Random();
-    private final GridmixRecord val = new GridmixRecord();
-
-    public GridmixRecordReader() { }
-
-    @Override
-    public void initialize(InputSplit genericSplit, TaskAttemptContext ctxt)
-            throws IOException, InterruptedException {
-      final GridmixSplit split = (GridmixSplit)genericSplit;
-      final Configuration conf = ctxt.getConfiguration();
-      factory = new ReadRecordFactory(split.getLength(),
-          split.getInputRecords(), new FileQueue(split, conf), conf);
-    }
-
-    @Override
-    public boolean nextKeyValue() throws IOException {
-      val.setSeed(r.nextLong());
-      return factory.next(null, val);
-    }
-    @Override
-    public float getProgress() throws IOException {
-      return factory.getProgress();
-    }
-    @Override
-    public NullWritable getCurrentKey() {
-      return NullWritable.get();
-    }
-    @Override
-    public GridmixRecord getCurrentValue() {
-      return val;
-    }
-    @Override
-    public void close() throws IOException {
-      factory.close();
-    }
-  }
-
-  static class GridmixInputFormat
-      extends InputFormat<NullWritable,GridmixRecord> {
-
-    @Override
-    public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
-      return pullDescription(jobCtxt.getConfiguration().getInt(
-            "gridmix.job.seq", -1));
-    }
-    @Override
-    public RecordReader<NullWritable,GridmixRecord> createRecordReader(
-        InputSplit split, final TaskAttemptContext taskContext)
-        throws IOException {
-      return new GridmixRecordReader();
-    }
-  }
-
   static class RawBytesOutputFormat<K>
       extends FileOutputFormat<K,GridmixRecord> {
 
@@ -525,74 +283,4 @@ class GridmixJob implements Callable<Job
       };
     }
   }
-
-  // TODO replace with ThreadLocal submitter?
-  private static final ConcurrentHashMap<Integer,List<InputSplit>> descCache =
-    new ConcurrentHashMap<Integer,List<InputSplit>>();
-
-  static void pushDescription(int seq, List<InputSplit> splits) {
-    if (null != descCache.putIfAbsent(seq, splits)) {
-      throw new IllegalArgumentException("Description exists for id " + seq);
-    }
-  }
-
-  static List<InputSplit> pullDescription(int seq) {
-    return descCache.remove(seq);
-  }
-
-  // not nesc when TL
-  static void clearAll() {
-    descCache.clear();
-  }
-
-  void buildSplits(FilePool inputDir) throws IOException {
-    long mapInputBytesTotal = 0L;
-    long mapOutputBytesTotal = 0L;
-    long mapOutputRecordsTotal = 0L;
-    final JobStory jobdesc = getJobDesc();
-    if (null == jobdesc) {
-      return;
-    }
-    final int maps = jobdesc.getNumberMaps();
-    final int reds = jobdesc.getNumberReduces();
-    for (int i = 0; i < maps; ++i) {
-      final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
-      mapInputBytesTotal += info.getInputBytes();
-      mapOutputBytesTotal += info.getOutputBytes();
-      mapOutputRecordsTotal += info.getOutputRecords();
-    }
-    final double[] reduceRecordRatio = new double[reds];
-    final double[] reduceByteRatio = new double[reds];
-    for (int i = 0; i < reds; ++i) {
-      final TaskInfo info = jobdesc.getTaskInfo(TaskType.REDUCE, i);
-      reduceByteRatio[i] = info.getInputBytes() / (1.0 * mapOutputBytesTotal);
-      reduceRecordRatio[i] =
-        info.getInputRecords() / (1.0 * mapOutputRecordsTotal);
-    }
-    final InputStriper striper = new InputStriper(inputDir, mapInputBytesTotal);
-    final List<InputSplit> splits = new ArrayList<InputSplit>();
-    for (int i = 0; i < maps; ++i) {
-      final int nSpec = reds / maps + ((reds % maps) > i ? 1 : 0);
-      final long[] specBytes = new long[nSpec];
-      final long[] specRecords = new long[nSpec];
-      for (int j = 0; j < nSpec; ++j) {
-        final TaskInfo info =
-          jobdesc.getTaskInfo(TaskType.REDUCE, i + j * maps);
-        specBytes[j] = info.getOutputBytes();
-        specRecords[j] = info.getOutputRecords();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i,
-              i + j * maps, info.getOutputRecords(), info.getOutputBytes()));
-        }
-      }
-      final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
-      splits.add(new GridmixSplit(striper.splitFor(inputDir,
-              info.getInputBytes(), 3), maps, i,
-            info.getInputBytes(), info.getInputRecords(),
-            info.getOutputBytes(), info.getOutputRecords(),
-            reduceByteRatio, reduceRecordRatio, specBytes, specRecords));
-    }
-    pushDescription(id(), splits);
-  }
-
 }
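
The refactoring above leaves GridmixJob abstract and keeps the split-description cache with the rest of the shared state: the submitter pushes synthesized splits under the job's sequence number, and the InputFormat pulls (and removes) them once via the gridmix.job.seq value in the job configuration. A stripped-down sketch of that hand-off, using a placeholder Split type rather than the real InputSplit:

    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;

    // Stripped-down sketch of the descCache hand-off (placeholder Split
    // type, not the real InputSplit): the submitter registers splits under
    // a sequence id, and the consumer claims them exactly once.
    class SplitCacheSketch<Split> {
      private final ConcurrentHashMap<Integer, List<Split>> cache =
          new ConcurrentHashMap<Integer, List<Split>>();

      void push(int seq, List<Split> splits) {
        // putIfAbsent returns the prior value, so a non-null result means
        // a duplicate registration for this id.
        if (cache.putIfAbsent(seq, splits) != null) {
          throw new IllegalArgumentException("Description exists for id " + seq);
        }
      }

      List<Split> pull(int seq) {
        return cache.remove(seq);  // claim and clear in one atomic step
      }
    }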

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java?rev=1077373&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java Fri Mar  4 04:08:31 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.rumen.JobStory;
+
+import java.io.IOException;
+
+public enum JobCreator {
+
+  LOADJOB("LOADJOB") {
+    @Override
+    public GridmixJob createGridmixJob(
+      Configuration conf, long submissionMillis, JobStory jobdesc, Path outRoot,
+      UserGroupInformation ugi, int seq) throws IOException {
+      return new LoadJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq);
+    }},
+
+  SLEEPJOB("SLEEPJOB") {
+    @Override
+    public GridmixJob createGridmixJob(
+      Configuration conf, long submissionMillis, JobStory jobdesc, Path outRoot,
+      UserGroupInformation ugi, int seq) throws IOException {
+      return new SleepJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq);
+    }};
+
+  public static final String GRIDMIX_JOB_TYPE = "gridmix.job.type";
+
+
+  private final String name;
+
+  JobCreator(String name) {
+    this.name = name;
+  }
+
+  public abstract GridmixJob createGridmixJob(
+    final Configuration conf, long submissionMillis, final JobStory jobdesc,
+    Path outRoot, UserGroupInformation ugi, final int seq) throws IOException;
+
+  public static JobCreator getPolicy(
+    Configuration conf, JobCreator defaultPolicy) {
+    return conf.getEnum(GRIDMIX_JOB_TYPE, defaultPolicy);
+  }
+}
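
JobCreator turns each job type into a factory constant; getPolicy resolves the gridmix.job.type configuration key via Configuration.getEnum, falling back to the supplied default. A short usage sketch (the SLEEPJOB value is set here only for illustration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.gridmix.JobCreator;

    // Usage sketch; the SLEEPJOB value is illustrative. getPolicy() falls
    // back to the supplied default (LOADJOB) when gridmix.job.type is unset.
    class JobTypeSelectionSketch {
      static JobCreator chooseCreator() {
        Configuration conf = new Configuration();
        conf.set(JobCreator.GRIDMIX_JOB_TYPE, "SLEEPJOB");
        return JobCreator.getPolicy(conf, JobCreator.LOADJOB);
        // The returned constant then acts as the factory:
        // creator.createGridmixJob(conf, submissionMillis, jobdesc,
        //                          outRoot, ugi, seq)
      }
    }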

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java?rev=1077373&r1=1077372&r2=1077373&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java Fri Mar  4 04:08:31 2011
@@ -22,7 +22,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -63,6 +62,7 @@ abstract class JobFactory<T> implements 
   protected volatile IOException error = null;
   protected final JobStoryProducer jobProducer;
   protected final ReentrantLock lock = new ReentrantLock(true);
+  protected final JobCreator jobCreator;
 
   /**
    * Creating a new instance does not start the thread.
@@ -104,6 +104,7 @@ abstract class JobFactory<T> implements 
       LOG.debug(" The submission thread name is " + rThread.getName());
     }
     this.userResolver = userResolver;
+    this.jobCreator = JobCreator.getPolicy(conf,JobCreator.LOADJOB);
   }
 
 

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java?rev=1077373&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java Fri Mar  4 04:08:31 2011
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Synthetic job generated from a trace description.
+ */
+class LoadJob extends GridmixJob {
+
+  public static final Log LOG = LogFactory.getLog(LoadJob.class);
+
+  public LoadJob(
+    final Configuration conf, long submissionMillis, final JobStory jobdesc,
+    Path outRoot, UserGroupInformation ugi, final int seq) throws IOException {
+    super(conf,submissionMillis,jobdesc,outRoot,ugi,seq);
+  }
+
+  public Job call() throws IOException, InterruptedException,
+                           ClassNotFoundException {
+    ugi.doAs(
+      new PrivilegedExceptionAction<Job>() {
+        public Job run() throws IOException, ClassNotFoundException,
+                                InterruptedException {
+          job.setMapperClass(LoadMapper.class);
+          job.setReducerClass(LoadReducer.class);
+          job.setNumReduceTasks(jobdesc.getNumberReduces());
+          job.setMapOutputKeyClass(GridmixKey.class);
+          job.setMapOutputValueClass(GridmixRecord.class);
+          job.setSortComparatorClass(GridmixKey.Comparator.class);
+          job.setGroupingComparatorClass(SpecGroupingComparator.class);
+          job.setInputFormatClass(LoadInputFormat.class);
+          job.setOutputFormatClass(RawBytesOutputFormat.class);
+          job.setPartitionerClass(DraftPartitioner.class);
+          job.setJarByClass(LoadJob.class);
+          job.getConfiguration().setBoolean(
+            "mapred.used.genericoptionsparser", true);
+          FileOutputFormat.setOutputPath(job, outdir);
+          job.submit();
+          return job;
+        }
+      });
+
+    return job;
+  }
+
+  public static class LoadMapper
+      extends Mapper<NullWritable,GridmixRecord,GridmixKey,GridmixRecord> {
+
+    private double acc;
+    private double ratio;
+    private final ArrayList<RecordFactory> reduces =
+      new ArrayList<RecordFactory>();
+    private final Random r = new Random();
+
+    private final GridmixKey key = new GridmixKey();
+    private final GridmixRecord val = new GridmixRecord();
+
+    @Override
+    protected void setup(Context ctxt)
+        throws IOException, InterruptedException {
+      final Configuration conf = ctxt.getConfiguration();
+      final LoadSplit split = (LoadSplit) ctxt.getInputSplit();
+      final int maps = split.getMapCount();
+      final long[] reduceBytes = split.getOutputBytes();
+      final long[] reduceRecords = split.getOutputRecords();
+
+      long totalRecords = 0L;
+      final int nReduces = ctxt.getNumReduceTasks();
+      if (nReduces > 0) {
+        int idx = 0;
+        int id = split.getId();
+        for (int i = 0; i < nReduces; ++i) {
+          final GridmixKey.Spec spec = new GridmixKey.Spec();
+          if (i == id) {
+            spec.bytes_out = split.getReduceBytes(idx);
+            spec.rec_out = split.getReduceRecords(idx);
+            ++idx;
+            id += maps;
+          }
+          reduces.add(new IntermediateRecordFactory(
+              new AvgRecordFactory(reduceBytes[i], reduceRecords[i], conf),
+              i, reduceRecords[i], spec, conf));
+          totalRecords += reduceRecords[i];
+        }
+      } else {
+        reduces.add(new AvgRecordFactory(reduceBytes[0], reduceRecords[0],
+              conf));
+        totalRecords = reduceRecords[0];
+      }
+      final long splitRecords = split.getInputRecords();
+      final long inputRecords = splitRecords <= 0 && split.getLength() >= 0
+        ? Math.max(1,
+          split.getLength() / conf.getInt("gridmix.missing.rec.size", 64*1024))
+        : splitRecords;
+      ratio = totalRecords / (1.0 * inputRecords);
+      acc = 0.0;
+    }
+
+    @Override
+    public void map(NullWritable ignored, GridmixRecord rec,
+        Context context) throws IOException, InterruptedException {
+      acc += ratio;
+      while (acc >= 1.0 && !reduces.isEmpty()) {
+        key.setSeed(r.nextLong());
+        val.setSeed(r.nextLong());
+        final int idx = r.nextInt(reduces.size());
+        final RecordFactory f = reduces.get(idx);
+        if (!f.next(key, val)) {
+          reduces.remove(idx);
+          continue;
+        }
+        context.write(key, val);
+        acc -= 1.0;
+      }
+    }
+
+    @Override
+    public void cleanup(Context context)
+        throws IOException, InterruptedException {
+      for (RecordFactory factory : reduces) {
+        key.setSeed(r.nextLong());
+        while (factory.next(key, val)) {
+          context.write(key, val);
+          key.setSeed(r.nextLong());
+        }
+      }
+    }
+  }
+
+  public static class LoadReducer
+      extends Reducer<GridmixKey,GridmixRecord,NullWritable,GridmixRecord> {
+
+    private final Random r = new Random();
+    private final GridmixRecord val = new GridmixRecord();
+
+    private double acc;
+    private double ratio;
+    private RecordFactory factory;
+
+    @Override
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      if (!context.nextKey() ||
+           context.getCurrentKey().getType() != GridmixKey.REDUCE_SPEC) {
+        throw new IOException("Missing reduce spec");
+      }
+      long outBytes = 0L;
+      long outRecords = 0L;
+      long inRecords = 0L;
+      for (GridmixRecord ignored : context.getValues()) {
+        final GridmixKey spec = context.getCurrentKey();
+        inRecords += spec.getReduceInputRecords();
+        outBytes += spec.getReduceOutputBytes();
+        outRecords += spec.getReduceOutputRecords();
+      }
+      if (0 == outRecords && inRecords > 0) {
+        LOG.info("Spec output bytes w/o records. Using input record count");
+        outRecords = inRecords;
+      }
+      factory =
+        new AvgRecordFactory(outBytes, outRecords, context.getConfiguration());
+      ratio = outRecords / (1.0 * inRecords);
+      acc = 0.0;
+    }
+    @Override
+    protected void reduce(GridmixKey key, Iterable<GridmixRecord> values,
+        Context context) throws IOException, InterruptedException {
+      for (GridmixRecord ignored : values) {
+        acc += ratio;
+        while (acc >= 1.0 && factory.next(null, val)) {
+          context.write(NullWritable.get(), val);
+          acc -= 1.0;
+        }
+      }
+    }
+    @Override
+    protected void cleanup(Context context)
+        throws IOException, InterruptedException {
+      val.setSeed(r.nextLong());
+      while (factory.next(null, val)) {
+        context.write(NullWritable.get(), val);
+        val.setSeed(r.nextLong());
+      }
+    }
+  }
+
+  static class LoadRecordReader
+      extends RecordReader<NullWritable,GridmixRecord> {
+
+    private RecordFactory factory;
+    private final Random r = new Random();
+    private final GridmixRecord val = new GridmixRecord();
+
+    public LoadRecordReader() { }
+
+    @Override
+    public void initialize(InputSplit genericSplit, TaskAttemptContext ctxt)
+            throws IOException, InterruptedException {
+      final LoadSplit split = (LoadSplit)genericSplit;
+      final Configuration conf = ctxt.getConfiguration();
+      factory = new ReadRecordFactory(split.getLength(),
+          split.getInputRecords(), new FileQueue(split, conf), conf);
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException {
+      val.setSeed(r.nextLong());
+      return factory.next(null, val);
+    }
+    @Override
+    public float getProgress() throws IOException {
+      return factory.getProgress();
+    }
+    @Override
+    public NullWritable getCurrentKey() {
+      return NullWritable.get();
+    }
+    @Override
+    public GridmixRecord getCurrentValue() {
+      return val;
+    }
+    @Override
+    public void close() throws IOException {
+      factory.close();
+    }
+  }
+
+  static class LoadInputFormat
+      extends InputFormat<NullWritable,GridmixRecord> {
+
+    @Override
+    public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
+      return pullDescription(jobCtxt);
+    }
+    @Override
+    public RecordReader<NullWritable,GridmixRecord> createRecordReader(
+        InputSplit split, final TaskAttemptContext taskContext)
+        throws IOException {
+      return new LoadRecordReader();
+    }
+  }
+
+  @Override
+  void buildSplits(FilePool inputDir) throws IOException {
+    long mapInputBytesTotal = 0L;
+    long mapOutputBytesTotal = 0L;
+    long mapOutputRecordsTotal = 0L;
+    final JobStory jobdesc = getJobDesc();
+    if (null == jobdesc) {
+      return;
+    }
+    final int maps = jobdesc.getNumberMaps();
+    final int reds = jobdesc.getNumberReduces();
+    for (int i = 0; i < maps; ++i) {
+      final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
+      mapInputBytesTotal += info.getInputBytes();
+      mapOutputBytesTotal += info.getOutputBytes();
+      mapOutputRecordsTotal += info.getOutputRecords();
+    }
+    final double[] reduceRecordRatio = new double[reds];
+    final double[] reduceByteRatio = new double[reds];
+    for (int i = 0; i < reds; ++i) {
+      final TaskInfo info = jobdesc.getTaskInfo(TaskType.REDUCE, i);
+      reduceByteRatio[i] = info.getInputBytes() / (1.0 * mapOutputBytesTotal);
+      reduceRecordRatio[i] =
+        info.getInputRecords() / (1.0 * mapOutputRecordsTotal);
+    }
+    final InputStriper striper = new InputStriper(inputDir, mapInputBytesTotal);
+    final List<InputSplit> splits = new ArrayList<InputSplit>();
+    for (int i = 0; i < maps; ++i) {
+      final int nSpec = reds / maps + ((reds % maps) > i ? 1 : 0);
+      final long[] specBytes = new long[nSpec];
+      final long[] specRecords = new long[nSpec];
+      for (int j = 0; j < nSpec; ++j) {
+        final TaskInfo info =
+          jobdesc.getTaskInfo(TaskType.REDUCE, i + j * maps);
+        specBytes[j] = info.getOutputBytes();
+        specRecords[j] = info.getOutputRecords();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i,
+              i + j * maps, info.getOutputRecords(), info.getOutputBytes()));
+        }
+      }
+      final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
+      splits.add(new LoadSplit(striper.splitFor(inputDir,
+              info.getInputBytes(), 3), maps, i,
+            info.getInputBytes(), info.getInputRecords(),
+            info.getOutputBytes(), info.getOutputRecords(),
+            reduceByteRatio, reduceRecordRatio, specBytes, specRecords));
+    }
+    pushDescription(id(), splits);
+  }
+
+}
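
LoadMapper above rescales the input record stream to the traced output counts with a fractional accumulator: every consumed input adds ratio to acc, and one record is emitted for each whole unit accumulated. A self-contained sketch of just that technique (names are illustrative, not from the patch):

    // Sketch of the fractional-accumulator resampling used by LoadMapper:
    // emit outputTotal records while consuming inputTotal inputs, spreading
    // the fractional remainder evenly instead of rounding per record.
    class AccumulatorSketch {
      private double acc = 0.0;
      private final double ratio;  // outputs per input, e.g. 2.5

      AccumulatorSketch(long outputTotal, long inputTotal) {
        this.ratio = outputTotal / (1.0 * inputTotal);
      }

      // Returns how many records to emit for one consumed input record.
      int emitCountForOneInput() {
        acc += ratio;
        int emit = 0;
        while (acc >= 1.0) {
          ++emit;
          acc -= 1.0;
        }
        return emit;
      }
    }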

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java?rev=1077373&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java Fri Mar  4 04:08:31 2011
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableUtils;
+
+class LoadSplit extends CombineFileSplit {
+  private int id;
+  private int nSpec;
+  private int maps;
+  private int reduces;
+  private long inputRecords;
+  private long outputBytes;
+  private long outputRecords;
+  private long maxMemory;
+  private double[] reduceBytes = new double[0];
+  private double[] reduceRecords = new double[0];
+
+  // Spec for reduces id mod this
+  private long[] reduceOutputBytes = new long[0];
+  private long[] reduceOutputRecords = new long[0];
+
+  LoadSplit() {
+    super();
+  }
+
+  public LoadSplit(CombineFileSplit cfsplit, int maps, int id,
+      long inputBytes, long inputRecords, long outputBytes,
+      long outputRecords, double[] reduceBytes, double[] reduceRecords,
+      long[] reduceOutputBytes, long[] reduceOutputRecords)
+      throws IOException {
+    super(cfsplit);
+    this.id = id;
+    this.maps = maps;
+    reduces = reduceBytes.length;
+    this.inputRecords = inputRecords;
+    this.outputBytes = outputBytes;
+    this.outputRecords = outputRecords;
+    this.reduceBytes = reduceBytes;
+    this.reduceRecords = reduceRecords;
+    nSpec = reduceOutputBytes.length;
+    this.reduceOutputBytes = reduceOutputBytes;
+    this.reduceOutputRecords = reduceOutputRecords;
+  }
+
+  public int getId() {
+    return id;
+  }
+  public int getMapCount() {
+    return maps;
+  }
+  public long getInputRecords() {
+    return inputRecords;
+  }
+  public long[] getOutputBytes() {
+    if (0 == reduces) {
+      return new long[] { outputBytes };
+    }
+    final long[] ret = new long[reduces];
+    for (int i = 0; i < reduces; ++i) {
+      ret[i] = Math.round(outputBytes * reduceBytes[i]);
+    }
+    return ret;
+  }
+  public long[] getOutputRecords() {
+    if (0 == reduces) {
+      return new long[] { outputRecords };
+    }
+    final long[] ret = new long[reduces];
+    for (int i = 0; i < reduces; ++i) {
+      ret[i] = Math.round(outputRecords * reduceRecords[i]);
+    }
+    return ret;
+  }
+  public long getReduceBytes(int i) {
+    return reduceOutputBytes[i];
+  }
+  public long getReduceRecords(int i) {
+    return reduceOutputRecords[i];
+  }
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    WritableUtils.writeVInt(out, id);
+    WritableUtils.writeVInt(out, maps);
+    WritableUtils.writeVLong(out, inputRecords);
+    WritableUtils.writeVLong(out, outputBytes);
+    WritableUtils.writeVLong(out, outputRecords);
+    WritableUtils.writeVLong(out, maxMemory);
+    WritableUtils.writeVInt(out, reduces);
+    for (int i = 0; i < reduces; ++i) {
+      out.writeDouble(reduceBytes[i]);
+      out.writeDouble(reduceRecords[i]);
+    }
+    WritableUtils.writeVInt(out, nSpec);
+    for (int i = 0; i < nSpec; ++i) {
+      WritableUtils.writeVLong(out, reduceOutputBytes[i]);
+      WritableUtils.writeVLong(out, reduceOutputRecords[i]);
+    }
+  }
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    id = WritableUtils.readVInt(in);
+    maps = WritableUtils.readVInt(in);
+    inputRecords = WritableUtils.readVLong(in);
+    outputBytes = WritableUtils.readVLong(in);
+    outputRecords = WritableUtils.readVLong(in);
+    maxMemory = WritableUtils.readVLong(in);
+    reduces = WritableUtils.readVInt(in);
+    if (reduceBytes.length < reduces) {
+      reduceBytes = new double[reduces];
+      reduceRecords = new double[reduces];
+    }
+    for (int i = 0; i < reduces; ++i) {
+      reduceBytes[i] = in.readDouble();
+      reduceRecords[i] = in.readDouble();
+    }
+    nSpec = WritableUtils.readVInt(in);
+    if (reduceOutputBytes.length < nSpec) {
+      reduceOutputBytes = new long[nSpec];
+      reduceOutputRecords = new long[nSpec];
+    }
+    for (int i = 0; i < nSpec; ++i) {
+      reduceOutputBytes[i] = WritableUtils.readVLong(in);
+      reduceOutputRecords[i] = WritableUtils.readVLong(in);
+    }
+  }
+}
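
LoadSplit's write/readFields above use the WritableUtils variable-length encodings with length-prefixed arrays, and readFields reallocates an array only when the incoming count exceeds the current capacity. A minimal sketch of the same round-trip discipline on a simplified type (not the real split):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableUtils;

    // Minimal round-trip sketch (simplified stand-in for LoadSplit): a
    // length-prefixed vlong array written with WritableUtils and read back,
    // reusing the existing array when it is already large enough.
    class VLongArrayWritable implements Writable {
      private long[] values = new long[0];

      public void write(DataOutput out) throws IOException {
        WritableUtils.writeVInt(out, values.length);
        for (long v : values) {
          WritableUtils.writeVLong(out, v);
        }
      }

      public void readFields(DataInput in) throws IOException {
        final int n = WritableUtils.readVInt(in);
        if (values.length < n) {
          values = new long[n];  // grow only when needed, as LoadSplit does
        }
        for (int i = 0; i < n; ++i) {
          values[i] = WritableUtils.readVLong(in);
        }
      }
    }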

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java?rev=1077373&r1=1077372&r2=1077373&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java Fri Mar  4 04:08:31 2011
@@ -98,11 +98,12 @@ import java.util.concurrent.TimeUnit;
             }
             last = current;
             submitter.add(
-              new GridmixJob(
-                conf, initTime + Math.round(rateFactor * (current - first)),
-                job, scratch, userResolver.getTargetUgi(
-                  UserGroupInformation.createRemoteUser(job.getUser())),
-                sequence.getAndIncrement()));
+              jobCreator.createGridmixJob(
+                conf, initTime + Math.round(
+                  rateFactor * (current - first)), job, scratch,
+                userResolver.getTargetUgi(
+                  UserGroupInformation.createRemoteUser(
+                    job.getUser())), sequence.getAndIncrement()));
           } catch (IOException e) {
             error = e;
             return;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java?rev=1077373&r1=1077372&r2=1077373&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java Fri Mar  4 04:08:31 2011
@@ -97,7 +97,7 @@ public class SerialJobFactory extends Jo
               LOG.debug(
                 "Serial mode submitting job " + job.getName());
             }
-            prevJob = new GridmixJob(
+            prevJob = jobCreator.createGridmixJob(
               conf, 0L, job, scratch, userResolver.getTargetUgi(
                 UserGroupInformation.createRemoteUser(job.getUser())),
               sequence.getAndIncrement());

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java?rev=1077373&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java Fri Mar  4 04:08:31 2011
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.ReduceTaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class SleepJob extends GridmixJob {
+  public static final Log LOG = LogFactory.getLog(SleepJob.class);
+
+  /**
+   * Interval at which to report progress, in seconds.
+   */
+  public static final String GRIDMIX_SLEEP_INTERVAL = "gridmix.sleep.interval";
+
+  public SleepJob(
+    Configuration conf, long submissionMillis, JobStory jobdesc, Path outRoot,
+    UserGroupInformation ugi, int seq) throws IOException {
+    super(conf, submissionMillis, jobdesc, outRoot, ugi, seq);
+  }
+
+  @Override
+  public Job call()
+    throws IOException, InterruptedException, ClassNotFoundException {
+    ugi.doAs(
+      new PrivilegedExceptionAction<Job>() {
+        public Job run()
+          throws IOException, ClassNotFoundException, InterruptedException {
+          job.setMapperClass(SleepMapper.class);
+          job.setReducerClass(SleepReducer.class);
+          job.setNumReduceTasks(jobdesc.getNumberReduces());
+          job.setMapOutputKeyClass(GridmixKey.class);
+          job.setMapOutputValueClass(NullWritable.class);
+          job.setSortComparatorClass(GridmixKey.Comparator.class);
+          job.setGroupingComparatorClass(SpecGroupingComparator.class);
+          job.setInputFormatClass(SleepInputFormat.class);
+          job.setOutputFormatClass(NullOutputFormat.class);
+          job.setPartitionerClass(DraftPartitioner.class);
+          job.setJarByClass(SleepJob.class);
+          job.getConfiguration().setBoolean(
+            "mapred.used.genericoptionsparser", true);
+          job.submit();
+          return job;
+
+        }
+      });
+
+    return job;
+  }
+
+  public static class SleepMapper
+    extends Mapper<LongWritable, LongWritable, GridmixKey, NullWritable> {
+
+    @Override
+    public void map(LongWritable key, LongWritable value, Context context)
+      throws IOException, InterruptedException {
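+      // The key supplied by SleepInputFormat is an absolute wall-clock
+      // wakeup time; sleep only for whatever part of it is still ahead.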
+      context.setStatus("Sleeping... " + value.get() + " ms left");
+      long now = System.currentTimeMillis();
+      if (now < key.get()) {
+        TimeUnit.MILLISECONDS.sleep(key.get() - now);
+      }
+    }
+
+    @Override
+    public void cleanup(Context context)
+      throws IOException, InterruptedException {
+      final int nReds = context.getNumReduceTasks();
+      if (nReds > 0) {
+        final SleepSplit split = (SleepSplit) context.getInputSplit();
+        int id = split.getId();
+        final int nMaps = split.getNumMaps();
+        // This is a hack to pass the reduce sleep durations via the Gridmix
+        // key. TODO: We need to come up with a better solution for this.
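+        // Each map emits one REDUCE_SPEC record per reduce partition it
+        // covers (partitions id, id + nMaps, id + 2 * nMaps, ...), with the
+        // sleep duration carried in the reduceOutputBytes field.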
+        
+        final GridmixKey key = new GridmixKey(GridmixKey.REDUCE_SPEC, 0, 0L);
+        for (int i = id, idx = 0; i < nReds; i += nMaps) {
+          key.setPartition(i);
+          key.setReduceOutputBytes(split.getReduceDurations(idx++));
+          context.write(key, NullWritable.get());
+        }
+      }
+    }
+
+  }
+
+  public static class SleepReducer
+    extends Reducer<GridmixKey, NullWritable, NullWritable, NullWritable> {
+
+    private long duration = 0L;
+
+    @Override
+    protected void setup(Context context)
+      throws IOException, InterruptedException {
+      if (!context.nextKey() ||
+        context.getCurrentKey().getType() != GridmixKey.REDUCE_SPEC) {
+        throw new IOException("Missing reduce spec");
+      }
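+      // Total up the simulated reduce runtime from the spec record(s)
+      // delivered to this partition.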
+      for (NullWritable ignored : context.getValues()) {
+        final GridmixKey spec = context.getCurrentKey();
+        duration += spec.getReduceOutputBytes();
+      }
+      final long RINTERVAL = TimeUnit.MILLISECONDS.convert(
+        context.getConfiguration().getLong(
+          GRIDMIX_SLEEP_INTERVAL, 5), TimeUnit.SECONDS);
+      // Sleep in chunks of at most RINTERVAL and recompute the elapsed
+      // time from the wall clock each pass, so deviation from the expected
+      // sleep time does not accumulate.
+      long start = System.currentTimeMillis();
+      long slept = 0L;
+      long sleep = 0L;
+      while (slept < duration) {
+        final long rem = duration - slept;
+        sleep = Math.min(rem, RINTERVAL);
+        context.setStatus("Sleeping... " + rem + " ms left");
+        TimeUnit.MILLISECONDS.sleep(sleep);
+        slept = System.currentTimeMillis() - start;
+      }
+    }
+
+    @Override
+    protected void cleanup(Context context)
+      throws IOException, InterruptedException {
+      final String msg = "Slept for " + duration;
+      LOG.info(msg);
+      context.setStatus(msg);
+    }
+  }
+
+  public static class SleepInputFormat
+    extends InputFormat<LongWritable, LongWritable> {
+
+    @Override
+    public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
+      return pullDescription(jobCtxt);
+    }
+
+    @Override
+    public RecordReader<LongWritable, LongWritable> createRecordReader(
+      InputSplit split, final TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      final long duration = split.getLength();
+      final long RINTERVAL = TimeUnit.MILLISECONDS.convert(
+        context.getConfiguration().getLong(
+          GRIDMIX_SLEEP_INTERVAL, 5), TimeUnit.SECONDS);
+      if (RINTERVAL <= 0) {
+        throw new IOException(
+          "Invalid " + GRIDMIX_SLEEP_INTERVAL + ": " + RINTERVAL);
+      }
+      return new RecordReader<LongWritable, LongWritable>() {
+        long start = -1;
+        long slept = 0L;
+        long sleep = 0L;
+        final LongWritable key = new LongWritable();
+        final LongWritable val = new LongWritable();
+
+        @Override
+        public boolean nextKeyValue() throws IOException {
+          if (start == -1) {
+            start = System.currentTimeMillis();
+          }
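+          // Key: absolute wall-clock deadline the mapper sleeps until.
+          // Value: remaining sleep time, used only for the status message.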
+          slept += sleep;
+          sleep = Math.min(duration - slept, RINTERVAL);
+          key.set(slept + sleep + start);
+          val.set(duration - slept);
+          return slept < duration;
+        }
+
+        @Override
+        public float getProgress() throws IOException {
+          return duration == 0 ? 1.0f : slept / ((float) duration);
+        }
+
+        @Override
+        public LongWritable getCurrentKey() {
+          return key;
+        }
+
+        @Override
+        public LongWritable getCurrentValue() {
+          return val;
+        }
+
+        @Override
+        public void close() throws IOException {
+          final String msg = "Slept for " + duration;
+          LOG.info(msg);
+        }
+
+        public void initialize(InputSplit split, TaskAttemptContext ctxt) {
+        }
+      };
+    }
+  }
+
+  public static class SleepSplit extends InputSplit implements Writable {
+    private int id;
+    private int nSpec;
+    private int nMaps;
+    private long sleepDuration;
+    private long[] reduceDurations = new long[0];
+    private String[] locations = new String[0];
+
+    public SleepSplit() {
+    }
+
+    public SleepSplit(
+      int id, long sleepDuration, long[] reduceDurations, int nMaps,
+      String[] locations) {
+      this.id = id;
+      this.sleepDuration = sleepDuration;
+      nSpec = reduceDurations.length;
+      this.reduceDurations = reduceDurations;
+      this.nMaps = nMaps;
+      this.locations = locations;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      WritableUtils.writeVInt(out, id);
+      WritableUtils.writeVLong(out, sleepDuration);
+      WritableUtils.writeVInt(out, nMaps);
+      WritableUtils.writeVInt(out, nSpec);
+      for (int i = 0; i < nSpec; ++i) {
+        WritableUtils.writeVLong(out, reduceDurations[i]);
+      }
+      WritableUtils.writeVInt(out, locations.length);
+      for (int i = 0; i < locations.length; ++i) {
+        Text.writeString(out, locations[i]);
+      }
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      id = WritableUtils.readVInt(in);
+      sleepDuration = WritableUtils.readVLong(in);
+      nMaps = WritableUtils.readVInt(in);
+      nSpec = WritableUtils.readVInt(in);
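+      // Writable instances are reused across splits, so reallocate the
+      // durations array only when the incoming spec count outgrows it.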
+      if (reduceDurations.length < nSpec) {
+        reduceDurations = new long[nSpec];
+      }
+      for (int i = 0; i < nSpec; ++i) {
+        reduceDurations[i] = WritableUtils.readVLong(in);
+      }
+      final int nLoc = WritableUtils.readVInt(in);
+      if (nLoc != locations.length) {
+        locations = new String[nLoc];
+      }
+      for (int i = 0; i < nLoc; ++i) {
+        locations[i] = Text.readString(in);
+      }
+    }
+
+    @Override
+    public long getLength() {
+      return sleepDuration;
+    }
+
+    public int getId() {
+      return id;
+    }
+
+    public int getNumMaps() {
+      return nMaps;
+    }
+
+    public long getReduceDurations(int i) {
+      return reduceDurations[i];
+    }
+
+    @Override
+    public String[] getLocations() {
+      return locations;
+    }
+  }
+
+  private TaskAttemptInfo getSuccessfulAttemptInfo(TaskType type, int task) {
+    TaskAttemptInfo ret;
+    for (int i = 0; true; ++i) {
+      // Rumen is expected to synthesize an attempt when the trace lacks
+      // one; otherwise this loop would never see a successful attempt and
+      // would spin forever.
+      ret = jobdesc.getTaskAttemptInfo(type, task, i);
+      if (ret.getRunState() == TaskStatus.State.SUCCEEDED) {
+        break;
+      }
+    }
+
+    return ret;
+  }
+
+  @Override
+  void buildSplits(FilePool inputDir) throws IOException {
+    final List<InputSplit> splits = new ArrayList<InputSplit>();
+    final int reds = jobdesc.getNumberReduces();
+    final int maps = jobdesc.getNumberMaps();
+    for (int i = 0; i < maps; ++i) {
+      final int nSpec = reds / maps + ((reds % maps) > i ? 1 : 0);
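+      // Reduces are dealt round-robin: map i specs partitions i, i + maps,
+      // i + 2 * maps, ...; the first (reds % maps) maps carry one extra.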
+      final long[] redDurations = new long[nSpec];
+      for (int j = 0; j < nSpec; ++j) {
+        final ReduceTaskAttemptInfo info =
+          (ReduceTaskAttemptInfo) getSuccessfulAttemptInfo(
+            TaskType.REDUCE, i + j * maps);
+        // Include only merge/reduce time
+        redDurations[j] = info.getMergeRuntime() + info.getReduceRuntime();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+            String.format(
+              "SPEC(%d) %d -> %d %d/%d", id(), i, i + j * maps, redDurations[j],
+              info.getRuntime()));
+        }
+      }
+      final TaskAttemptInfo info = getSuccessfulAttemptInfo(TaskType.MAP, i);
+      splits.add(
+        new SleepSplit(
+          i, info.getRuntime(), redDurations, maps, new String[0]));
+    }
+    pushDescription(id(), splits);
+  }
+}
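
Selecting the new job type is purely configuration-driven. A minimal
sketch, assuming only the constants this patch introduces
(JobCreator.GRIDMIX_JOB_TYPE, JobCreator.SLEEPJOB and
SleepJob.GRIDMIX_SLEEP_INTERVAL, all exercised by TestSleepJob below):

    Configuration conf = new Configuration();
    // Ask Gridmix to synthesize sleep jobs instead of load jobs.
    conf.set(JobCreator.GRIDMIX_JOB_TYPE, JobCreator.SLEEPJOB.name());
    // Sleep and report progress in chunks of at most 10 seconds.
    conf.setLong(SleepJob.GRIDMIX_SLEEP_INTERVAL, 10);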

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java?rev=1077373&r1=1077372&r2=1077373&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java Fri Mar  4 04:08:31 2011
@@ -227,7 +227,8 @@ public class Statistics implements Compo
 
     private void getJobReports(List<JobStatus> jobs) throws IOException {
       for (final JobStatus job : jobs) {
-        UserGroupInformation user = userResolver.getTargetUgi(
+
+        final UserGroupInformation user = userResolver.getTargetUgi(
           UserGroupInformation.createRemoteUser(job.getUsername()));
         try {
           user.doAs(
@@ -241,7 +242,8 @@ public class Statistics implements Compo
                         org.apache.hadoop.mapred.JobID.downgrade(id)));
                   } catch (IOException e) {
                     LOG.error(
-                      " Couldnt get the MapTaskResports for "+ job.getJobId());
+                      " Couldnt get the MapTaskResports for " + job.getJobId() +
+                        " job username "+ job.getUsername() +" cause " + user);
                   }
                 }
                 return null;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java?rev=1077373&r1=1077372&r2=1077373&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java Fri Mar  4 04:08:31 2011
@@ -128,7 +128,7 @@ public class StressJobFactory extends Jo
                 i += job.getNumberMaps();
 
                 submitter.add(
-                  new GridmixJob(
+                  jobCreator.createGridmixJob(
                     conf, 0L, job, scratch, userResolver.getTargetUgi(
                       UserGroupInformation.createRemoteUser(
                         job.getUser())), sequence.getAndIncrement()));

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java?rev=1077373&r1=1077372&r2=1077373&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java Fri Mar  4 04:08:31 2011
@@ -39,13 +39,13 @@ class DebugJobFactory {
     CountDownLatch startFlag,UserResolver resolver) throws IOException {
     GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(
       conf, GridmixJobSubmissionPolicy.STRESS);
-    if (policy.name().equalsIgnoreCase("REPLAY")) {
+    if (policy == GridmixJobSubmissionPolicy.REPLAY) {
       return new DebugReplayJobFactory(
         submitter, scratch, numJobs, conf, startFlag,resolver);
-    } else if (policy.name().equalsIgnoreCase("STRESS")) {
+    } else if (policy == GridmixJobSubmissionPolicy.STRESS) {
       return new DebugStressJobFactory(
         submitter, scratch, numJobs, conf, startFlag,resolver);
-    } else if (policy.name().equalsIgnoreCase("SERIAL")) {
+    } else if (policy == GridmixJobSubmissionPolicy.SERIAL) {
       return new DebugSerialJobFactory(
         submitter, scratch, numJobs, conf, startFlag,resolver);
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java?rev=1077373&r1=1077372&r2=1077373&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java Fri Mar  4 04:08:31 2011
@@ -17,8 +17,11 @@
  */
 package org.apache.hadoop.mapred.gridmix;
 
+import org.apache.hadoop.mapred.TaskStatus.State;
 import org.apache.hadoop.tools.rumen.JobStoryProducer;
 import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.MapTaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.ReduceTaskAttemptInfo;
 import org.apache.hadoop.tools.rumen.TaskInfo;
 import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
 import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
@@ -257,6 +260,19 @@ public class DebugJobProducer implements
     @Override
     public TaskAttemptInfo getTaskAttemptInfo(
       TaskType taskType, int taskNumber, int taskAttemptNumber) {
+      switch (taskType) {
+        case MAP:
+          return new MapTaskAttemptInfo(
+            State.SUCCEEDED, new TaskInfo(
+              m_bytesIn[taskNumber], m_recsIn[taskNumber],
+              m_bytesOut[taskNumber], m_recsOut[taskNumber], -1), 100);
+
+        case REDUCE:
+          return new ReduceTaskAttemptInfo(
+            State.SUCCEEDED, new TaskInfo(
+              r_bytesIn[taskNumber], r_recsIn[taskNumber],
+              r_bytesOut[taskNumber], r_recsOut[taskNumber], -1), 100, 100, 100);
+      }
       throw new UnsupportedOperationException();
     }
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=1077373&r1=1077372&r2=1077373&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Fri Mar  4 04:08:31 2011
@@ -17,19 +17,13 @@
  */
 package org.apache.hadoop.mapred.gridmix;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.Counters;
@@ -43,19 +37,26 @@ import org.apache.hadoop.mapreduce.TaskT
 import org.apache.hadoop.tools.rumen.JobStory;
 import org.apache.hadoop.tools.rumen.TaskInfo;
 import org.apache.hadoop.util.ToolRunner;
-import static org.apache.hadoop.mapred.Task.Counter.*;
-
-import org.junit.Test;
-import org.junit.BeforeClass;
+import org.apache.log4j.Level;
 import org.junit.AfterClass;
-import static org.junit.Assert.*;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.log4j.Level;
-import org.apache.hadoop.security.UserGroupInformation;
-import java.security.PrivilegedExceptionAction;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_RECORDS;
+import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_BYTES;
+import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_RECORDS;
+import static org.apache.hadoop.mapred.Task.Counter.REDUCE_INPUT_RECORDS;
+import static org.apache.hadoop.mapred.Task.Counter.REDUCE_OUTPUT_RECORDS;
+import static org.apache.hadoop.mapred.Task.Counter.REDUCE_SHUFFLE_BYTES;
+import static org.junit.Assert.*;
 
 public class TestGridmixSubmission {
   static GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.REPLAY;
@@ -105,7 +106,8 @@ public class TestGridmixSubmission {
       for (JobStory spec : submitted) {
         sub.put(spec.getName(), spec);
       }
-      final JobClient client = new JobClient(GridmixTestUtils.mrCluster.createJobConf());
+      final JobClient client = new JobClient(
+        GridmixTestUtils.mrCluster.createJobConf());
       for (Job job : succeeded) {
         final String jobname = job.getJobName();
         if ("GRIDMIX_GENDATA".equals(jobname)) {
@@ -293,8 +295,6 @@ public class TestGridmixSubmission {
 
     @Override
     protected JobMonitor createJobMonitor(Statistics stats) {
-      Configuration conf = new Configuration();
-      conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy.name());
       monitor = new TestMonitor(NJOBS + 1, stats);
       return monitor;
     }
@@ -350,7 +350,7 @@ public class TestGridmixSubmission {
     };
     DebugGridmix client = new DebugGridmix();
     conf = new Configuration();
-      conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY,policy.name());
+    conf.setEnum(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy);
     conf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
 //    GridmixTestUtils.createHomeAndStagingDirectory((JobConf)conf);
     // allow synthetic users to create home directories

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java?rev=1077373&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java Fri Mar  4 04:08:31 2011
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.Assert.*;
+
+public class TestSleepJob {
+
+  public static final Log LOG = LogFactory.getLog(TestSleepJob.class);
+
+  {
+    ((Log4JLogger) LogFactory.getLog("org.apache.hadoop.mapred.gridmix"))
+      .getLogger().setLevel(Level.DEBUG);
+  }
+
+  static GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.REPLAY;
+  private static final int NJOBS = 2;
+  private static final long GENDATA = 50; // in megabytes
+
+
+  @BeforeClass
+  public static void init() throws IOException {
+    GridmixTestUtils.initCluster();
+  }
+
+  @AfterClass
+  public static void shutDown() throws IOException {
+    GridmixTestUtils.shutdownCluster();
+  }
+
+  static class TestMonitor extends JobMonitor {
+    private final BlockingQueue<Job> retiredJobs;
+    private final int expected;
+
+    public TestMonitor(int expected, Statistics stats) {
+      super(stats);
+      this.expected = expected;
+      retiredJobs = new LinkedBlockingQueue<Job>();
+    }
+
+    @Override
+    protected void onSuccess(Job job) {
+      System.out.println(" Job Sucess " + job);
+      retiredJobs.add(job);
+    }
+
+    @Override
+    protected void onFailure(Job job) {
+      fail("Job failure: " + job);
+    }
+
+    public void verify(ArrayList<JobStory> submitted) throws Exception  {
+      assertEquals("Bad job count", expected, retiredJobs.size());
+    }
+  }
+
+
+  static class DebugGridmix extends Gridmix {
+
+    private JobFactory factory;
+    private TestMonitor monitor;
+
+    @Override
+    protected JobMonitor createJobMonitor(Statistics stats) {
+      monitor = new TestMonitor(NJOBS + 1, stats);
+      return monitor;
+    }
+
+    @Override
+    protected JobFactory createJobFactory(
+      JobSubmitter submitter, String traceIn, Path scratchDir,
+      Configuration conf, CountDownLatch startFlag, UserResolver userResolver)
+      throws IOException {
+      factory = DebugJobFactory.getFactory(
+        submitter, scratchDir, NJOBS, conf, startFlag, userResolver);
+      return factory;
+    }
+
+    public void checkMonitor() throws Exception {
+      monitor.verify(((DebugJobFactory.Debuggable) factory).getSubmitted());
+    }
+  }
+
+
+  @Test
+  public void testReplaySubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.REPLAY;
+    System.out.println(" Replay started at " + System.currentTimeMillis());
+    doSubmission();
+    System.out.println(" Replay ended at " + System.currentTimeMillis());
+  }
+
+  @Test
+  public void testStressSubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.STRESS;
+    System.out.println(" Stress started at " + System.currentTimeMillis());
+    doSubmission();
+    System.out.println(" Stress ended at " + System.currentTimeMillis());
+  }
+
+  @Test
+  public void testSerialSubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.SERIAL;
+    System.out.println("Serial started at " + System.currentTimeMillis());
+    doSubmission();
+    System.out.println("Serial ended at " + System.currentTimeMillis());
+  }
+
+
+  private void doSubmission() throws Exception {
+    final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs);
+    final Path out = GridmixTestUtils.DEST.makeQualified(GridmixTestUtils.dfs);
+    final Path root = new Path("/user");
+    Configuration conf = null;
+    try {
+      final String[] argv = {"-D" + FilePool.GRIDMIX_MIN_FILE + "=0",
+        "-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out,
+        "-D" + Gridmix.GRIDMIX_USR_RSV + "=" + EchoUserResolver.class.getName(),
+        "-D" + JobCreator.GRIDMIX_JOB_TYPE + "=" + JobCreator.SLEEPJOB.name(),
+        "-D" + SleepJob.GRIDMIX_SLEEP_INTERVAL +"=" +"10",
+        "-generate",String.valueOf(GENDATA) + "m", in.toString(), "-"
+        // ignored by DebugGridmix
+      };
+      DebugGridmix client = new DebugGridmix();
+      conf = new Configuration();
+      conf.setEnum(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy);
+      conf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
+//    GridmixTestUtils.createHomeAndStagingDirectory((JobConf)conf);
+      // allow synthetic users to create home directories
+      GridmixTestUtils.dfs.mkdirs(root, new FsPermission((short) 0777));
+      GridmixTestUtils.dfs.setPermission(root, new FsPermission((short) 0777));
+      int res = ToolRunner.run(conf, client, argv);
+      assertEquals("Client exited with nonzero status", 0, res);
+      client.checkMonitor();
+    } catch (Exception e) {
+      throw e; // rethrow so JUnit reports the failure instead of masking it
+    } finally {
+      in.getFileSystem(conf).delete(in, true);
+      out.getFileSystem(conf).delete(out, true);
+      root.getFileSystem(conf).delete(root, true);
+    }
+  }
+
+}