Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:27:11 UTC

svn commit: r1077541 - in /hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src: java/org/apache/hadoop/mapred/gridmix/ test/org/apache/hadoop/mapred/gridmix/

Author: omalley
Date: Fri Mar  4 04:27:10 2011
New Revision: 1077541

URL: http://svn.apache.org/viewvc?rev=1077541&view=rev
Log:
commit f76df7c997be08ec1f040d81401fb3f57d3453ce
Author: Hong Tang <ht...@coatsatfind-lm.local>
Date:   Tue Jul 13 01:14:24 2010 -0700

    MAPREDUCE-1936. Make Gridmix3 more customizable. From https://issues.apache.org/jira/secure/attachment/12449310/mr-1936-yhadoop-20.1xx.patch.  (htang)
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1936. Make Gridmix3 more customizable. (htang)

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomAlgorithms.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomAlgorithm.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java?rev=1077541&r1=1077540&r2=1077541&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java Fri Mar  4 04:27:10 2011
@@ -18,6 +18,7 @@
 package org.apache.hadoop.mapred.gridmix;
 
 import java.io.IOException;
+
 import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -38,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.gridmix.RandomAlgorithms.Selector;
 
 /**
  * Class for caching a pool of input data to be used by synthetic jobs for
@@ -223,22 +225,15 @@ class FilePool {
         return getSize();
       }
 
-      IndexMapper mapping;
-      if ((curdir.size() < 200) || ((double) targetSize / getSize() > 0.5)) {
-        mapping = new DenseIndexMapper(curdir.size());
-      } else {
-        mapping = new SparseIndexMapper();
-      }
-
+      Selector selector = new Selector(curdir.size(), (double) targetSize
+          / getSize(), rand);
+      
       ArrayList<Integer> selected = new ArrayList<Integer>();
       long ret = 0L;
-      int poolSize = curdir.size();
       do {
-        int pos = rand.nextInt(poolSize);
-        int index = mapping.get(pos);
+        int index = selector.next();
         selected.add(index);
         ret += curdir.get(index).getLen();
-        mapping.swap(pos, --poolSize);
       } while (ret < targetSize);
 
       for (Integer i : selected) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=1077541&r1=1077540&r2=1077541&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Fri Mar  4 04:27:10 2011
@@ -399,16 +399,31 @@ public class Gridmix extends Configured 
     out.println("Usage: gridmix [-generate <MiB>] [-users URI] <iopath> <trace>");
     out.println("  e.g. gridmix -generate 100m foo -");
     out.println("Configuration parameters:");
-    out.printf("       %-42s : Output directory\n", GRIDMIX_OUT_DIR);
-    out.printf("       %-42s : Submitting threads\n", GRIDMIX_SUB_THR);
-    out.printf("       %-42s : Queued job desc\n", GRIDMIX_QUE_DEP);
-    out.printf("       %-42s : Key fraction of rec\n",
+    out.printf("       %-48s : Output directory\n", GRIDMIX_OUT_DIR);
+    out.printf("       %-48s : Submitting threads\n", GRIDMIX_SUB_THR);
+    out.printf("       %-48s : Queued job desc\n", GRIDMIX_QUE_DEP);
+    out.printf("       %-48s : Key fraction of rec\n",
         AvgRecordFactory.GRIDMIX_KEY_FRC);
-    out.printf("       %-42s : User resolution class\n", GRIDMIX_USR_RSV);
-    out.printf("       %-42s : Enable/disable using queues in trace\n",
+    out.printf("       %-48s : User resolution class\n", GRIDMIX_USR_RSV);
+    out.printf("       %-48s : Enable/disable using queues in trace\n",
         GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE);
-    out.printf("       %-42s : Default queue\n",
+    out.printf("       %-48s : Default queue\n",
         GridmixJob.GRIDMIX_DEFAULT_QUEUE);
+    out.printf("       %-48s : Throttling - jobs vs task-tracker ratio\n",
+        StressJobFactory.CONF_MAX_JOB_TRACKER_RATIO);
+    out.printf("       %-48s : Throttling - maps vs map-slot ratio\n",
+        StressJobFactory.CONF_OVERLOAD_MAPTASK_MAPSLOT_RATIO);
+    out.printf("       %-48s : Throttling - reduces vs reduce-slot ratio\n",
+        StressJobFactory.CONF_OVERLOAD_REDUCETASK_REDUCESLOT_RATIO);
+    out.printf("       %-48s : Throttling - map-slot share per job\n",
+        StressJobFactory.CONF_MAX_MAPSLOT_SHARE_PER_JOB);
+    out.printf("       %-48s : Throttling - reduce-slot share per job\n",
+        StressJobFactory.CONF_MAX_REDUCESLOT_SHARE_PER_JOB);
+    out.printf("       %-48s : Whether to ignore reduce tasks for sleep jobs\n",
+        SleepJob.SLEEPJOB_MAPTASK_ONLY);
+    out.printf("       %-48s : Number of fake locations for sleep job map tasks\n",
+        JobCreator.SLEEPJOB_RANDOM_LOCATIONS);
+    
     
     StringBuilder sb = new StringBuilder();
     String sep = "";
@@ -418,8 +433,13 @@ public class Gridmix extends Configured 
       sb.append(policy.name());
       sep = "|";
     }
-    out.printf("       %-40s : Job submission policy (%s)\n",
+    out.printf("       %-48s : Job submission policy (%s)\n",
         GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, sb.toString());
+    out.printf("       %-48s : Maximum map task runtime in mili-sec\n",
+        SleepJob.GRIDMIX_SLEEP_MAX_MAP_TIME);
+    out.printf("       %-48s : Maximum reduce task runtime in mili-sec (merge+reduce)\n",
+        SleepJob.GRIDMIX_SLEEP_MAX_REDUCE_TIME);
+
   }
 
   /**

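The usage text above only lists the new keys; as a sketch, they are passed like any other Gridmix configuration parameter through generic options (the values below are hypothetical, and the trailing arguments follow the usage line shown above):

    gridmix -D gridmix.throttle.jobs-to-tracker-ratio=2.0 \
            -D gridmix.sleep.maptask-only=true \
            -D gridmix.sleep.fake-locations=3 \
            -generate 100m foo -
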
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java?rev=1077541&r1=1077540&r2=1077541&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java Fri Mar  4 04:27:10 2011
@@ -20,14 +20,23 @@ package org.apache.hadoop.mapred.gridmix
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.gridmix.GenerateData.GenSplit;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.rumen.JobStory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 public enum JobCreator {
 
-  LOADJOB("LOADJOB") {
+  LOADJOB {
     @Override
     public GridmixJob createGridmixJob(
       Configuration conf, long submissionMillis, JobStory jobdesc, Path outRoot,
@@ -35,22 +44,39 @@ public enum JobCreator {
       return new LoadJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq);
     }},
 
-  SLEEPJOB("SLEEPJOB") {
+  SLEEPJOB {
+    private String[] hosts;
+      
     @Override
     public GridmixJob createGridmixJob(
       Configuration conf, long submissionMillis, JobStory jobdesc, Path outRoot,
       UserGroupInformation ugi, int seq) throws IOException {
-      return new SleepJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq);
+      int numLocations = conf.getInt(SLEEPJOB_RANDOM_LOCATIONS, 0);
+      if (numLocations < 0) numLocations=0;
+      if ((numLocations > 0) && (hosts == null)) {
+        final JobClient client = new JobClient(new JobConf(conf));
+        ClusterStatus stat = client.getClusterStatus(true);
+        final int nTrackers = stat.getTaskTrackers();
+        final ArrayList<String> hostList = new ArrayList<String>(nTrackers);
+        final Pattern trackerPattern = Pattern.compile("tracker_([^:]*):.*");
+        final Matcher m = trackerPattern.matcher("");
+        for (String tracker : stat.getActiveTrackerNames()) {
+          m.reset(tracker);
+          if (!m.find()) {
+            continue;
+          }
+          final String name = m.group(1);
+          hostList.add(name);
+        }
+        hosts = hostList.toArray(new String[hostList.size()]);
+      }
+      return new SleepJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq,
+          numLocations, hosts);
     }};
 
   public static final String GRIDMIX_JOB_TYPE = "gridmix.job.type";
-
-
-  private final String name;
-
-  JobCreator(String name) {
-    this.name = name;
-  }
+  public static final String SLEEPJOB_RANDOM_LOCATIONS = 
+    "gridmix.sleep.fake-locations";
 
   public abstract GridmixJob createGridmixJob(
     final Configuration conf, long submissionMillis, final JobStory jobdesc,

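The SLEEPJOB creator above derives fake split locations by parsing task-tracker names of the form "tracker_<host>:<port info>". A minimal standalone sketch of that extraction (the sample tracker name is made up):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class TrackerNameDemo {
      public static void main(String[] args) {
        // Same pattern as in SLEEPJOB: capture everything between
        // "tracker_" and the first ':'.
        final Pattern trackerPattern = Pattern.compile("tracker_([^:]*):.*");
        final Matcher m =
            trackerPattern.matcher("tracker_host1.example.com:localhost/127.0.0.1:52972");
        if (m.find()) {
          System.out.println(m.group(1));   // prints host1.example.com
        }
      }
    }
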
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomAlgorithms.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomAlgorithms.java?rev=1077541&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomAlgorithms.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomAlgorithms.java Fri Mar  4 04:27:10 2011
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Random algorithms.
+ */
+public class RandomAlgorithms {
+  
+  private interface IndexMapper {
+    int get(int pos);
+    void swap(int a, int b);
+    int getSize();
+    void reset();
+  }
+
+  /**
+   * A sparse index mapping table - useful when we want to
+   * non-destructively permute a small fraction of a large array.
+   */
+  private static class SparseIndexMapper implements IndexMapper {
+    Map<Integer, Integer> mapping = new HashMap<Integer, Integer>();
+    int size;
+    
+    SparseIndexMapper(int size) { 
+      this.size = size;
+    }
+    
+    public int get(int pos) {
+      Integer mapped = mapping.get(pos);
+      if (mapped == null) return pos;
+      return mapped;
+    }
+
+    public void swap(int a, int b) {
+      if (a == b) return;
+      int valA = get(a);
+      int valB = get(b);
+      if (b == valA) {
+        mapping.remove(b);
+      } else {
+        mapping.put(b, valA);
+      }
+      if (a == valB) {
+        mapping.remove(a);
+      } else {
+        mapping.put(a, valB);
+      }
+    }
+    
+    public int getSize() {
+      return size;
+    }
+    
+    public void reset() {
+      mapping.clear();
+    }
+  }
+
+  /**
+   * A dense index mapping table - useful when we want to
+   * non-destructively permute a large fraction of an array.
+   */
+  private static class DenseIndexMapper implements IndexMapper {
+    int[] mapping;
+
+    DenseIndexMapper(int size) {
+      mapping = new int[size];
+      for (int i=0; i<size; ++i) {
+        mapping[i] = i;
+      }
+    }
+
+    public int get(int pos) {
+      if ( (pos < 0) || (pos>=mapping.length) ) {
+        throw new IndexOutOfBoundsException();
+      }
+      return mapping[pos];
+    }
+
+    public void swap(int a, int b) {
+      if (a == b) return;
+      int valA = get(a);
+      int valB = get(b);
+      mapping[a]=valB;
+      mapping[b]=valA;
+    }
+    
+    public int getSize() {
+      return mapping.length;
+    }
+    
+    public void reset() {
+      return;
+    }
+  }
+
+  /**
+   * Iteratively pick random numbers from pool 0..n-1. Each number can only be
+   * picked once.
+   */
+  public static class Selector {
+    private IndexMapper mapping;
+    private int n;
+    private Random rand;
+
+    /**
+     * Constructor.
+     * 
+     * @param n
+     *          The pool of integers: 0..n-1.
+     * @param selPcnt
+     *          Fraction of the pool expected to be selected. This is just a
+     *          hint for internal memory optimization.
+     * @param rand
+     *          Random number generator.
+     */
+    public Selector(int n, double selPcnt, Random rand) {
+      if (n <= 0) {
+        throw new IllegalArgumentException("n should be positive");
+      }
+      
+      boolean sparse = (n > 200) && (selPcnt < 0.1);
+      
+      this.n = n;
+      mapping = (sparse) ? new SparseIndexMapper(n) : new DenseIndexMapper(n);
+      this.rand = rand;
+    }
+    
+    /**
+     * Select the next random number.
+     * @return Random number selected. Or -1 if the remaining pool is empty.
+     */
+    public int next() {
+      switch (n) {
+      case 0: return -1;
+      case 1: 
+      {
+        int index = mapping.get(0);
+        --n;
+        return index;
+      }
+      default:
+      {
+        int pos = rand.nextInt(n);
+        int index = mapping.get(pos);
+        mapping.swap(pos, --n);
+        return index;
+      }
+      }
+    }
+
+    /**
+     * Get the remaining random number pool size.
+     */
+    public int getPoolSize() {
+      return n;
+    }
+    
+    /**
+     * Reset the selector for reuse.
+     */
+    public void reset() {
+      mapping.reset();
+      n = mapping.getSize();
+    }
+  }
+  
+  
+  /**
+   * Select m distinct random integers from 0..n-1.
+   * @return An array of selected integers.
+   */
+  public static int[] select(int m, int n, Random rand) {
+    if (m >= n) {
+      int[] ret = new int[n];
+      for (int i=0; i<n; ++i) {
+        ret[i] = i;
+      }
+      return ret;
+    }
+    
+    Selector selector = new Selector(n, (float)m/n, rand);
+    int[] selected = new int[m];
+    for (int i=0; i<m; ++i) {
+      selected[i] = selector.next();
+    }
+    return selected;
+  }
+}

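A quick sketch of how Selector is meant to be driven, mirroring the FilePool change earlier in this commit (the pool size and fraction below are made-up values):

    import java.util.Arrays;
    import java.util.Random;
    import org.apache.hadoop.mapred.gridmix.RandomAlgorithms;

    public class SelectorDemo {
      public static void main(String[] args) {
        Random rand = new Random();
        // Draw distinct indices from 0..99; the 0.05 fraction is only a hint
        // that chooses between the sparse and dense internal mappings.
        RandomAlgorithms.Selector selector =
            new RandomAlgorithms.Selector(100, 0.05, rand);
        for (int i = 0; i < 5; ++i) {
          System.out.println(selector.next());  // no repeats until reset()
        }
        selector.reset();  // restore the full 0..99 pool for reuse
        // One-shot convenience wrapper over the same machinery:
        System.out.println(Arrays.toString(RandomAlgorithms.select(5, 100, rand)));
      }
    }
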
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java?rev=1077541&r1=1077540&r2=1077541&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java Fri Mar  4 04:27:10 2011
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
@@ -33,6 +34,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapred.gridmix.RandomAlgorithms.Selector;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -53,16 +55,42 @@ import org.apache.commons.logging.LogFac
 
 public class SleepJob extends GridmixJob {
   public static final Log LOG = LogFactory.getLog(SleepJob.class);
-
+  private static final ThreadLocal<Random> rand =
+    new ThreadLocal<Random>() {
+      @Override protected Random initialValue() {
+        return new Random();
+      }
+    };
+  
+  public static final String SLEEPJOB_MAPTASK_ONLY="gridmix.sleep.maptask-only";
+  private final boolean mapTasksOnly;
+  private final int fakeLocations;
+  private final String[] hosts;
+  private final Selector selector;
+  
   /**
    * Interval at which to report progress, in seconds.
    */
   public static final String GRIDMIX_SLEEP_INTERVAL = "gridmix.sleep.interval";
-
-  public SleepJob(
-    Configuration conf, long submissionMillis, JobStory jobdesc, Path outRoot,
-    UserGroupInformation ugi, int seq) throws IOException {
+  public static final String GRIDMIX_SLEEP_MAX_MAP_TIME = 
+    "gridmix.sleep.max-map-time";
+  public static final String GRIDMIX_SLEEP_MAX_REDUCE_TIME = 
+    "gridmix.sleep.max-reduce-time";
+
+  private long mapMaxSleepTime, reduceMaxSleepTime;
+
+  public SleepJob(Configuration conf, long submissionMillis, JobStory jobdesc,
+      Path outRoot, UserGroupInformation ugi, int seq, int numLocations,
+      String[] hosts) throws IOException {
     super(conf, submissionMillis, jobdesc, outRoot, ugi, seq);
+    this.fakeLocations = numLocations;
+    this.hosts = hosts;
+    this.selector = (fakeLocations > 0)? new Selector(hosts.length, (float) fakeLocations
+        / hosts.length, rand.get()) : null;
+    this.mapTasksOnly = conf.getBoolean(SLEEPJOB_MAPTASK_ONLY, false);
+    mapMaxSleepTime = conf.getLong(GRIDMIX_SLEEP_MAX_MAP_TIME, Long.MAX_VALUE);
+    reduceMaxSleepTime = conf.getLong(GRIDMIX_SLEEP_MAX_REDUCE_TIME,
+        Long.MAX_VALUE);
   }
 
   @Override
@@ -74,7 +102,7 @@ public class SleepJob extends GridmixJob
           throws IOException, ClassNotFoundException, InterruptedException {
           job.setMapperClass(SleepMapper.class);
           job.setReducerClass(SleepReducer.class);
-          job.setNumReduceTasks(jobdesc.getNumberReduces());
+          job.setNumReduceTasks((mapTasksOnly) ? 0 : jobdesc.getNumberReduces());
           job.setMapOutputKeyClass(GridmixKey.class);
           job.setMapOutputValueClass(NullWritable.class);
           job.setSortComparatorClass(GridmixKey.Comparator.class);
@@ -339,7 +367,7 @@ public class SleepJob extends GridmixJob
   @Override
   void buildSplits(FilePool inputDir) throws IOException {
     final List<InputSplit> splits = new ArrayList<InputSplit>();
-    final int reds = jobdesc.getNumberReduces();
+    final int reds = (mapTasksOnly) ? 0 : jobdesc.getNumberReduces();
     final int maps = jobdesc.getNumberMaps();
     for (int i = 0; i < maps; ++i) {
       final int nSpec = reds / maps + ((reds % maps) > i ? 1 : 0);
@@ -349,7 +377,8 @@ public class SleepJob extends GridmixJob
           (ReduceTaskAttemptInfo) getSuccessfulAttemptInfo(
             TaskType.REDUCE, i + j * maps);
         // Include only merge/reduce time
-        redDurations[j] = info.getMergeRuntime() + info.getReduceRuntime();
+        redDurations[j] = Math.min(reduceMaxSleepTime, info.getMergeRuntime()
+            + info.getReduceRuntime());
         if (LOG.isDebugEnabled()) {
           LOG.debug(
             String.format(
@@ -358,9 +387,19 @@ public class SleepJob extends GridmixJob
         }
       }
       final TaskAttemptInfo info = getSuccessfulAttemptInfo(TaskType.MAP, i);
-      splits.add(
-        new SleepSplit(
-          i, info.getRuntime(), redDurations, maps, new String[0]));
+      ArrayList<String> locations = new ArrayList<String>(fakeLocations);
+      if (fakeLocations > 0) {
+        selector.reset();
+      }
+      for (int k=0; k<fakeLocations; ++k) {
+        int index = selector.next();
+        if (index < 0) break;
+        locations.add(hosts[index]);
+      }
+
+      splits.add(new SleepSplit(i,
+          Math.min(info.getRuntime(), mapMaxSleepTime), redDurations, maps,
+          locations.toArray(new String[locations.size()])));
     }
     pushDescription(id(), splits);
   }

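The three new SleepJob knobs are ordinary Configuration entries; a minimal sketch of setting them programmatically (the values are hypothetical and mirror the new tests further down):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.gridmix.SleepJob;

    public class SleepJobConfDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean(SleepJob.SLEEPJOB_MAPTASK_ONLY, true);      // drop reduce tasks entirely
        conf.setLong(SleepJob.GRIDMIX_SLEEP_MAX_MAP_TIME, 100L);    // cap map sleep time (ms)
        conf.setLong(SleepJob.GRIDMIX_SLEEP_MAX_REDUCE_TIME, 200L); // cap merge+reduce sleep time (ms)
      }
    }
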
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java?rev=1077541&r1=1077540&r2=1077541&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java Fri Mar  4 04:27:10 2011
@@ -44,7 +44,10 @@ public class StressJobFactory extends Jo
    * overloaded. For running maps, we only count them partially. Namely, a 40%
    * completed map is counted as 0.6 map tasks in our calculation.
    */
-  static final float OVERLOAD_MAPTASK_MAPSLOT_RATIO = 2.0f;
+  private static final float OVERLOAD_MAPTASK_MAPSLOT_RATIO = 2.0f;
+  public static final String CONF_OVERLOAD_MAPTASK_MAPSLOT_RATIO=
+      "gridmix.throttle.maps.task-to-slot-ratio";
+  final float overloadMapTaskMapSlotRatio;
 
   /**
    * The minimum ratio between pending+running reduce tasks (aka. incomplete
@@ -53,19 +56,37 @@ public class StressJobFactory extends Jo
    * Namely, a 40% completed reduce is counted as 0.6 reduce tasks in our
    * calculation.
    */
-  static final float OVERLOAD_REDUCETASK_REDUCESLOT_RATIO = 2.5f;
+  private static final float OVERLOAD_REDUCETASK_REDUCESLOT_RATIO = 2.5f;
+  public static final String CONF_OVERLOAD_REDUCETASK_REDUCESLOT_RATIO=
+    "gridmix.throttle.reduces.task-to-slot-ratio";
+  final float overloadReduceTaskReduceSlotRatio;
 
   /**
    * The maximum share of the cluster's mapslot capacity that can be counted
    * toward a job's incomplete map tasks in overload calculation.
    */
-  static final float MAX_MAPSLOT_SHARE_PER_JOB=0.1f;
-
+  private static final float MAX_MAPSLOT_SHARE_PER_JOB=0.1f;
+  public static final String CONF_MAX_MAPSLOT_SHARE_PER_JOB=
+    "gridmix.throttle.maps.max-slot-share-per-job";  
+  final float maxMapSlotSharePerJob;
+  
   /**
    * The maximum share of the cluster's reduceslot capacity that can be counted
    * toward a job's incomplete reduce tasks in overload calculation.
    */
-  static final float MAX_REDUCESLOT_SHARE_PER_JOB=0.1f;
+  private static final float MAX_REDUCESLOT_SHARE_PER_JOB=0.1f;
+  public static final String CONF_MAX_REDUCESLOT_SHARE_PER_JOB=
+    "gridmix.throttle.reducess.max-slot-share-per-job";  
+  final float maxReduceSlotSharePerJob;
+
+  /**
+   * The ratio of the maximum number of pending+running jobs over the number of
+   * task trackers.
+   */
+  private static final float MAX_JOB_TRACKER_RATIO=1.0f;
+  public static final String CONF_MAX_JOB_TRACKER_RATIO=
+    "gridmix.throttle.jobs-to-tracker-ratio";  
+  final float maxJobTrackerRatio;
 
   /**
    * Creating a new instance does not start the thread.
@@ -84,6 +105,17 @@ public class StressJobFactory extends Jo
     throws IOException {
     super(
       submitter, jobProducer, scratch, conf, startFlag, resolver);
+    overloadMapTaskMapSlotRatio = conf.getFloat(
+        CONF_OVERLOAD_MAPTASK_MAPSLOT_RATIO, OVERLOAD_MAPTASK_MAPSLOT_RATIO);
+    overloadReduceTaskReduceSlotRatio = conf.getFloat(
+        CONF_OVERLOAD_REDUCETASK_REDUCESLOT_RATIO, 
+        OVERLOAD_REDUCETASK_REDUCESLOT_RATIO);
+    maxMapSlotSharePerJob = conf.getFloat(
+        CONF_MAX_MAPSLOT_SHARE_PER_JOB, MAX_MAPSLOT_SHARE_PER_JOB);
+    maxReduceSlotSharePerJob = conf.getFloat(
+        CONF_MAX_REDUCESLOT_SHARE_PER_JOB, MAX_REDUCESLOT_SHARE_PER_JOB);
+    maxJobTrackerRatio = conf.getFloat(
+        CONF_MAX_JOB_TRACKER_RATIO, MAX_JOB_TRACKER_RATIO);
   }
 
   public Thread createReaderThread() {
@@ -194,19 +226,19 @@ public class StressJobFactory extends Jo
     }
   }
 
-  static float calcEffectiveIncompleteMapTasks(int mapSlotCapacity,
+  float calcEffectiveIncompleteMapTasks(int mapSlotCapacity,
       int numMaps, float mapProgress) {
     float maxEffIncompleteMapTasks = Math.max(1.0f, mapSlotCapacity
-        * MAX_MAPSLOT_SHARE_PER_JOB);
+        * maxMapSlotSharePerJob);
     float mapProgressAdjusted = Math.max(Math.min(mapProgress, 1.0f), 0.0f);
     return Math.min(maxEffIncompleteMapTasks, numMaps
         * (1.0f - mapProgressAdjusted));
   }
 
-  static float calcEffectiveIncompleteReduceTasks(int reduceSlotCapacity,
+  float calcEffectiveIncompleteReduceTasks(int reduceSlotCapacity,
       int numReduces, float reduceProgress) {
     float maxEffIncompleteReduceTasks = Math.max(1.0f, reduceSlotCapacity
-        * MAX_REDUCESLOT_SHARE_PER_JOB);
+        * maxReduceSlotSharePerJob);
     float reduceProgressAdjusted = Math.max(Math.min(reduceProgress, 1.0f),
         0.0f);
     return Math.min(maxEffIncompleteReduceTasks, numReduces
@@ -226,7 +258,8 @@ public class StressJobFactory extends Jo
     loadStatus.reduceSlotCapacity = clusterStatus.getMaxReduceTasks();
     
     
-    loadStatus.numJobsBackfill = clusterStatus.getTaskTrackers()
+    loadStatus.numJobsBackfill = 
+      (int)(maxJobTrackerRatio*clusterStatus.getTaskTrackers())
         - stats.getNumRunningJob();
     if (loadStatus.numJobsBackfill <= 0) {
       if (LOG.isDebugEnabled()) {
@@ -244,7 +277,7 @@ public class StressJobFactory extends Jo
       incompleteMapTasks += calcEffectiveIncompleteMapTasks(clusterStatus
           .getMaxMapTasks(), noOfMaps, mapProgress);
     }
-    loadStatus.mapSlotsBackfill = (int) (OVERLOAD_MAPTASK_MAPSLOT_RATIO
+    loadStatus.mapSlotsBackfill = (int) (overloadMapTaskMapSlotRatio
         * clusterStatus.getMaxMapTasks() - incompleteMapTasks);
     if (loadStatus.mapSlotsBackfill <= 0) {
       if (LOG.isDebugEnabled()) {
@@ -264,7 +297,7 @@ public class StressJobFactory extends Jo
             clusterStatus.getMaxReduceTasks(), noOfReduces, reduceProgress);
       }
     }
-    loadStatus.reduceSlotsBackfill = (int) (OVERLOAD_REDUCETASK_REDUCESLOT_RATIO
+    loadStatus.reduceSlotsBackfill = (int) (overloadReduceTaskReduceSlotRatio
         * clusterStatus.getMaxReduceTasks() - incompleteReduceTasks);
     if (loadStatus.reduceSlotsBackfill <= 0) {
       if (LOG.isDebugEnabled()) {

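To make the new throttling arithmetic concrete: as the javadoc above says, a 40%-complete task counts as 0.6 incomplete tasks, and a single job's contribution is capped by its slot share. A worked sketch of the calcEffectiveIncompleteMapTasks logic with hypothetical numbers:

    public class OverloadDemo {
      public static void main(String[] args) {
        int mapSlotCapacity = 500;            // cluster map slots (hypothetical)
        float maxMapSlotSharePerJob = 0.1f;   // the default share above
        int numMaps = 600;                    // a single large job
        float mapProgress = 0.4f;             // 40% average map progress
        float maxEff = Math.max(1.0f, mapSlotCapacity * maxMapSlotSharePerJob); // 50.0
        float counted = Math.min(maxEff, numMaps * (1.0f - mapProgress));       // min(50, 360) = 50.0
        // The job adds only 50 "effective incomplete maps" to the overload
        // test, so one huge job cannot by itself mark the cluster as loaded.
        System.out.println(counted);
      }
    }
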
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomAlgorithm.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomAlgorithm.java?rev=1077541&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomAlgorithm.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomAlgorithm.java Fri Mar  4 04:27:10 2011
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.junit.Test;
+
+public class TestRandomAlgorithm {
+  private static final int[][] parameters = new int[][] {
+    {5, 1, 1}, 
+    {10, 1, 2},
+    {10, 2, 2},
+    {20, 1, 3},
+    {20, 2, 3},
+    {20, 3, 3},
+    {100, 3, 10},
+    {100, 3, 100},
+    {100, 3, 1000},
+    {100, 3, 10000},
+    {100, 3, 100000},
+    {100, 3, 1000000}
+  };
+  
+  private List<Integer> convertIntArray(int[] from) {
+    List<Integer> ret = new ArrayList<Integer>(from.length);
+    for (int v : from) {
+      ret.add(v);
+    }
+    return ret;
+  }
+  
+  private void testRandomSelectSelector(int niter, int m, int n) {
+    RandomAlgorithms.Selector selector = new RandomAlgorithms.Selector(n,
+        (double) m / n, new Random());
+    Map<List<Integer>, Integer> results = new HashMap<List<Integer>, Integer>(
+        niter);
+    for (int i = 0; i < niter; ++i, selector.reset()) {
+      int[] result = new int[m];
+      for (int j = 0; j < m; ++j) {
+        int v = selector.next();
+        if (v < 0)
+          break;
+        result[j]=v;
+      }
+      Arrays.sort(result);
+      List<Integer> resultAsList = convertIntArray(result);
+      Integer count = results.get(resultAsList);
+      if (count == null) {
+        results.put(resultAsList, 1);
+      } else {
+        results.put(resultAsList, ++count);
+      }
+    }
+
+    verifyResults(results, m, n);
+  }
+
+  private void testRandomSelect(int niter, int m, int n) {
+    Random random = new Random();
+    Map<List<Integer>, Integer> results = new HashMap<List<Integer>, Integer>(
+        niter);
+    for (int i = 0; i < niter; ++i) {
+      int[] result = RandomAlgorithms.select(m, n, random);
+      Arrays.sort(result);
+      List<Integer> resultAsList = convertIntArray(result);
+      Integer count = results.get(resultAsList);
+      if (count == null) {
+        results.put(resultAsList, 1);
+      } else {
+        results.put(resultAsList, ++count);
+      }
+    }
+
+    verifyResults(results, m, n);
+  }
+
+  private void verifyResults(Map<List<Integer>, Integer> results, int m, int n) {
+    if (n>=10) {
+      assertTrue(results.size() >= Math.min(m, 2));
+    }
+    for (List<Integer> result : results.keySet()) {
+      assertEquals(m, result.size());
+      Set<Integer> seen = new HashSet<Integer>();
+      for (int v : result) {
+        System.out.printf("%d ", v);
+        assertTrue((v >= 0) && (v < n));
+        assertTrue(seen.add(v));
+      }
+      System.out.printf(" ==> %d\n", results.get(result));
+    }
+    System.out.println("====");
+  }
+  
+  @Test
+  public void testRandomSelect() {
+    for (int[] param : parameters) {
+      testRandomSelect(param[0], param[1], param[2]);
+    }
+  }
+  
+  @Test
+  public void testRandomSelectSelector() {
+    for (int[] param : parameters) {
+      testRandomSelectSelector(param[0], param[1], param[2]);
+    }
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java?rev=1077541&r1=1077540&r2=1077541&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java Fri Mar  4 04:27:10 2011
@@ -24,7 +24,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.rumen.JobStory;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Level;
@@ -34,6 +38,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -127,6 +132,31 @@ public class TestSleepJob {
   }
 
   @Test
+  public void testRandomLocationSubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.STRESS;
+    System.out.println(" Random locations started at " + System.currentTimeMillis());
+    doSubmission("-D"+JobCreator.SLEEPJOB_RANDOM_LOCATIONS+"=3");
+    System.out.println(" Random locations ended at " + System.currentTimeMillis());
+  }
+  
+  @Test
+  public void testMapTasksOnlySubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.STRESS;
+    System.out.println(" Map tasks only at " + System.currentTimeMillis());
+    doSubmission("-D"+SleepJob.SLEEPJOB_MAPTASK_ONLY+"=true");
+    System.out.println(" Map tasks only ended at " + System.currentTimeMillis());
+  }
+
+  @Test
+  public void testLimitTaskSleepTimeSubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.STRESS;
+    System.out.println(" Limit sleep time only at " + System.currentTimeMillis());
+    doSubmission("-D" + SleepJob.GRIDMIX_SLEEP_MAX_MAP_TIME + "=100", "-D"
+        + SleepJob.GRIDMIX_SLEEP_MAX_REDUCE_TIME + "=200");
+    System.out.println(" Limit sleep time ended at " + System.currentTimeMillis());
+  }
+
+  @Test
   public void testStressSubmit() throws Exception {
     policy = GridmixJobSubmissionPolicy.STRESS;
     System.out.println(" Stress started at " + System.currentTimeMillis());
@@ -141,22 +171,83 @@ public class TestSleepJob {
     doSubmission();
     System.out.println("Serial ended at " + System.currentTimeMillis());
   }
+  
+  @Test
+  public void testRandomLocation() throws Exception {
+    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+    // testRandomLocation(0, 10, ugi);
+    testRandomLocation(1, 10, ugi);
+    testRandomLocation(2, 10, ugi);
+  }
+  
+  private void testRandomLocation(int locations, int njobs, UserGroupInformation ugi) throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(JobCreator.SLEEPJOB_RANDOM_LOCATIONS, locations);
+    DebugJobProducer jobProducer = new DebugJobProducer(njobs, conf);
+    JobConf jconf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
+    JobStory story;
+    int seq=1;
+    while ((story = jobProducer.getNextJob()) != null) {
+      GridmixJob gridmixJob = JobCreator.SLEEPJOB.createGridmixJob(jconf, 0,
+          story, new Path("ignored"), ugi, seq++);
+      gridmixJob.buildSplits(null);
+      List<InputSplit> splits = new SleepJob.SleepInputFormat()
+          .getSplits(gridmixJob.getJob());
+      for (InputSplit split : splits) {
+        assertEquals(locations, split.getLocations().length);
+      }
+    }
+  }
+  
+  @Test
+  public void testMapTasksOnlySleepJobs()
+      throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(SleepJob.SLEEPJOB_MAPTASK_ONLY, true);
+    DebugJobProducer jobProducer = new DebugJobProducer(5, conf);
+    JobConf jconf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
+    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+    JobStory story;
+    int seq = 1;
+    while ((story = jobProducer.getNextJob()) != null) {
+      GridmixJob gridmixJob = JobCreator.SLEEPJOB.createGridmixJob(jconf, 0,
+          story, new Path("ignored"), ugi, seq++);
+      gridmixJob.buildSplits(null);
+      Job job = gridmixJob.call();
+      assertEquals(0, job.getNumReduceTasks());
+    }
+  }
 
-
-  private void doSubmission() throws Exception {
+  private void doSubmission(String...optional) throws Exception {
     final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs);
     final Path out = GridmixTestUtils.DEST.makeQualified(GridmixTestUtils.dfs);
     final Path root = new Path("/user");
     Configuration conf = null;
     try {
-      final String[] argv = {"-D" + FilePool.GRIDMIX_MIN_FILE + "=0",
+      // required options
+      final String[] required = {"-D" + FilePool.GRIDMIX_MIN_FILE + "=0",
         "-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out,
         "-D" + Gridmix.GRIDMIX_USR_RSV + "=" + EchoUserResolver.class.getName(),
         "-D" + JobCreator.GRIDMIX_JOB_TYPE + "=" + JobCreator.SLEEPJOB.name(),
-        "-D" + SleepJob.GRIDMIX_SLEEP_INTERVAL +"=" +"10",
-        "-generate",String.valueOf(GENDATA) + "m", in.toString(), "-"
-        // ignored by DebugGridmix
+        "-D" + SleepJob.GRIDMIX_SLEEP_INTERVAL +"=" +"10"
       };
+      // mandatory arguments
+      final String[] mandatory = {
+          "-generate",String.valueOf(GENDATA) + "m", in.toString(), "-"
+          // ignored by DebugGridmix
+      };
+      
+      ArrayList<String> argv = new ArrayList<String>(required.length+optional.length+mandatory.length);
+      for (String s : required) {
+        argv.add(s);
+      }
+      for (String s : optional) {
+        argv.add(s);
+      }
+      for (String s : mandatory) {
+        argv.add(s);
+      }
+
       DebugGridmix client = new DebugGridmix();
       conf = new Configuration();
       conf.setEnum(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy);
@@ -165,7 +256,12 @@ public class TestSleepJob {
       // allow synthetic users to create home directories
       GridmixTestUtils.dfs.mkdirs(root, new FsPermission((short) 0777));
       GridmixTestUtils.dfs.setPermission(root, new FsPermission((short) 0777));
-      int res = ToolRunner.run(conf, client, argv);
+      String[] args = argv.toArray(new String[argv.size()]);
+      System.out.println("Command line arguments:");
+      for (int i=0; i<args.length; ++i) {
+        System.out.printf("    [%d] %s\n", i, args[i]);
+      }
+      int res = ToolRunner.run(conf, client, args);
       assertEquals("Client exited with nonzero status", 0, res);
       client.checkMonitor();
     } catch (Exception e) {