Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2010/07/14 11:22:30 UTC

svn commit: r963986 [1/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/

Author: cdouglas
Date: Wed Jul 14 09:22:29 2010
New Revision: 963986

URL: http://svn.apache.org/viewvc?rev=963986&view=rev
Log:
MAPREDUCE-1840. Enhancements to Gridmix benchmark simulating user
diversity, queue replay, and task duration for JobTracker load testing.
Also includes compatibility with security enhancements and scalability
improvements. Contributed by Amar Kamat, Rahul Singh, Hong Tang, and cdouglas.
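
The new options surface in the CLI as
    gridmix [-generate <MiB>] [-users URI] <iopath> <trace>
(see printUsage below). A minimal driver sketch, assuming Gridmix runs as a
standard Tool; the class name, paths, and users file are hypothetical
placeholders, while the resolver class ships in this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.gridmix.Gridmix;
    import org.apache.hadoop.mapred.gridmix.RoundRobinUserResolver;
    import org.apache.hadoop.mapred.gridmix.UserResolver;
    import org.apache.hadoop.util.ToolRunner;

    public class GridmixDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Map trace users onto target cluster users round-robin; the default,
        // SubmitterUserResolver, submits every job as the current user.
        conf.setClass("gridmix.user.resolve.class",
            RoundRobinUserResolver.class, UserResolver.class);
        int res = ToolRunner.run(conf, new Gridmix(), new String[] {
            "-generate", "1g",                 // synthesize 1 GiB of input data
            "-users", "file:///tmp/users.txt", // hypothetical users resource
            "/gridmix/io",                     // iopath (placeholder)
            "file:///tmp/trace.json"           // Rumen job trace (placeholder)
        });
        System.exit(res);
      }
    }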

Added:
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/EchoUserResolver.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RoundRobinUserResolver.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SubmitterUserResolver.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/UserResolver.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/GridmixTestUtils.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestUserResolve.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJobSubmissionPolicy.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=963986&r1=963985&r2=963986&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Jul 14 09:22:29 2010
@@ -75,6 +75,11 @@ Trunk (unreleased changes)
     MAPREDUCE-1248. Fixes redudant memory copying in StreamKeyValUtil.
     (Ruibang He via amareshwari)
 
+    MAPREDUCE-1840. Enhancements to Gridmix benchmark simulating user
+    diversity, queue replay, and task duration for JobTracker load testing.
+    Also includes compatibility with security enhancements, and scalability
+    improvements. (Amar Kamat, Rahul Singh, Hong Tang, and cdouglas)
+
   OPTIMIZATIONS
 
     MAPREDUCE-1354. Enhancements to JobTracker for better performance and

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java?rev=963986&r1=963985&r2=963986&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java Wed Jul 14 09:22:29 2010
@@ -31,6 +31,8 @@ class AvgRecordFactory extends RecordFac
    * Percentage of record for key data.
    */
   public static final String GRIDMIX_KEY_FRC = "gridmix.key.fraction";
+  public static final String GRIDMIX_MISSING_REC_SIZE = 
+    "gridmix.missing.rec.size";
 
 
   private final long targetBytes;
@@ -51,7 +53,7 @@ class AvgRecordFactory extends RecordFac
     this.targetBytes = targetBytes;
     this.targetRecords = targetRecords <= 0 && this.targetBytes >= 0
       ? Math.max(1,
-          this.targetBytes / conf.getInt("gridmix.missing.rec.size", 64 * 1024))
+          this.targetBytes / conf.getInt(GRIDMIX_MISSING_REC_SIZE, 64 * 1024))
       : targetRecords;
     final long tmp = this.targetBytes / this.targetRecords;
     step = this.targetBytes - this.targetRecords * tmp;
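
With the key now named, a missing record count is estimated as
targetBytes / gridmix.missing.rec.size (64 KiB by default), so a 1 GiB byte
target with no record count yields 16384 synthetic records. A tuning sketch;
the class name is hypothetical:

    import org.apache.hadoop.conf.Configuration;

    public class RecordSizeTuning {
      public static Configuration smallerRecords() {
        Configuration conf = new Configuration();
        // Assume 32 KiB records when the trace omits a record count, doubling
        // the number of records estimated from the byte target.
        conf.setInt("gridmix.missing.rec.size", 32 * 1024);
        return conf;
      }
    }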

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/EchoUserResolver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/EchoUserResolver.java?rev=963986&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/EchoUserResolver.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/EchoUserResolver.java Wed Jul 14 09:22:29 2010
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Echos the UGI offered.
+ */
+public class EchoUserResolver implements UserResolver {
+  public static final Log LOG = LogFactory.getLog(Gridmix.class);
+
+  public EchoUserResolver() {
+    LOG.info(" Current user resolver is EchoUserResolver ");
+  }
+
+  public synchronized boolean setTargetUsers(URI userdesc, Configuration conf)
+  throws IOException {
+    return false;
+  }
+
+  public synchronized UserGroupInformation getTargetUgi(
+    UserGroupInformation ugi) {
+    return ugi;
+  }
+}
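
EchoUserResolver illustrates the UserResolver contract: setTargetUsers reports
whether the -users resource was consumed (false here, so Gridmix warns that the
resource is ignored) and getTargetUgi maps each submitting UGI to a target UGI.
A sketch of a custom resolver under that contract; the class name and the fixed
user name are hypothetical:

    package org.apache.hadoop.mapred.gridmix;

    import java.io.IOException;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;

    /**
     * Hypothetical resolver mapping every trace user to one fixed test user.
     */
    public class FixedUserResolver implements UserResolver {

      private UserGroupInformation target;

      public synchronized boolean setTargetUsers(URI userdesc, Configuration conf)
          throws IOException {
        // No external resource is needed; resolve one remote user up front.
        target = UserGroupInformation.createRemoteUser("loadtester");
        return false; // any -users resource is ignored, as in EchoUserResolver
      }

      public synchronized UserGroupInformation getTargetUgi(
          UserGroupInformation ugi) {
        return target;
      }
    }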

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java?rev=963986&r1=963985&r2=963986&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java Wed Jul 14 09:22:29 2010
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.OutputStream;
+import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.List;
@@ -31,6 +32,7 @@ import java.util.regex.Pattern;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -38,6 +40,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -48,6 +51,7 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
 
 // TODO can replace with form of GridmixJob
 class GenerateData extends GridmixJob {
@@ -72,6 +76,16 @@ class GenerateData extends GridmixJob {
    */
   public static final String GRIDMIX_GEN_INTERVAL = "gendata.interval.mb";
 
+  /**
+   * Blocksize of generated data.
+   */
+  public static final String GRIDMIX_GEN_BLOCKSIZE = "gridmix.gen.blocksize";
+
+  /**
+   * Replication of generated data.
+   */
+  public static final String GRIDMIX_GEN_REPLICATION = "gridmix.gen.replicas";
+
   public GenerateData(Configuration conf, Path outdir, long genbytes)
       throws IOException {
     super(conf, 0L, "GRIDMIX_GENDATA");
@@ -82,15 +96,26 @@ class GenerateData extends GridmixJob {
   @Override
   public Job call() throws IOException, InterruptedException,
                            ClassNotFoundException {
-    job.setMapperClass(GenDataMapper.class);
-    job.setNumReduceTasks(0);
-    job.setMapOutputKeyClass(NullWritable.class);
-    job.setMapOutputValueClass(BytesWritable.class);
-    job.setInputFormatClass(GenDataFormat.class);
-    job.setOutputFormatClass(RawBytesOutputFormat.class);
-    job.setJarByClass(GenerateData.class);
-    FileInputFormat.addInputPath(job, new Path("ignored"));
-    job.submit();
+    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+    ugi.doAs( new PrivilegedExceptionAction <Job>() {
+       public Job run() throws IOException, ClassNotFoundException,
+                               InterruptedException {
+        job.setMapperClass(GenDataMapper.class);
+        job.setNumReduceTasks(0);
+        job.setMapOutputKeyClass(NullWritable.class);
+        job.setMapOutputValueClass(BytesWritable.class);
+        job.setInputFormatClass(GenDataFormat.class);
+        job.setOutputFormatClass(RawBytesOutputFormat.class);
+        job.setJarByClass(GenerateData.class);
+        try {
+          FileInputFormat.addInputPath(job, new Path("ignored"));
+        } catch (IOException e) {
+          LOG.error("Error while adding input path ", e);
+        }
+        job.submit();
+        return job;
+      }
+    });
     return job;
   }
 
@@ -123,7 +148,8 @@ class GenerateData extends GridmixJob {
 
     @Override
     public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
-      final JobClient client = new JobClient(jobCtxt.getConfiguration());
+      final JobClient client =
+        new JobClient(new JobConf(jobCtxt.getConfiguration()));
       ClusterStatus stat = client.getClusterStatus(true);
       final long toGen =
         jobCtxt.getConfiguration().getLong(GRIDMIX_GEN_BYTES, -1);
@@ -246,6 +272,9 @@ class GenerateData extends GridmixJob {
     static class ChunkWriter extends RecordWriter<NullWritable,BytesWritable> {
       private final Path outDir;
       private final FileSystem fs;
+      private final int blocksize;
+      private final short replicas;
+      private final FsPermission genPerms = new FsPermission((short) 0777);
       private final long maxFileBytes;
 
       private long accFileBytes = 0L;
@@ -255,6 +284,8 @@ class GenerateData extends GridmixJob {
       public ChunkWriter(Path outDir, Configuration conf) throws IOException {
         this.outDir = outDir;
         fs = outDir.getFileSystem(conf);
+        blocksize = conf.getInt(GRIDMIX_GEN_BLOCKSIZE, 1 << 28);
+        replicas = (short) conf.getInt(GRIDMIX_GEN_REPLICATION, 3);
         maxFileBytes = conf.getLong(GRIDMIX_GEN_CHUNK, 1L << 30);
         nextDestination();
       }
@@ -262,7 +293,9 @@ class GenerateData extends GridmixJob {
         if (fileOut != null) {
           fileOut.close();
         }
-        fileOut = fs.create(new Path(outDir, "segment-" + (++fileIdx)), false);
+        fileOut = fs.create(new Path(outDir, "segment-" + (++fileIdx)),
+                            genPerms, false, 64 * 1024, replicas, 
+                            blocksize, null);
         accFileBytes = 0L;
       }
       @Override
@@ -271,14 +304,14 @@ class GenerateData extends GridmixJob {
         int written = 0;
         final int total = value.getLength();
         while (written < total) {
+          if (accFileBytes >= maxFileBytes) {
+            nextDestination();
+          }
           final int write = (int)
             Math.min(total - written, maxFileBytes - accFileBytes);
           fileOut.write(value.getBytes(), written, write);
           written += write;
           accFileBytes += write;
-          if (accFileBytes >= maxFileBytes) {
-            nextDestination();
-          }
         }
       }
       @Override
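
The new GRIDMIX_GEN_BLOCKSIZE and GRIDMIX_GEN_REPLICATION keys default to a
256 MB block size (1 << 28) and three replicas, and generated segments are now
created with 0777 permissions. A tuning sketch for a small test cluster; the
class name is hypothetical:

    import org.apache.hadoop.conf.Configuration;

    public class GenDataTuning {
      public static Configuration smallCluster() {
        Configuration conf = new Configuration();
        conf.setInt("gridmix.gen.blocksize", 1 << 26); // 64 MB blocks
        conf.setInt("gridmix.gen.replicas", 1);        // single replica
        return conf;
      }
    }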

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=963986&r1=963985&r2=963986&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Wed Jul 14 09:22:29 2010
@@ -20,15 +20,22 @@ package org.apache.hadoop.mapred.gridmix
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintStream;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -78,6 +85,12 @@ public class Gridmix extends Configured 
    */
   public static final String GRIDMIX_SUB_MUL = "gridmix.submit.multiplier";
 
+  /**
+   * Class used to resolve users in the trace to the list of target users
+   * on the cluster.
+   */
+  public static final String GRIDMIX_USR_RSV = "gridmix.user.resolve.class";
+
   // Submit data structures
   private JobFactory factory;
   private JobSubmitter submitter;
@@ -108,6 +121,15 @@ public class Gridmix extends Configured 
     if (!genData.getJob().isSuccessful()) {
       throw new IOException("Data generation failed!");
     }
+
+    FsShell shell = new FsShell(conf);
+    try {
+      LOG.info("Changing the permissions for inputPath " + ioPath.toString());
+      shell.run(new String[] {"-chmod","-R","777", ioPath.toString()});
+    } catch (Exception e) {
+      LOG.error("Couldnt change the file permissions " , e);
+      throw new IOException(e);
+    }
     LOG.info("Done.");
   }
 
@@ -129,24 +151,28 @@ public class Gridmix extends Configured 
    * @param startFlag Semaphore for starting job trace pipeline
    */
   private void startThreads(Configuration conf, String traceIn, Path ioPath,
-      Path scratchDir, CountDownLatch startFlag) throws IOException {
+      Path scratchDir, CountDownLatch startFlag, UserResolver userResolver)
+      throws IOException {
     try {
       GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(
         conf, GridmixJobSubmissionPolicy.STRESS);
       LOG.info(" Submission policy is " + policy.name());
       statistics = new Statistics(conf, policy.getPollingInterval(), startFlag);
       monitor = createJobMonitor(statistics);
-      int noOfSubmitterThreads = policy.name().equals(
-        GridmixJobSubmissionPolicy.SERIAL.name()) ? 1 :
-        Runtime.getRuntime().availableProcessors() + 1;
-
-      submitter = createJobSubmitter(
-        monitor, conf.getInt(
-          GRIDMIX_SUB_THR, noOfSubmitterThreads),
-        conf.getInt(GRIDMIX_QUE_DEP, 5), new FilePool(conf, ioPath));
-      factory = createJobFactory(
-        submitter, traceIn, scratchDir, conf, startFlag);
-      if (policy.name().equals(GridmixJobSubmissionPolicy.SERIAL.name())) {
+      int noOfSubmitterThreads = 
+        (policy == GridmixJobSubmissionPolicy.SERIAL) 
+        ? 1
+        : Runtime.getRuntime().availableProcessors() + 1;
+
+      int numThreads = conf.getInt(GRIDMIX_SUB_THR, noOfSubmitterThreads);
+      int queueDep = conf.getInt(GRIDMIX_QUE_DEP, 5);
+      submitter = createJobSubmitter(monitor, numThreads, queueDep,
+                                     new FilePool(conf, ioPath), userResolver, 
+                                     statistics);
+      
+      factory = createJobFactory(submitter, traceIn, scratchDir, conf, 
+                                 startFlag, userResolver);
+      if (policy == GridmixJobSubmissionPolicy.SERIAL) {
         statistics.addJobStatsListeners(factory);
       } else {
         statistics.addClusterStatsObservers(factory);
@@ -165,52 +191,95 @@ public class Gridmix extends Configured 
   }
 
   protected JobSubmitter createJobSubmitter(JobMonitor monitor, int threads,
-      int queueDepth, FilePool pool) throws IOException {
-    return new JobSubmitter(monitor, threads, queueDepth, pool);
+      int queueDepth, FilePool pool, UserResolver resolver, 
+      Statistics statistics) throws IOException {
+    return new JobSubmitter(monitor, threads, queueDepth, pool, statistics);
   }
 
   protected JobFactory createJobFactory(JobSubmitter submitter, String traceIn,
-      Path scratchDir, Configuration conf, CountDownLatch startFlag)
+      Path scratchDir, Configuration conf, CountDownLatch startFlag, 
+      UserResolver resolver)
       throws IOException {
      return GridmixJobSubmissionPolicy.getPolicy(
        conf, GridmixJobSubmissionPolicy.STRESS).createJobFactory(
        submitter, new ZombieJobProducer(
          createInputStream(
-           traceIn), null), scratchDir, conf, startFlag);  }
+           traceIn), null), scratchDir, conf, startFlag, resolver);  }
 
-  public int run(String[] argv) throws IOException, InterruptedException {
+  private static UserResolver userResolver;
+
+  public UserResolver getCurrentUserResolver() {
+    return userResolver;
+  }
+  
+  public int run(final String[] argv) throws IOException, InterruptedException {
+    int val = -1;
+    final Configuration conf = getConf();
+    UserGroupInformation.setConfiguration(conf);
+    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+
+    val = ugi.doAs(new PrivilegedExceptionAction<Integer>() {
+      public Integer run() throws Exception {
+        return runJob(conf, argv);
+      }
+    });
+    return val; 
+  }
+
+  private int runJob(Configuration conf, String[] argv)
+    throws IOException, InterruptedException {
     if (argv.length < 2) {
       printUsage(System.err);
       return 1;
     }
-    long genbytes = 0;
+    long genbytes = -1L;
     String traceIn = null;
     Path ioPath = null;
+    URI userRsrc = null;
+    userResolver = ReflectionUtils.newInstance(
+                     conf.getClass(GRIDMIX_USR_RSV, 
+                       SubmitterUserResolver.class,
+                       UserResolver.class), 
+                     conf);
     try {
-      int i = 0;
-      genbytes = "-generate".equals(argv[i++])
-        ? StringUtils.TraditionalBinaryPrefix.string2long(argv[i++])
-        : --i;
-      ioPath = new Path(argv[i++]);
-      traceIn = argv[i++];
-      if (i != argv.length) {
-        printUsage(System.err);
-        return 1;
+      for (int i = 0; i < argv.length - 2; ++i) {
+        if ("-generate".equals(argv[i])) {
+          genbytes = StringUtils.TraditionalBinaryPrefix.string2long(argv[++i]);
+        } else if ("-users".equals(argv[i])) {
+          userRsrc = new URI(argv[++i]);
+        } else {
+          printUsage(System.err);
+          return 1;
+        }
+      }
+      if (!userResolver.setTargetUsers(userRsrc, conf)) {
+        LOG.warn("Resource " + userRsrc + " ignored");
       }
+      ioPath = new Path(argv[argv.length - 2]);
+      traceIn = argv[argv.length - 1];
     } catch (Exception e) {
+      e.printStackTrace();
       printUsage(System.err);
       return 1;
     }
+    return start(conf, traceIn, ioPath, genbytes, userResolver);
+  }
+
+  int start(Configuration conf, String traceIn, Path ioPath, long genbytes,
+      UserResolver userResolver) throws IOException, InterruptedException {
     InputStream trace = null;
     try {
-      final Configuration conf = getConf();
       Path scratchDir = new Path(ioPath, conf.get(GRIDMIX_OUT_DIR, "gridmix"));
+      final FileSystem scratchFs = scratchDir.getFileSystem(conf);
+      scratchFs.mkdirs(scratchDir, new FsPermission((short) 0777));
+      scratchFs.setPermission(scratchDir, new FsPermission((short) 0777));
       // add shutdown hook for SIGINT, etc.
       Runtime.getRuntime().addShutdownHook(sdh);
       CountDownLatch startFlag = new CountDownLatch(1);
       try {
         // Create, start job submission threads
-        startThreads(conf, traceIn, ioPath, scratchDir, startFlag);
+        startThreads(conf, traceIn, ioPath, scratchDir, startFlag,
+                     userResolver);
         // Write input data if specified
         if (genbytes > 0) {
           writeInputData(genbytes, ioPath);
@@ -327,14 +396,29 @@ public class Gridmix extends Configured 
 
   protected void printUsage(PrintStream out) {
     ToolRunner.printGenericCommandUsage(out);
-    out.println("Usage: gridmix [-generate <MiB>] <iopath> <trace>");
+    out.println("Usage: gridmix [-generate <MiB>] [-users URI] <iopath> <trace>");
     out.println("  e.g. gridmix -generate 100m foo -");
     out.println("Configuration parameters:");
-    out.printf("       %-40s : Output directory\n", GRIDMIX_OUT_DIR);
-    out.printf("       %-40s : Submitting threads\n", GRIDMIX_SUB_THR);
-    out.printf("       %-40s : Queued job desc\n", GRIDMIX_QUE_DEP);
-    out.printf("       %-40s : Key fraction of rec\n",
+    out.printf("       %-42s : Output directory\n", GRIDMIX_OUT_DIR);
+    out.printf("       %-42s : Submitting threads\n", GRIDMIX_SUB_THR);
+    out.printf("       %-42s : Queued job desc\n", GRIDMIX_QUE_DEP);
+    out.printf("       %-42s : Key fraction of rec\n",
         AvgRecordFactory.GRIDMIX_KEY_FRC);
+    out.printf("       %-42s : User resolution class\n", GRIDMIX_USR_RSV);
+    out.printf("       %-42s : Enable/disable using queues in trace\n",
+        GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE);
+    out.printf("       %-42s : Default queue\n",
+        GridmixJob.GRIDMIX_DEFAULT_QUEUE);
+    
+    StringBuilder sb = new StringBuilder();
+    String sep = "";
+    for (GridmixJobSubmissionPolicy p : GridmixJobSubmissionPolicy.values()) {
+      sb.append(sep);
+      sb.append(p.name());
+      sep = "|";
+    }
+    out.printf("       %-42s : Job submission policy (%s)\n",
+        GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, sb.toString());
   }
 
   /**

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java?rev=963986&r1=963985&r2=963986&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java Wed Jul 14 09:22:29 2010
@@ -18,40 +18,33 @@
 package org.apache.hadoop.mapred.gridmix;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Formatter;
 import java.util.List;
-import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.rumen.JobStory;
-import org.apache.hadoop.tools.rumen.TaskInfo;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -59,7 +52,7 @@ import org.apache.commons.logging.LogFac
 /**
  * Synthetic job generated from a trace description.
  */
-class GridmixJob implements Callable<Job>, Delayed {
+abstract class GridmixJob implements Callable<Job>, Delayed {
 
   public static final String JOBNAME = "GRIDMIX";
   public static final String ORIGNAME = "gridmix.job.name.original";
@@ -75,31 +68,88 @@ class GridmixJob implements Callable<Job
       }
     };
 
-  private final int seq;
-  private final Path outdir;
+  protected final int seq;
+  protected final Path outdir;
   protected final Job job;
-  private final JobStory jobdesc;
-  private final long submissionTimeNanos;
+  protected final JobStory jobdesc;
+  protected final UserGroupInformation ugi;
+  protected final long submissionTimeNanos;
+  private static final ConcurrentHashMap<Integer,List<InputSplit>> descCache =
+     new ConcurrentHashMap<Integer,List<InputSplit>>();
+  protected static final String GRIDMIX_JOB_SEQ = "gridmix.job.seq";
+  protected static final String GRIDMIX_USE_QUEUE_IN_TRACE = 
+      "gridmix.job-submission.use-queue-in-trace";
+  protected static final String GRIDMIX_DEFAULT_QUEUE = 
+      "gridmix.job-submission.default-queue";
+
+  private static void setJobQueue(Job job, String queue) {
+    if (queue != null) {
+      job.getConfiguration().set(MRJobConfig.QUEUE_NAME, queue);
+    }
+  }
+  
+  public GridmixJob(final Configuration conf, long submissionMillis,
+      final JobStory jobdesc, Path outRoot, UserGroupInformation ugi, 
+      final int seq) throws IOException {
+    this.ugi = ugi;
+    this.jobdesc = jobdesc;
+    this.seq = seq;
 
-  public GridmixJob(Configuration conf, long submissionMillis,
-      JobStory jobdesc, Path outRoot, int seq) throws IOException {
     ((StringBuilder)nameFormat.get().out()).setLength(JOBNAME.length());
-    job = new Job(conf, nameFormat.get().format("%05d", seq).toString());
+    try {
+      job = this.ugi.doAs(new PrivilegedExceptionAction<Job>() {
+        public Job run() throws IOException {
+          Job ret = 
+            new Job(conf, 
+                    nameFormat.get().format("%05d", seq).toString());
+          ret.getConfiguration().setInt(GRIDMIX_JOB_SEQ, seq);
+          String jobId = null == jobdesc.getJobID() 
+                         ? "<unknown>" 
+                         : jobdesc.getJobID().toString();
+          ret.getConfiguration().set(ORIGNAME, jobId);
+          if (conf.getBoolean(GRIDMIX_USE_QUEUE_IN_TRACE, false)) {
+            setJobQueue(ret, jobdesc.getQueueName());
+          } else {
+            setJobQueue(ret, conf.get(GRIDMIX_DEFAULT_QUEUE));
+          }
+
+          return ret;
+        }
+      });
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+
     submissionTimeNanos = TimeUnit.NANOSECONDS.convert(
         submissionMillis, TimeUnit.MILLISECONDS);
-    this.jobdesc = jobdesc;
-    this.seq = seq;
     outdir = new Path(outRoot, "" + seq);
   }
 
-  protected GridmixJob(Configuration conf, long submissionMillis, String name)
-      throws IOException {
-    job = new Job(conf, name);
+  protected GridmixJob(final Configuration conf, long submissionMillis, 
+                       final String name) throws IOException {
     submissionTimeNanos = TimeUnit.NANOSECONDS.convert(
         submissionMillis, TimeUnit.MILLISECONDS);
     jobdesc = null;
     outdir = null;
     seq = -1;
+    ugi = UserGroupInformation.getCurrentUser();
+
+    try {
+      job = this.ugi.doAs(new PrivilegedExceptionAction<Job>() {
+        public Job run() throws IOException {
+          Job ret = new Job(conf, name);
+          ret.getConfiguration().setInt(GRIDMIX_JOB_SEQ, seq);
+          setJobQueue(ret, conf.get(GRIDMIX_DEFAULT_QUEUE));
+          return ret;
+        }
+      });
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public UserGroupInformation getUgi() {
+    return ugi;
   }
 
   public String toString() {
@@ -157,27 +207,29 @@ class GridmixJob implements Callable<Job
     return jobdesc;
   }
 
-  public Job call() throws IOException, InterruptedException,
-                           ClassNotFoundException {
-    job.setMapperClass(GridmixMapper.class);
-    job.setReducerClass(GridmixReducer.class);
-    job.setNumReduceTasks(jobdesc.getNumberReduces());
-    job.setMapOutputKeyClass(GridmixKey.class);
-    job.setMapOutputValueClass(GridmixRecord.class);
-    job.setSortComparatorClass(GridmixKey.Comparator.class);
-    job.setGroupingComparatorClass(SpecGroupingComparator.class);
-    job.setInputFormatClass(GridmixInputFormat.class);
-    job.setOutputFormatClass(RawBytesOutputFormat.class);
-    job.setPartitionerClass(DraftPartitioner.class);
-    job.setJarByClass(GridmixJob.class);
-    job.getConfiguration().setInt("gridmix.job.seq", seq);
-    job.getConfiguration().set(ORIGNAME, null == jobdesc.getJobID()
-        ? "<unknown>" : jobdesc.getJobID().toString());
-    job.getConfiguration().setBoolean(Job.USED_GENERIC_PARSER, true);
-    FileInputFormat.addInputPath(job, new Path("ignored"));
-    FileOutputFormat.setOutputPath(job, outdir);
-    job.submit();
-    return job;
+  static void pushDescription(int seq, List<InputSplit> splits) {
+    if (null != descCache.putIfAbsent(seq, splits)) {
+      throw new IllegalArgumentException("Description exists for id " + seq);
+    }
+  }
+
+  static List<InputSplit> pullDescription(JobContext jobCtxt) {
+    return pullDescription(GridmixJob.getJobSeqId(jobCtxt));
+  }
+  
+  static List<InputSplit> pullDescription(int seq) {
+    return descCache.remove(seq);
+  }
+
+  static void clearAll() {
+    descCache.clear();
+  }
+
+  void buildSplits(FilePool inputDir) throws IOException {
+
+  }
+  static int getJobSeqId(JobContext job) {
+    return job.getConfiguration().getInt(GRIDMIX_JOB_SEQ,-1);
   }
 
   public static class DraftPartitioner<V> extends Partitioner<GridmixKey,V> {
@@ -229,204 +281,6 @@ class GridmixJob implements Callable<Job
     }
   }
 
-  public static class GridmixMapper
-      extends Mapper<NullWritable,GridmixRecord,GridmixKey,GridmixRecord> {
-
-    private double acc;
-    private double ratio;
-    private final ArrayList<RecordFactory> reduces =
-      new ArrayList<RecordFactory>();
-    private final Random r = new Random();
-
-    private final GridmixKey key = new GridmixKey();
-    private final GridmixRecord val = new GridmixRecord();
-
-    @Override
-    protected void setup(Context ctxt)
-        throws IOException, InterruptedException {
-      final Configuration conf = ctxt.getConfiguration();
-      final GridmixSplit split = (GridmixSplit) ctxt.getInputSplit();
-      final int maps = split.getMapCount();
-      final long[] reduceBytes = split.getOutputBytes();
-      final long[] reduceRecords = split.getOutputRecords();
-
-      long totalRecords = 0L;
-      final int nReduces = ctxt.getNumReduceTasks();
-      if (nReduces > 0) {
-        int idx = 0;
-        int id = split.getId();
-        for (int i = 0; i < nReduces; ++i) {
-          final GridmixKey.Spec spec = new GridmixKey.Spec();
-          if (i == id) {
-            spec.bytes_out = split.getReduceBytes(idx);
-            spec.rec_out = split.getReduceRecords(idx);
-            ++idx;
-            id += maps;
-          }
-          reduces.add(new IntermediateRecordFactory(
-              new AvgRecordFactory(reduceBytes[i], reduceRecords[i], conf),
-              i, reduceRecords[i], spec, conf));
-          totalRecords += reduceRecords[i];
-        }
-      } else {
-        reduces.add(new AvgRecordFactory(reduceBytes[0], reduceRecords[0],
-              conf));
-        totalRecords = reduceRecords[0];
-      }
-      final long splitRecords = split.getInputRecords();
-      final long inputRecords = splitRecords <= 0 && split.getLength() >= 0
-        ? Math.max(1,
-          split.getLength() / conf.getInt("gridmix.missing.rec.size", 64*1024))
-        : splitRecords;
-      ratio = totalRecords / (1.0 * inputRecords);
-      acc = 0.0;
-    }
-
-    @Override
-    public void map(NullWritable ignored, GridmixRecord rec,
-        Context context) throws IOException, InterruptedException {
-      acc += ratio;
-      while (acc >= 1.0 && !reduces.isEmpty()) {
-        key.setSeed(r.nextLong());
-        val.setSeed(r.nextLong());
-        final int idx = r.nextInt(reduces.size());
-        final RecordFactory f = reduces.get(idx);
-        if (!f.next(key, val)) {
-          reduces.remove(idx);
-          continue;
-        }
-        context.write(key, val);
-        acc -= 1.0;
-      }
-    }
-
-    @Override
-    public void cleanup(Context context)
-        throws IOException, InterruptedException {
-      for (RecordFactory factory : reduces) {
-        key.setSeed(r.nextLong());
-        while (factory.next(key, val)) {
-          context.write(key, val);
-          key.setSeed(r.nextLong());
-        }
-      }
-    }
-  }
-
-  public static class GridmixReducer
-      extends Reducer<GridmixKey,GridmixRecord,NullWritable,GridmixRecord> {
-
-    private final Random r = new Random();
-    private final GridmixRecord val = new GridmixRecord();
-
-    private double acc;
-    private double ratio;
-    private RecordFactory factory;
-
-    @Override
-    protected void setup(Context context)
-        throws IOException, InterruptedException {
-      if (!context.nextKey() ||
-           context.getCurrentKey().getType() != GridmixKey.REDUCE_SPEC) {
-        throw new IOException("Missing reduce spec");
-      }
-      long outBytes = 0L;
-      long outRecords = 0L;
-      long inRecords = 0L;
-      for (GridmixRecord ignored : context.getValues()) {
-        final GridmixKey spec = context.getCurrentKey();
-        inRecords += spec.getReduceInputRecords();
-        outBytes += spec.getReduceOutputBytes();
-        outRecords += spec.getReduceOutputRecords();
-      }
-      if (0 == outRecords && inRecords > 0) {
-        LOG.info("Spec output bytes w/o records. Using input record count");
-        outRecords = inRecords;
-      }
-      factory =
-        new AvgRecordFactory(outBytes, outRecords, context.getConfiguration());
-      ratio = outRecords / (1.0 * inRecords);
-      acc = 0.0;
-    }
-    @Override
-    protected void reduce(GridmixKey key, Iterable<GridmixRecord> values,
-        Context context) throws IOException, InterruptedException {
-      for (GridmixRecord ignored : values) {
-        acc += ratio;
-        while (acc >= 1.0 && factory.next(null, val)) {
-          context.write(NullWritable.get(), val);
-          acc -= 1.0;
-        }
-      }
-    }
-    @Override
-    protected void cleanup(Context context)
-        throws IOException, InterruptedException {
-      val.setSeed(r.nextLong());
-      while (factory.next(null, val)) {
-        context.write(NullWritable.get(), val);
-        val.setSeed(r.nextLong());
-      }
-    }
-  }
-
-  static class GridmixRecordReader
-      extends RecordReader<NullWritable,GridmixRecord> {
-
-    private RecordFactory factory;
-    private final Random r = new Random();
-    private final GridmixRecord val = new GridmixRecord();
-
-    public GridmixRecordReader() { }
-
-    @Override
-    public void initialize(InputSplit genericSplit, TaskAttemptContext ctxt)
-            throws IOException, InterruptedException {
-      final GridmixSplit split = (GridmixSplit)genericSplit;
-      final Configuration conf = ctxt.getConfiguration();
-      factory = new ReadRecordFactory(split.getLength(),
-          split.getInputRecords(), new FileQueue(split, conf), conf);
-    }
-
-    @Override
-    public boolean nextKeyValue() throws IOException {
-      val.setSeed(r.nextLong());
-      return factory.next(null, val);
-    }
-    @Override
-    public float getProgress() throws IOException {
-      return factory.getProgress();
-    }
-    @Override
-    public NullWritable getCurrentKey() {
-      return NullWritable.get();
-    }
-    @Override
-    public GridmixRecord getCurrentValue() {
-      return val;
-    }
-    @Override
-    public void close() throws IOException {
-      factory.close();
-    }
-  }
-
-  static class GridmixInputFormat
-      extends InputFormat<NullWritable,GridmixRecord> {
-
-    @Override
-    public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
-      return pullDescription(jobCtxt.getConfiguration().getInt(
-            "gridmix.job.seq", -1));
-    }
-    @Override
-    public RecordReader<NullWritable,GridmixRecord> createRecordReader(
-        InputSplit split, final TaskAttemptContext taskContext)
-        throws IOException {
-      return new GridmixRecordReader();
-    }
-  }
-
   static class RawBytesOutputFormat<K>
       extends FileOutputFormat<K,GridmixRecord> {
 
@@ -450,74 +304,4 @@ class GridmixJob implements Callable<Job
       };
     }
   }
-
-  // TODO replace with ThreadLocal submitter?
-  private static final ConcurrentHashMap<Integer,List<InputSplit>> descCache =
-    new ConcurrentHashMap<Integer,List<InputSplit>>();
-
-  static void pushDescription(int seq, List<InputSplit> splits) {
-    if (null != descCache.putIfAbsent(seq, splits)) {
-      throw new IllegalArgumentException("Description exists for id " + seq);
-    }
-  }
-
-  static List<InputSplit> pullDescription(int seq) {
-    return descCache.remove(seq);
-  }
-
-  // not nesc when TL
-  static void clearAll() {
-    descCache.clear();
-  }
-
-  void buildSplits(FilePool inputDir) throws IOException {
-    long mapInputBytesTotal = 0L;
-    long mapOutputBytesTotal = 0L;
-    long mapOutputRecordsTotal = 0L;
-    final JobStory jobdesc = getJobDesc();
-    if (null == jobdesc) {
-      return;
-    }
-    final int maps = jobdesc.getNumberMaps();
-    final int reds = jobdesc.getNumberReduces();
-    for (int i = 0; i < maps; ++i) {
-      final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
-      mapInputBytesTotal += info.getInputBytes();
-      mapOutputBytesTotal += info.getOutputBytes();
-      mapOutputRecordsTotal += info.getOutputRecords();
-    }
-    final double[] reduceRecordRatio = new double[reds];
-    final double[] reduceByteRatio = new double[reds];
-    for (int i = 0; i < reds; ++i) {
-      final TaskInfo info = jobdesc.getTaskInfo(TaskType.REDUCE, i);
-      reduceByteRatio[i] = info.getInputBytes() / (1.0 * mapOutputBytesTotal);
-      reduceRecordRatio[i] =
-        info.getInputRecords() / (1.0 * mapOutputRecordsTotal);
-    }
-    final InputStriper striper = new InputStriper(inputDir, mapInputBytesTotal);
-    final List<InputSplit> splits = new ArrayList<InputSplit>();
-    for (int i = 0; i < maps; ++i) {
-      final int nSpec = reds / maps + ((reds % maps) > i ? 1 : 0);
-      final long[] specBytes = new long[nSpec];
-      final long[] specRecords = new long[nSpec];
-      for (int j = 0; j < nSpec; ++j) {
-        final TaskInfo info =
-          jobdesc.getTaskInfo(TaskType.REDUCE, i + j * maps);
-        specBytes[j] = info.getOutputBytes();
-        specRecords[j] = info.getOutputRecords();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i,
-              i + j * maps, info.getOutputRecords(), info.getOutputBytes()));
-        }
-      }
-      final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
-      splits.add(new GridmixSplit(striper.splitFor(inputDir,
-              info.getInputBytes(), 3), maps, i,
-            info.getInputBytes(), info.getInputRecords(),
-            info.getOutputBytes(), info.getOutputRecords(),
-            reduceByteRatio, reduceRecordRatio, specBytes, specRecords));
-    }
-    pushDescription(id(), splits);
-  }
-
 }
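
The two queue keys introduced above let a synthetic job either inherit the
queue recorded in the trace or fall back to a single configured default. A
sketch; the class and queue names are hypothetical:

    import org.apache.hadoop.conf.Configuration;

    public class QueueSettings {
      public static Configuration replayTraceQueues() {
        Configuration conf = new Configuration();
        // Submit each synthetic job to the queue recorded in the trace.
        conf.setBoolean("gridmix.job-submission.use-queue-in-trace", true);
        // Alternatively, route every job to one queue (name is hypothetical):
        // conf.set("gridmix.job-submission.default-queue", "gridmix");
        return conf;
      }
    }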

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJobSubmissionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJobSubmissionPolicy.java?rev=963986&r1=963985&r2=963986&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJobSubmissionPolicy.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJobSubmissionPolicy.java Wed Jul 14 09:22:29 2010
@@ -32,9 +32,10 @@ enum GridmixJobSubmissionPolicy {
     @Override
     public JobFactory<ClusterStats> createJobFactory(
       JobSubmitter submitter, JobStoryProducer producer, Path scratchDir,
-      Configuration conf, CountDownLatch startFlag) throws IOException {
+      Configuration conf, CountDownLatch startFlag, UserResolver userResolver)
+      throws IOException {
       return new ReplayJobFactory(
-        submitter, producer, scratchDir, conf, startFlag);
+        submitter, producer, scratchDir, conf, startFlag, userResolver);
     }
   },
 
@@ -42,9 +43,10 @@ enum GridmixJobSubmissionPolicy {
     @Override
     public JobFactory<ClusterStats> createJobFactory(
       JobSubmitter submitter, JobStoryProducer producer, Path scratchDir,
-      Configuration conf, CountDownLatch startFlag) throws IOException {
+      Configuration conf, CountDownLatch startFlag, UserResolver userResolver)
+      throws IOException {
       return new StressJobFactory(
-        submitter, producer, scratchDir, conf, startFlag);
+        submitter, producer, scratchDir, conf, startFlag, userResolver);
     }
   },
 
@@ -52,14 +54,10 @@ enum GridmixJobSubmissionPolicy {
     @Override
     public JobFactory<JobStats> createJobFactory(
       JobSubmitter submitter, JobStoryProducer producer, Path scratchDir,
-      Configuration conf, CountDownLatch startFlag) throws IOException {
+      Configuration conf, CountDownLatch startFlag, UserResolver userResolver)
+      throws IOException {
       return new SerialJobFactory(
-        submitter, producer, scratchDir, conf, startFlag);
-    }
-
-    @Override
-    public int getPollingInterval() {
-      return 0;
+        submitter, producer, scratchDir, conf, startFlag, userResolver);
     }
   };
 
@@ -76,7 +74,8 @@ enum GridmixJobSubmissionPolicy {
 
   public abstract JobFactory createJobFactory(
     JobSubmitter submitter, JobStoryProducer producer, Path scratchDir,
-    Configuration conf, CountDownLatch startFlag) throws IOException;
+    Configuration conf, CountDownLatch startFlag, UserResolver userResolver)
+    throws IOException;
 
   public int getPollingInterval() {
     return pollingInterval;
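
Note that SERIAL's zero-polling-interval override is removed, so all three
policies now share getPollingInterval(), and each factory receives the
UserResolver. Resolving the configured policy follows the same pattern Gridmix
uses in startThreads(); a sketch, placed in the gridmix package because the
enum is package-private (class name hypothetical):

    package org.apache.hadoop.mapred.gridmix;

    import org.apache.hadoop.conf.Configuration;

    class PolicyLookup {
      static GridmixJobSubmissionPolicy resolve(Configuration conf) {
        // STRESS is the default Gridmix passes from startThreads().
        return GridmixJobSubmissionPolicy.getPolicy(
            conf, GridmixJobSubmissionPolicy.STRESS);
      }
    }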

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java?rev=963986&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java Wed Jul 14 09:22:29 2010
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.rumen.JobStory;
+
+import java.io.IOException;
+
+public enum JobCreator {
+
+  LOADJOB("LOADJOB") {
+    @Override
+    public GridmixJob createGridmixJob(
+      Configuration conf, long submissionMillis, JobStory jobdesc, Path outRoot,
+      UserGroupInformation ugi, int seq) throws IOException {
+      return new LoadJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq);
+    }},
+
+  SLEEPJOB("SLEEPJOB") {
+    @Override
+    public GridmixJob createGridmixJob(
+      Configuration conf, long submissionMillis, JobStory jobdesc, Path outRoot,
+      UserGroupInformation ugi, int seq) throws IOException {
+      return new SleepJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq);
+    }};
+
+  public static final String GRIDMIX_JOB_TYPE = "gridmix.job.type";
+
+
+  private final String name;
+
+  JobCreator(String name) {
+    this.name = name;
+  }
+
+  public abstract GridmixJob createGridmixJob(
+    final Configuration conf, long submissionMillis, final JobStory jobdesc,
+    Path outRoot, UserGroupInformation ugi, final int seq) throws IOException;
+
+  public static JobCreator getPolicy(
+    Configuration conf, JobCreator defaultPolicy) {
+    return conf.getEnum(GRIDMIX_JOB_TYPE, defaultPolicy);
+  }
+}
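
JobCreator picks the synthetic job implementation through gridmix.job.type;
JobFactory (below) defaults to LOADJOB, while SLEEPJOB emulates only the traced
task durations for pure JobTracker load. A selection sketch; the class name is
hypothetical:

    import org.apache.hadoop.conf.Configuration;

    public class JobTypeSelection {
      public static Configuration sleepJobs() {
        Configuration conf = new Configuration();
        // Value must name a JobCreator enum constant: LOADJOB or SLEEPJOB.
        conf.set("gridmix.job.type", "SLEEPJOB");
        return conf;
      }
    }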

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java?rev=963986&r1=963985&r2=963986&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java Wed Jul 14 09:22:29 2010
@@ -58,6 +58,8 @@ abstract class JobFactory<T> implements 
   protected final AtomicInteger sequence;
   protected final JobSubmitter submitter;
   protected final CountDownLatch startFlag;
+  protected final UserResolver userResolver;
+  protected final JobCreator jobCreator;
   protected volatile IOException error = null;
   protected final JobStoryProducer jobProducer;
   protected final ReentrantLock lock = new ReentrantLock(true);
@@ -73,10 +75,10 @@ abstract class JobFactory<T> implements 
    * @throws java.io.IOException
    */
   public JobFactory(JobSubmitter submitter, InputStream jobTrace,
-      Path scratch, Configuration conf, CountDownLatch startFlag)
-      throws IOException {
+      Path scratch, Configuration conf, CountDownLatch startFlag,
+      UserResolver userResolver) throws IOException {
     this(submitter, new ZombieJobProducer(jobTrace, null), scratch, conf,
-        startFlag);
+        startFlag, userResolver);
   }
 
   /**
@@ -88,7 +90,8 @@ abstract class JobFactory<T> implements 
    * @param startFlag Latch released from main to start pipeline
    */
   protected JobFactory(JobSubmitter submitter, JobStoryProducer jobProducer,
-      Path scratch, Configuration conf, CountDownLatch startFlag) {
+      Path scratch, Configuration conf, CountDownLatch startFlag,
+      UserResolver userResolver) {
     sequence = new AtomicInteger(0);
     this.scratch = scratch;
     this.rateFactor = conf.getFloat(Gridmix.GRIDMIX_SUB_MUL, 1.0f);
@@ -97,6 +100,11 @@ abstract class JobFactory<T> implements 
     this.submitter = submitter;
     this.startFlag = startFlag;
     this.rThread = createReaderThread();
+    if(LOG.isDebugEnabled()) {
+      LOG.debug(" The submission thread name is " + rThread.getName());
+    }
+    this.userResolver = userResolver;
+    this.jobCreator = JobCreator.getPolicy(conf, JobCreator.LOADJOB);
   }
 
   static class MinTaskInfo extends TaskInfo {

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java?rev=963986&r1=963985&r2=963986&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java Wed Jul 14 09:22:29 2010
@@ -40,7 +40,8 @@ class JobSubmitter implements Gridmix.Co
 
   public static final Log LOG = LogFactory.getLog(JobSubmitter.class);
 
-  final Semaphore sem;
+  private final Semaphore sem;
+  private final Statistics statistics;
   private final FilePool inputDir;
   private final JobMonitor monitor;
   private final ExecutorService sched;
@@ -56,14 +57,16 @@ class JobSubmitter implements Gridmix.Co
    *   See {@link Gridmix#GRIDMIX_QUE_DEP}.
    * @param inputDir Set of files from which split data may be mined for
    *   synthetic jobs.
+   * @param statistics
    */
   public JobSubmitter(JobMonitor monitor, int threads, int queueDepth,
-      FilePool inputDir) {
+      FilePool inputDir, Statistics statistics) {
     sem = new Semaphore(queueDepth);
     sched = new ThreadPoolExecutor(threads, threads, 0L,
         TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
     this.inputDir = inputDir;
     this.monitor = monitor;
+    this.statistics = statistics;
   }
 
   /**
@@ -81,11 +84,13 @@ class JobSubmitter implements Gridmix.Co
         try {
           job.buildSplits(inputDir);
         } catch (IOException e) {
-          LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
+          LOG.warn("Failed to submit " + job.getJob().getJobName() + " as " 
+                   + job.getUgi(), e);
           monitor.submissionFailed(job.getJob());
           return;
         } catch (Exception e) {
-          LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
+          LOG.warn("Failed to submit " + job.getJob().getJobName() + " as " 
+                   + job.getUgi(), e);
           monitor.submissionFailed(job.getJob());
           return;
         }
@@ -98,10 +103,12 @@ class JobSubmitter implements Gridmix.Co
         try {
           // submit job
           monitor.add(job.call());
+          statistics.addJobStats(job.getJob(), job.getJobDesc());
           LOG.debug("SUBMIT " + job + "@" + System.currentTimeMillis() +
               " (" + job.getJob().getJobID() + ")");
         } catch (IOException e) {
-          LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
+          LOG.warn("Failed to submit " + job.getJob().getJobName() + " as " 
+                   + job.getUgi(), e);
           if (e.getCause() instanceof ClosedByInterruptException) {
             throw new InterruptedException("Failed to submit " +
                 job.getJob().getJobName());
@@ -117,12 +124,10 @@ class JobSubmitter implements Gridmix.Co
         GridmixJob.pullDescription(job.id());
         Thread.currentThread().interrupt();
         monitor.submissionFailed(job.getJob());
-        return;
       } catch(Exception e) {
         //Due to some exception job wasnt submitted.
         LOG.info(" Job " + job.getJob() + " submission failed " , e);
         monitor.submissionFailed(job.getJob());
-        return;
       } finally {
         sem.release();
       }
@@ -147,6 +152,7 @@ class JobSubmitter implements Gridmix.Co
 
   /**
    * (Re)scan the set of input files from which splits are derived.
+   * @throws java.io.IOException
    */
   public void refreshFilePool() throws IOException {
     inputDir.refresh();

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java?rev=963986&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java Wed Jul 14 09:22:29 2010
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Synthetic job generated from a trace description.
+ */
+class LoadJob extends GridmixJob {
+
+  public static final Log LOG = LogFactory.getLog(LoadJob.class);
+
+  public LoadJob(final Configuration conf, long submissionMillis, 
+                 final JobStory jobdesc, Path outRoot, UserGroupInformation ugi,
+                 final int seq) throws IOException {
+    super(conf, submissionMillis, jobdesc, outRoot, ugi, seq);
+  }
+
+  public Job call() throws IOException, InterruptedException,
+                           ClassNotFoundException {
+    ugi.doAs(
+      new PrivilegedExceptionAction<Job>() {
+        public Job run() throws IOException, ClassNotFoundException,
+                                InterruptedException {
+          job.setMapperClass(LoadMapper.class);
+          job.setReducerClass(LoadReducer.class);
+          job.setNumReduceTasks(jobdesc.getNumberReduces());
+          job.setMapOutputKeyClass(GridmixKey.class);
+          job.setMapOutputValueClass(GridmixRecord.class);
+          job.setSortComparatorClass(GridmixKey.Comparator.class);
+          job.setGroupingComparatorClass(SpecGroupingComparator.class);
+          job.setInputFormatClass(LoadInputFormat.class);
+          job.setOutputFormatClass(RawBytesOutputFormat.class);
+          job.setPartitionerClass(DraftPartitioner.class);
+          job.setJarByClass(LoadJob.class);
+          job.getConfiguration().setBoolean(Job.USED_GENERIC_PARSER, true);
+          FileOutputFormat.setOutputPath(job, outdir);
+          job.submit();
+          return job;
+        }
+      });
+
+    return job;
+  }
+
+  public static class LoadMapper
+  extends Mapper<NullWritable, GridmixRecord, GridmixKey, GridmixRecord> {
+
+    private double acc;
+    private double ratio;
+    private final ArrayList<RecordFactory> reduces =
+      new ArrayList<RecordFactory>();
+    private final Random r = new Random();
+
+    private final GridmixKey key = new GridmixKey();
+    private final GridmixRecord val = new GridmixRecord();
+
+    @Override
+    protected void setup(Context ctxt) 
+    throws IOException, InterruptedException {
+      final Configuration conf = ctxt.getConfiguration();
+      final LoadSplit split = (LoadSplit) ctxt.getInputSplit();
+      final int maps = split.getMapCount();
+      final long[] reduceBytes = split.getOutputBytes();
+      final long[] reduceRecords = split.getOutputRecords();
+
+      long totalRecords = 0L;
+      final int nReduces = ctxt.getNumReduceTasks();
+      if (nReduces > 0) {
+        int idx = 0;
+        int id = split.getId();
+        for (int i = 0; i < nReduces; ++i) {
+          final GridmixKey.Spec spec = new GridmixKey.Spec();
+          if (i == id) {
+            spec.bytes_out = split.getReduceBytes(idx);
+            spec.rec_out = split.getReduceRecords(idx);
+            ++idx;
+            id += maps;
+          }
+          reduces.add(new IntermediateRecordFactory(
+              new AvgRecordFactory(reduceBytes[i], reduceRecords[i], conf),
+              i, reduceRecords[i], spec, conf));
+          totalRecords += reduceRecords[i];
+        }
+      } else {
+        reduces.add(new AvgRecordFactory(reduceBytes[0], reduceRecords[0],
+                                         conf));
+        totalRecords = reduceRecords[0];
+      }
+      final long splitRecords = split.getInputRecords();
+      int missingRecSize = 
+        conf.getInt(AvgRecordFactory.GRIDMIX_MISSING_REC_SIZE, 64*1024);
+      final long inputRecords = 
+        (splitRecords <= 0 && split.getLength() >= 0)
+        ? Math.max(1, split.getLength() / missingRecSize)
+        : splitRecords;
+      ratio = totalRecords / (1.0 * inputRecords);
+      acc = 0.0;
+    }
+
+    @Override
+    public void map(NullWritable ignored, GridmixRecord rec,
+                    Context context) throws IOException, InterruptedException {
+      acc += ratio;
+      while (acc >= 1.0 && !reduces.isEmpty()) {
+        key.setSeed(r.nextLong());
+        val.setSeed(r.nextLong());
+        final int idx = r.nextInt(reduces.size());
+        final RecordFactory f = reduces.get(idx);
+        if (!f.next(key, val)) {
+          reduces.remove(idx);
+          continue;
+        }
+        context.write(key, val);
+        acc -= 1.0;
+      }
+    }
+
+    @Override
+    public void cleanup(Context context) 
+    throws IOException, InterruptedException {
+      for (RecordFactory factory : reduces) {
+        key.setSeed(r.nextLong());
+        while (factory.next(key, val)) {
+          context.write(key, val);
+          key.setSeed(r.nextLong());
+        }
+      }
+    }
+  }
+
+  public static class LoadReducer
+  extends Reducer<GridmixKey,GridmixRecord,NullWritable,GridmixRecord> {
+
+    private final Random r = new Random();
+    private final GridmixRecord val = new GridmixRecord();
+
+    private double acc;
+    private double ratio;
+    private RecordFactory factory;
+
+    @Override
+    protected void setup(Context context)
+    throws IOException, InterruptedException {
+      if (!context.nextKey() 
+          || context.getCurrentKey().getType() != GridmixKey.REDUCE_SPEC) {
+        throw new IOException("Missing reduce spec");
+      }
+      long outBytes = 0L;
+      long outRecords = 0L;
+      long inRecords = 0L;
+      for (GridmixRecord ignored : context.getValues()) {
+        final GridmixKey spec = context.getCurrentKey();
+        inRecords += spec.getReduceInputRecords();
+        outBytes += spec.getReduceOutputBytes();
+        outRecords += spec.getReduceOutputRecords();
+      }
+      if (0 == outRecords && inRecords > 0) {
+        LOG.info("Spec output bytes w/o records. Using input record count");
+        outRecords = inRecords;
+      }
+      factory =
+        new AvgRecordFactory(outBytes, outRecords, context.getConfiguration());
+      ratio = outRecords / (1.0 * inRecords);
+      acc = 0.0;
+    }
+    @Override
+    protected void reduce(GridmixKey key, Iterable<GridmixRecord> values,
+                          Context context) 
+    throws IOException, InterruptedException {
+      for (GridmixRecord ignored : values) {
+        acc += ratio;
+        while (acc >= 1.0 && factory.next(null, val)) {
+          context.write(NullWritable.get(), val);
+          acc -= 1.0;
+        }
+      }
+    }
+    @Override
+    protected void cleanup(Context context)
+    throws IOException, InterruptedException {
+      val.setSeed(r.nextLong());
+      while (factory.next(null, val)) {
+        context.write(NullWritable.get(), val);
+        val.setSeed(r.nextLong());
+      }
+    }
+  }
+
+  static class LoadRecordReader
+  extends RecordReader<NullWritable,GridmixRecord> {
+
+    private RecordFactory factory;
+    private final Random r = new Random();
+    private final GridmixRecord val = new GridmixRecord();
+
+    public LoadRecordReader() { }
+
+    @Override
+    public void initialize(InputSplit genericSplit, TaskAttemptContext ctxt)
+    throws IOException, InterruptedException {
+      final LoadSplit split = (LoadSplit)genericSplit;
+      final Configuration conf = ctxt.getConfiguration();
+      factory = 
+        new ReadRecordFactory(split.getLength(), split.getInputRecords(), 
+                              new FileQueue(split, conf), conf);
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException {
+      val.setSeed(r.nextLong());
+      return factory.next(null, val);
+    }
+    @Override
+    public float getProgress() throws IOException {
+      return factory.getProgress();
+    }
+    @Override
+    public NullWritable getCurrentKey() {
+      return NullWritable.get();
+    }
+    @Override
+    public GridmixRecord getCurrentValue() {
+      return val;
+    }
+    @Override
+    public void close() throws IOException {
+      factory.close();
+    }
+  }
+
+  static class LoadInputFormat
+  extends InputFormat<NullWritable,GridmixRecord> {
+
+    @Override
+    public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
+      return pullDescription(jobCtxt);
+    }
+    @Override
+    public RecordReader<NullWritable,GridmixRecord> createRecordReader(
+        InputSplit split, final TaskAttemptContext taskContext)
+        throws IOException {
+      return new LoadRecordReader();
+    }
+  }
+
+  @Override
+  void buildSplits(FilePool inputDir) throws IOException {
+    long mapInputBytesTotal = 0L;
+    long mapOutputBytesTotal = 0L;
+    long mapOutputRecordsTotal = 0L;
+    final JobStory jobdesc = getJobDesc();
+    if (null == jobdesc) {
+      return;
+    }
+    final int maps = jobdesc.getNumberMaps();
+    final int reds = jobdesc.getNumberReduces();
+    for (int i = 0; i < maps; ++i) {
+      final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
+      mapInputBytesTotal += info.getInputBytes();
+      mapOutputBytesTotal += info.getOutputBytes();
+      mapOutputRecordsTotal += info.getOutputRecords();
+    }
+    final double[] reduceRecordRatio = new double[reds];
+    final double[] reduceByteRatio = new double[reds];
+    for (int i = 0; i < reds; ++i) {
+      final TaskInfo info = jobdesc.getTaskInfo(TaskType.REDUCE, i);
+      reduceByteRatio[i] = info.getInputBytes() / (1.0 * mapOutputBytesTotal);
+      reduceRecordRatio[i] =
+        info.getInputRecords() / (1.0 * mapOutputRecordsTotal);
+    }
+    final InputStriper striper = new InputStriper(inputDir, mapInputBytesTotal);
+    final List<InputSplit> splits = new ArrayList<InputSplit>();
+    for (int i = 0; i < maps; ++i) {
+      final int nSpec = reds / maps + ((reds % maps) > i ? 1 : 0);
+      final long[] specBytes = new long[nSpec];
+      final long[] specRecords = new long[nSpec];
+      for (int j = 0; j < nSpec; ++j) {
+        final TaskInfo info =
+          jobdesc.getTaskInfo(TaskType.REDUCE, i + j * maps);
+        specBytes[j] = info.getOutputBytes();
+        specRecords[j] = info.getOutputRecords();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i,
+                    i + j * maps, info.getOutputRecords(), 
+                    info.getOutputBytes()));
+        }
+      }
+      final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
+      splits.add(
+        new LoadSplit(striper.splitFor(inputDir, info.getInputBytes(), 3), 
+                      maps, i, info.getInputBytes(), info.getInputRecords(),
+                      info.getOutputBytes(), info.getOutputRecords(),
+                      reduceByteRatio, reduceRecordRatio, specBytes, 
+                      specRecords));
+    }
+    pushDescription(id(), splits);
+  }
+}
\ No newline at end of file
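
LoadMapper.map() above drives output volume with a fractional accumulator: each input record adds ratio (the target output records per input record) to acc, and whole units of acc are flushed as writes, so the long-run output count matches the trace without per-record rounding error. The same technique in isolation (RatioEmitter is a hypothetical name):

    /** Sketch of the fractional-accumulator emission used by LoadMapper. */
    public class RatioEmitter {
      private double acc = 0.0;
      private final double ratio;

      public RatioEmitter(long outputRecords, long inputRecords) {
        this.ratio = outputRecords / (1.0 * inputRecords);
      }

      /** Returns how many records to emit for one input record. */
      public int onRecord() {
        acc += ratio;
        int emit = 0;
        while (acc >= 1.0) {
          ++emit;
          acc -= 1.0;
        }
        return emit;
      }
    }

For example, new RatioEmitter(3, 2) yields 1, 2, 1, 2, ... so that n inputs produce roughly 1.5 * n outputs.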

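buildSplits stripes reduce specs across maps round-robin: map i carries the output spec of reduces i, i + maps, i + 2*maps, and so on, which is why LoadMapper advances id += maps after consuming each spec and why nSpec above equals reds / maps + ((reds % maps) > i ? 1 : 0). The mapping in one line (hypothetical helper, equivalent arithmetic):

    // The spec for reduce r travels with map (r % maps).
    static int specCarrier(int reduce, int maps) {
      return reduce % maps;
    }
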
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java?rev=963986&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java Wed Jul 14 09:22:29 2010
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+class LoadSplit extends CombineFileSplit {
+  private int id;
+  private int nSpec;
+  private int maps;
+  private int reduces;
+  private long inputRecords;
+  private long outputBytes;
+  private long outputRecords;
+  private long maxMemory;
+  private double[] reduceBytes = new double[0];
+  private double[] reduceRecords = new double[0];
+
+  // Output specs for the reduces whose id matches this split's id (mod maps)
+  private long[] reduceOutputBytes = new long[0];
+  private long[] reduceOutputRecords = new long[0];
+
+  LoadSplit() {
+    super();
+  }
+
+  public LoadSplit(CombineFileSplit cfsplit, int maps, int id, long inputBytes, 
+                   long inputRecords, long outputBytes, long outputRecords, 
+                   double[] reduceBytes, double[] reduceRecords, 
+                   long[] reduceOutputBytes, long[] reduceOutputRecords)
+  throws IOException {
+    super(cfsplit);
+    this.id = id;
+    this.maps = maps;
+    reduces = reduceBytes.length;
+    this.inputRecords = inputRecords;
+    this.outputBytes = outputBytes;
+    this.outputRecords = outputRecords;
+    this.reduceBytes = reduceBytes;
+    this.reduceRecords = reduceRecords;
+    nSpec = reduceOutputBytes.length;
+    this.reduceOutputBytes = reduceOutputBytes;
+    this.reduceOutputRecords = reduceOutputRecords;
+  }
+
+  public int getId() {
+    return id;
+  }
+  public int getMapCount() {
+    return maps;
+  }
+  public long getInputRecords() {
+    return inputRecords;
+  }
+  public long[] getOutputBytes() {
+    if (0 == reduces) {
+      return new long[] { outputBytes };
+    }
+    final long[] ret = new long[reduces];
+    for (int i = 0; i < reduces; ++i) {
+      ret[i] = Math.round(outputBytes * reduceBytes[i]);
+    }
+    return ret;
+  }
+  public long[] getOutputRecords() {
+    if (0 == reduces) {
+      return new long[] { outputRecords };
+    }
+    final long[] ret = new long[reduces];
+    for (int i = 0; i < reduces; ++i) {
+      ret[i] = Math.round(outputRecords * reduceRecords[i]);
+    }
+    return ret;
+  }
+  public long getReduceBytes(int i) {
+    return reduceOutputBytes[i];
+  }
+  public long getReduceRecords(int i) {
+    return reduceOutputRecords[i];
+  }
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    WritableUtils.writeVInt(out, id);
+    WritableUtils.writeVInt(out, maps);
+    WritableUtils.writeVLong(out, inputRecords);
+    WritableUtils.writeVLong(out, outputBytes);
+    WritableUtils.writeVLong(out, outputRecords);
+    WritableUtils.writeVLong(out, maxMemory);
+    WritableUtils.writeVInt(out, reduces);
+    for (int i = 0; i < reduces; ++i) {
+      out.writeDouble(reduceBytes[i]);
+      out.writeDouble(reduceRecords[i]);
+    }
+    WritableUtils.writeVInt(out, nSpec);
+    for (int i = 0; i < nSpec; ++i) {
+      WritableUtils.writeVLong(out, reduceOutputBytes[i]);
+      WritableUtils.writeVLong(out, reduceOutputRecords[i]);
+    }
+  }
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    id = WritableUtils.readVInt(in);
+    maps = WritableUtils.readVInt(in);
+    inputRecords = WritableUtils.readVLong(in);
+    outputBytes = WritableUtils.readVLong(in);
+    outputRecords = WritableUtils.readVLong(in);
+    maxMemory = WritableUtils.readVLong(in);
+    reduces = WritableUtils.readVInt(in);
+    if (reduceBytes.length < reduces) {
+      reduceBytes = new double[reduces];
+      reduceRecords = new double[reduces];
+    }
+    for (int i = 0; i < reduces; ++i) {
+      reduceBytes[i] = in.readDouble();
+      reduceRecords[i] = in.readDouble();
+    }
+    nSpec = WritableUtils.readVInt(in);
+    if (reduceOutputBytes.length < nSpec) {
+      reduceOutputBytes = new long[nSpec];
+      reduceOutputRecords = new long[nSpec];
+    }
+    for (int i = 0; i < nSpec; ++i) {
+      reduceOutputBytes[i] = WritableUtils.readVLong(in);
+      reduceOutputRecords[i] = WritableUtils.readVLong(in);
+    }
+  }
+}

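Because LoadSplit hand-codes its Writable serialization, a round-trip check is a cheap sanity test. A hedged sketch (all values and the path are made up; assumes a test method with org.apache.hadoop.io.DataOutputBuffer and DataInputBuffer available):

    CombineFileSplit cf = new CombineFileSplit(
        new Path[] { new Path("file:///tmp/part-0") }, new long[] { 1024L });
    LoadSplit split = new LoadSplit(cf, 4 /*maps*/, 1 /*id*/, 1024L, 10L,
        2048L, 20L, new double[] { 0.5, 0.5 }, new double[] { 0.5, 0.5 },
        new long[] { 512L }, new long[] { 5L });

    DataOutputBuffer out = new DataOutputBuffer();
    split.write(out);                        // serialize
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    LoadSplit copy = new LoadSplit();
    copy.readFields(in);                     // deserialize

    // Everything written in write() should survive the trip.
    assert copy.getId() == 1 && copy.getMapCount() == 4
        && copy.getReduceBytes(0) == 512L;
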
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java?rev=963986&r1=963985&r2=963986&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java Wed Jul 14 09:22:29 2010
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.tools.rumen.JobStory;
 import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -41,13 +42,14 @@ import java.util.concurrent.TimeUnit;
    * @param scratch     Directory into which to write output from simulated jobs
    * @param conf        Config passed to all jobs to be submitted
    * @param startFlag   Latch released from main to start pipeline
+   * @param resolver    Maps each trace user to the user under which the
+   *                    simulated job is submitted
    * @throws java.io.IOException
    */
   public ReplayJobFactory(
     JobSubmitter submitter, JobStoryProducer jobProducer, Path scratch,
-    Configuration conf, CountDownLatch startFlag)
+    Configuration conf, CountDownLatch startFlag, UserResolver resolver)
     throws IOException {
-    super(submitter, jobProducer, scratch, conf, startFlag);
+    super(submitter, jobProducer, scratch, conf, startFlag, resolver);
   }
 
    
@@ -96,9 +98,12 @@ import java.util.concurrent.TimeUnit;
             }
             last = current;
             submitter.add(
-              new GridmixJob(
+              jobCreator.createGridmixJob(
                 conf, initTime + Math.round(rateFactor * (current - first)),
-                job, scratch,sequence.getAndIncrement()));
+                job, scratch,
+                userResolver.getTargetUgi(
+                  UserGroupInformation.createRemoteUser(job.getUser())), 
+                sequence.getAndIncrement()));
           } catch (IOException e) {
             error = e;
             return;

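The submission time computed above, initTime + Math.round(rateFactor * (current - first)), anchors the first trace job at initTime and scales every later inter-arrival gap by rateFactor, replaying the trace's submission rhythm faster or slower as configured. As a tiny illustration (helper name is hypothetical):

    /** Maps a trace submission timestamp onto the simulation clock. */
    static long replayTime(long initTime, double rateFactor,
                           long traceFirst, long traceCurrent) {
      return initTime + Math.round(rateFactor * (traceCurrent - traceFirst));
    }

    // Trace jobs at 0s, 60s, and 120s replayed with rateFactor 0.5 are
    // submitted at initTime, initTime + 30s, and initTime + 60s.
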
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RoundRobinUserResolver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RoundRobinUserResolver.java?rev=963986&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RoundRobinUserResolver.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RoundRobinUserResolver.java Wed Jul 14 09:22:29 2010
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.LineReader;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+public class RoundRobinUserResolver implements UserResolver {
+  public static final Log LOG = LogFactory.getLog(RoundRobinUserResolver.class);
+
+  private int uidx = 0;
+  private List<UserGroupInformation> users = Collections.emptyList();
+  private final HashMap<UserGroupInformation,UserGroupInformation> usercache =
+    new HashMap<UserGroupInformation,UserGroupInformation>();
+  
+  /**
+   * Userlist assumes one UGI per line, each UGI matching
+   * &lt;username&gt;,&lt;group&gt;[,group]*
+   */
+  private List<UserGroupInformation> parseUserList(URI userUri, 
+                                                   Configuration conf) 
+  throws IOException {
+    if (null == userUri) {
+      return Collections.emptyList();
+    }
+    
+    final Path userloc = new Path(userUri.toString());
+    final Text rawUgi = new Text();
+    final FileSystem fs = userloc.getFileSystem(conf);
+    final ArrayList<UserGroupInformation> ret = new ArrayList<UserGroupInformation>();
+
+    LineReader in = null;
+    try {
+      in = new LineReader(fs.open(userloc));
+      while (in.readLine(rawUgi) > 0) {
+        final ArrayList<String> groups = new ArrayList<String>(); // per user
+        int e = rawUgi.find(",");
+        if (e <= 0) {
+          throw new IOException("Missing username: " + rawUgi);
+        }
+        final String username = Text.decode(rawUgi.getBytes(), 0, e);
+        int s = e;
+        while ((e = rawUgi.find(",", ++s)) != -1) {
+          groups.add(Text.decode(rawUgi.getBytes(), s, e - s));
+          s = e;
+        }
+        groups.add(Text.decode(rawUgi.getBytes(), s, rawUgi.getLength() - s));
+        if (groups.size() == 0) {
+          throw new IOException("Missing groups: " + rawUgi);
+        }
+        ret.add(UserGroupInformation.createRemoteUser(username));
+      }
+    } finally {
+      if (in != null) {
+        in.close();
+      }
+    }
+    return ret;
+  }
+
+  @Override
+  public synchronized boolean setTargetUsers(URI userloc, Configuration conf)
+  throws IOException {
+    users = parseUserList(userloc, conf);
+    if (users.size() == 0) {
+      throw new IOException("Empty user list");
+    }
+    usercache.keySet().retainAll(users);
+    return true;
+  }
+
+  @Override
+  public synchronized UserGroupInformation getTargetUgi(
+    UserGroupInformation ugi) {
+    UserGroupInformation ret = usercache.get(ugi);
+    if (null == ret) {
+      ret = users.get(uidx++ % users.size());
+      usercache.put(ugi, ret);
+    }
+    UserGroupInformation val = null;
+    try {
+      val = 
+        UserGroupInformation.createProxyUser(ret.getUserName(), 
+          UserGroupInformation.getLoginUser());
+    } catch (IOException e) {
+      LOG.error("Error while creating the proxy user", e);
+    }
+    return val;
+  }
+}
\ No newline at end of file

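The user list consumed by setTargetUsers is one UGI per line in the form <username>,<group>[,group]*, e.g.:

    user1,groupA
    user2,groupA,groupB
    user3,groupC

A hedged usage sketch (the file URI and trace user name are made up, and createProxyUser only takes effect on a real cluster when proxy-user permissions are configured):

    UserResolver resolver = new RoundRobinUserResolver();
    resolver.setTargetUsers(URI.create("file:///tmp/users.txt"),
                            new Configuration());
    UserGroupInformation ugi = resolver.getTargetUgi(
        UserGroupInformation.createRemoteUser("traceUserA"));
    // A given trace user always maps to the same target; distinct trace
    // users cycle round-robin through user1, user2, user3, ...
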
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java?rev=963986&r1=963985&r2=963986&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java Wed Jul 14 09:22:29 2010
@@ -23,6 +23,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.tools.rumen.JobStory;
 import org.apache.hadoop.tools.rumen.JobStoryProducer;
 import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -48,9 +49,9 @@ public class SerialJobFactory extends Jo
    */
   public SerialJobFactory(
     JobSubmitter submitter, JobStoryProducer jobProducer, Path scratch,
-    Configuration conf, CountDownLatch startFlag)
+    Configuration conf, CountDownLatch startFlag, UserResolver resolver)
     throws IOException {
-    super(submitter, jobProducer, scratch, conf, startFlag);
+    super(submitter, jobProducer, scratch, conf, startFlag, resolver);
   }
 
   @Override
@@ -96,8 +97,11 @@ public class SerialJobFactory extends Jo
               LOG.debug(
                 "Serial mode submitting job " + job.getName());
             }
-            prevJob = new GridmixJob(
-              conf, 0L, job, scratch,sequence.getAndIncrement());
+            prevJob = jobCreator.createGridmixJob(
+              conf, 0L, job, scratch, 
+              userResolver.getTargetUgi(
+                UserGroupInformation.createRemoteUser(job.getUser())),
+              sequence.getAndIncrement());
 
             lock.lock();
             try {