Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2010/07/21 21:06:07 UTC
svn commit: r966365 - in /hadoop/mapreduce/trunk: ./
src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/
src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/
Author: cdouglas
Date: Wed Jul 21 19:06:06 2010
New Revision: 966365
URL: http://svn.apache.org/viewvc?rev=966365&view=rev
Log:
MAPREDUCE-1936. Modify Gridmix3 to support more tunable parameters for
stress submission and sleep jobs. Contributed by Hong Tang
Added:
hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomAlgorithms.java
hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomAlgorithm.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java
hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java
hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java
hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java
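
Note: the patch introduces new Gridmix configuration keys (gridmix.sleep.maptask-only, gridmix.sleep.fake-locations, gridmix.sleep.max-map-time, gridmix.sleep.max-reduce-time, and the gridmix.throttle.* properties documented in the updated usage message below). They are ordinary Hadoop configuration properties, so they can be supplied as generic -Dname=value options. An illustrative invocation, with arbitrary values, in the style of the usage example Gridmix already prints:

  gridmix -Dgridmix.job.type=SLEEPJOB -Dgridmix.sleep.fake-locations=3 \
      -Dgridmix.sleep.max-map-time=10000 -generate 100m foo -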
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=966365&r1=966364&r2=966365&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Jul 21 19:06:06 2010
@@ -89,6 +89,9 @@ Trunk (unreleased changes)
MAPREDUCE-1945. The MapReduce component for HADOOP-6632.
(Kan Zhang & Jitendra Pandey via ddas)
+ MAPREDUCE-1936. Modify Gridmix3 to support more tunable parameters for
+ stress submission and sleep jobs. (Hong Tang via cdouglas)
+
OPTIMIZATIONS
MAPREDUCE-1354. Enhancements to JobTracker for better performance and
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java?rev=966365&r1=966364&r2=966365&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java Wed Jul 21 19:06:06 2010
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapred.gridmix;
import java.io.IOException;
+
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
@@ -38,6 +39,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.gridmix.RandomAlgorithms.Selector;
/**
* Class for caching a pool of input data to be used by synthetic jobs for
@@ -135,69 +137,6 @@ class FilePool {
throws IOException;
}
- interface IndexMapper {
- int get(int pos);
- void swap(int a, int b);
- }
-
- /**
- * A sparse index mapping table - useful when we want to
- * non-destructively permute a small fraction of a large array.
- */
- static class SparseIndexMapper implements IndexMapper {
- Map<Integer, Integer> mapping = new HashMap<Integer, Integer>();
-
- public int get(int pos) {
- Integer mapped = mapping.get(pos);
- if (mapped == null) return pos;
- return mapped;
- }
-
- public void swap(int a, int b) {
- int valA = get(a);
- int valB = get(b);
- if (b == valA) {
- mapping.remove(b);
- } else {
- mapping.put(b, valA);
- }
- if (a == valB) {
- mapping.remove(a);
- } else {
- mapping.put(a, valB);
- }
- }
- }
-
- /**
- * A dense index mapping table - useful when we want to
- * non-destructively permute a large fraction of an array.
- */
- static class DenseIndexMapper implements IndexMapper {
- int[] mapping;
-
- DenseIndexMapper(int size) {
- mapping = new int[size];
- for (int i=0; i<size; ++i) {
- mapping[i] = i;
- }
- }
-
- public int get(int pos) {
- if ( (pos < 0) || (pos>=mapping.length) ) {
- throw new IndexOutOfBoundsException();
- }
- return mapping[pos];
- }
-
- public void swap(int a, int b) {
- int valA = get(a);
- int valB = get(b);
- mapping[a]=valB;
- mapping[b]=valA;
- }
- }
-
/**
* Files in current directory of this Node.
*/
@@ -223,22 +162,15 @@ class FilePool {
return getSize();
}
- IndexMapper mapping;
- if ((curdir.size() < 200) || ((double) targetSize / getSize() > 0.5)) {
- mapping = new DenseIndexMapper(curdir.size());
- } else {
- mapping = new SparseIndexMapper();
- }
-
+ Selector selector = new Selector(curdir.size(), (double) targetSize
+ / getSize(), rand);
+
ArrayList<Integer> selected = new ArrayList<Integer>();
long ret = 0L;
- int poolSize = curdir.size();
do {
- int pos = rand.nextInt(poolSize);
- int index = mapping.get(pos);
+ int index = selector.next();
selected.add(index);
ret += curdir.get(index).getLen();
- mapping.swap(pos, --poolSize);
} while (ret < targetSize);
for (Integer i : selected) {
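
The selection loop above now delegates the no-repeat random sampling to the new RandomAlgorithms.Selector instead of the removed IndexMapper classes. A minimal sketch of the same pattern outside FilePool (the pickFiles helper and the array of file lengths are made up for illustration; only Selector's constructor and next() come from the added class):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    import org.apache.hadoop.mapred.gridmix.RandomAlgorithms.Selector;

    class SelectorSketch {
      // Pick random entries from fileLengths until targetSize bytes are covered.
      static List<Integer> pickFiles(long[] fileLengths, long targetSize, Random rand) {
        long total = 0L;
        for (long len : fileLengths) {
          total += len;
        }
        // The fraction is only a hint; Selector uses it to choose a sparse or dense mapping.
        Selector selector = new Selector(fileLengths.length, (double) targetSize / total, rand);
        List<Integer> selected = new ArrayList<Integer>();
        long accumulated = 0L;
        do {
          int index = selector.next();   // each index is returned at most once
          if (index < 0) break;          // pool exhausted before targetSize was reached
          selected.add(index);
          accumulated += fileLengths[index];
        } while (accumulated < targetSize);
        return selected;
      }
    }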
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=966365&r1=966364&r2=966365&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Wed Jul 21 19:06:06 2010
@@ -394,32 +394,67 @@ public class Gridmix extends Configured
}
}
- protected void printUsage(PrintStream out) {
- ToolRunner.printGenericCommandUsage(out);
- out.println("Usage: gridmix [-generate <MiB>] [-users URI] <iopath> <trace>");
- out.println(" e.g. gridmix -generate 100m foo -");
- out.println("Configuration parameters:");
- out.printf(" %-42s : Output directory\n", GRIDMIX_OUT_DIR);
- out.printf(" %-42s : Submitting threads\n", GRIDMIX_SUB_THR);
- out.printf(" %-42s : Queued job desc\n", GRIDMIX_QUE_DEP);
- out.printf(" %-42s : Key fraction of rec\n",
- AvgRecordFactory.GRIDMIX_KEY_FRC);
- out.printf(" %-42s : User resolution class\n", GRIDMIX_USR_RSV);
- out.printf(" %-42s : Enable/disable using queues in trace\n",
- GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE);
- out.printf(" %-42s : Default queue\n",
- GridmixJob.GRIDMIX_DEFAULT_QUEUE);
-
+ private <T> String getEnumValues(Enum<? extends T>[] e) {
StringBuilder sb = new StringBuilder();
String sep = "";
- for (GridmixJobSubmissionPolicy p : GridmixJobSubmissionPolicy.values()) {
+ for (Enum<? extends T> v : e) {
sb.append(sep);
- sb.append(p.name());
+ sb.append(v.name());
sep = "|";
}
- out.printf(" %-42s : Job submission policy (%s)\n",
- GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, sb.toString());
+ return sb.toString();
+ }
+
+ private String getJobTypes() {
+ return getEnumValues(JobCreator.values());
}
+
+ private String getSubmissionPolicies() {
+ return getEnumValues(GridmixJobSubmissionPolicy.values());
+ }
+
+ protected void printUsage(PrintStream out) {
+ ToolRunner.printGenericCommandUsage(out);
+ out.println("Usage: gridmix [-generate <MiB>] [-users URI] [-Dname=value ...] <iopath> <trace>");
+ out.println(" e.g. gridmix -generate 100m foo -");
+ out.println("Configuration parameters:");
+ out.println(" General parameters:");
+ out.printf(" %-48s : Output directory\n", GRIDMIX_OUT_DIR);
+ out.printf(" %-48s : Submitting threads\n", GRIDMIX_SUB_THR);
+ out.printf(" %-48s : Queued job desc\n", GRIDMIX_QUE_DEP);
+ out.printf(" %-48s : User resolution class\n", GRIDMIX_USR_RSV);
+ out.printf(" %-48s : Job types (%s)\n", JobCreator.GRIDMIX_JOB_TYPE, getJobTypes());
+ out.println(" Parameters related to job submission:");
+ out.printf(" %-48s : Default queue\n",
+ GridmixJob.GRIDMIX_DEFAULT_QUEUE);
+ out.printf(" %-48s : Enable/disable using queues in trace\n",
+ GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE);
+ out.printf(" %-48s : Job submission policy (%s)\n",
+ GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, getSubmissionPolicies());
+ out.println(" Parameters specific for LOADJOB:");
+ out.printf(" %-48s : Key fraction of rec\n",
+ AvgRecordFactory.GRIDMIX_KEY_FRC);
+ out.println(" Parameters specific for SLEEPJOB:");
+ out.printf(" %-48s : Whether to ignore reduce tasks\n",
+ SleepJob.SLEEPJOB_MAPTASK_ONLY);
+ out.printf(" %-48s : Number of fake locations for map tasks\n",
+ JobCreator.SLEEPJOB_RANDOM_LOCATIONS);
+ out.printf(" %-48s : Maximum map task runtime in milli-sec\n",
+ SleepJob.GRIDMIX_SLEEP_MAX_MAP_TIME);
+ out.printf(" %-48s : Maximum reduce task runtime in milli-sec (merge+reduce)\n",
+ SleepJob.GRIDMIX_SLEEP_MAX_REDUCE_TIME);
+ out.println(" Parameters specific for STRESS submission throttling policy:");
+ out.printf(" %-48s : jobs vs task-tracker ratio\n",
+ StressJobFactory.CONF_MAX_JOB_TRACKER_RATIO);
+ out.printf(" %-48s : maps vs map-slot ratio\n",
+ StressJobFactory.CONF_OVERLOAD_MAPTASK_MAPSLOT_RATIO);
+ out.printf(" %-48s : reduces vs reduce-slot ratio\n",
+ StressJobFactory.CONF_OVERLOAD_REDUCETASK_REDUCESLOT_RATIO);
+ out.printf(" %-48s : map-slot share per job\n",
+ StressJobFactory.CONF_MAX_MAPSLOT_SHARE_PER_JOB);
+ out.printf(" %-48s : reduce-slot share per job\n",
+ StressJobFactory.CONF_MAX_REDUCESLOT_SHARE_PER_JOB);
+ }
/**
* Components in the pipeline must support the following operations for
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java?rev=966365&r1=966364&r2=966365&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java Wed Jul 21 19:06:06 2010
@@ -20,14 +20,23 @@ package org.apache.hadoop.mapred.gridmix
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.gridmix.GenerateData.GenSplit;
+import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.rumen.JobStory;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
public enum JobCreator {
- LOADJOB("LOADJOB") {
+ LOADJOB {
@Override
public GridmixJob createGridmixJob(
Configuration conf, long submissionMillis, JobStory jobdesc, Path outRoot,
@@ -35,22 +44,39 @@ public enum JobCreator {
return new LoadJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq);
}},
- SLEEPJOB("SLEEPJOB") {
+ SLEEPJOB {
+ private String[] hosts;
+
@Override
public GridmixJob createGridmixJob(
Configuration conf, long submissionMillis, JobStory jobdesc, Path outRoot,
UserGroupInformation ugi, int seq) throws IOException {
- return new SleepJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq);
+ int numLocations = conf.getInt(SLEEPJOB_RANDOM_LOCATIONS, 0);
+ if (numLocations < 0) numLocations=0;
+ if ((numLocations > 0) && (hosts == null)) {
+ final JobClient client = new JobClient(new JobConf(conf));
+ ClusterStatus stat = client.getClusterStatus(true);
+ final int nTrackers = stat.getTaskTrackers();
+ final ArrayList<String> hostList = new ArrayList<String>(nTrackers);
+ final Pattern trackerPattern = Pattern.compile("tracker_([^:]*):.*");
+ final Matcher m = trackerPattern.matcher("");
+ for (String tracker : stat.getActiveTrackerNames()) {
+ m.reset(tracker);
+ if (!m.find()) {
+ continue;
+ }
+ final String name = m.group(1);
+ hostList.add(name);
+ }
+ hosts = hostList.toArray(new String[hostList.size()]);
+ }
+ return new SleepJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq,
+ numLocations, hosts);
}};
public static final String GRIDMIX_JOB_TYPE = "gridmix.job.type";
-
-
- private final String name;
-
- JobCreator(String name) {
- this.name = name;
- }
+ public static final String SLEEPJOB_RANDOM_LOCATIONS =
+ "gridmix.sleep.fake-locations";
public abstract GridmixJob createGridmixJob(
final Configuration conf, long submissionMillis, final JobStory jobdesc,
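
When gridmix.sleep.fake-locations is positive, the SLEEPJOB creator above lazily asks the JobClient for the active tracker names and strips them down to hostnames with the tracker_([^:]*):.* pattern. A small sketch of that extraction (the sample tracker name is made up; only the pattern itself comes from the change):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    class TrackerNameSketch {
      public static void main(String[] args) {
        Pattern trackerPattern = Pattern.compile("tracker_([^:]*):.*");
        // Tracker names typically look like "tracker_<host>:<rpc address>".
        Matcher m = trackerPattern.matcher("tracker_host1.example.com:localhost/127.0.0.1:52967");
        if (m.find()) {
          System.out.println(m.group(1));   // prints host1.example.com
        }
      }
    }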
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomAlgorithms.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomAlgorithms.java?rev=966365&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomAlgorithms.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomAlgorithms.java Wed Jul 21 19:06:06 2010
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Random algorithms.
+ */
+public class RandomAlgorithms {
+
+ private interface IndexMapper {
+ int get(int pos);
+ void swap(int a, int b);
+ int getSize();
+ void reset();
+ }
+
+ /**
+ * A sparse index mapping table - useful when we want to
+ * non-destructively permute a small fraction of a large array.
+ */
+ private static class SparseIndexMapper implements IndexMapper {
+ Map<Integer, Integer> mapping = new HashMap<Integer, Integer>();
+ int size;
+
+ SparseIndexMapper(int size) {
+ this.size = size;
+ }
+
+ public int get(int pos) {
+ Integer mapped = mapping.get(pos);
+ if (mapped == null) return pos;
+ return mapped;
+ }
+
+ public void swap(int a, int b) {
+ if (a == b) return;
+ int valA = get(a);
+ int valB = get(b);
+ if (b == valA) {
+ mapping.remove(b);
+ } else {
+ mapping.put(b, valA);
+ }
+ if (a == valB) {
+ mapping.remove(a);
+ } else {
+ mapping.put(a, valB);
+ }
+ }
+
+ public int getSize() {
+ return size;
+ }
+
+ public void reset() {
+ mapping.clear();
+ }
+ }
+
+ /**
+ * A dense index mapping table - useful when we want to
+ * non-destructively permute a large fraction of an array.
+ */
+ private static class DenseIndexMapper implements IndexMapper {
+ int[] mapping;
+
+ DenseIndexMapper(int size) {
+ mapping = new int[size];
+ for (int i=0; i<size; ++i) {
+ mapping[i] = i;
+ }
+ }
+
+ public int get(int pos) {
+ if ( (pos < 0) || (pos>=mapping.length) ) {
+ throw new IndexOutOfBoundsException();
+ }
+ return mapping[pos];
+ }
+
+ public void swap(int a, int b) {
+ if (a == b) return;
+ int valA = get(a);
+ int valB = get(b);
+ mapping[a]=valB;
+ mapping[b]=valA;
+ }
+
+ public int getSize() {
+ return mapping.length;
+ }
+
+ public void reset() {
+ return;
+ }
+ }
+
+ /**
+ * Iteratively pick random numbers from pool 0..n-1. Each number can only be
+ * picked once.
+ */
+ public static class Selector {
+ private IndexMapper mapping;
+ private int n;
+ private Random rand;
+
+ /**
+ * Constructor.
+ *
+ * @param n
+ * The pool of integers: 0..n-1.
+ * @param selPcnt
+ * Percentage of selected numbers. This is just a hint for internal
+ * memory optimization.
+ * @param rand
+ * Random number generator.
+ */
+ public Selector(int n, double selPcnt, Random rand) {
+ if (n <= 0) {
+ throw new IllegalArgumentException("n should be positive");
+ }
+
+ boolean sparse = (n > 200) && (selPcnt < 0.1);
+
+ this.n = n;
+ mapping = (sparse) ? new SparseIndexMapper(n) : new DenseIndexMapper(n);
+ this.rand = rand;
+ }
+
+ /**
+ * Select the next random number.
+ * @return Random number selected. Or -1 if the remaining pool is empty.
+ */
+ public int next() {
+ switch (n) {
+ case 0: return -1;
+ case 1:
+ {
+ int index = mapping.get(0);
+ --n;
+ return index;
+ }
+ default:
+ {
+ int pos = rand.nextInt(n);
+ int index = mapping.get(pos);
+ mapping.swap(pos, --n);
+ return index;
+ }
+ }
+ }
+
+ /**
+ * Get the remaining random number pool size.
+ */
+ public int getPoolSize() {
+ return n;
+ }
+
+ /**
+ * Reset the selector for reuse.
+ */
+ public void reset() {
+ mapping.reset();
+ n = mapping.getSize();
+ }
+ }
+
+
+ /**
+ * Selecting m random integers from 0..n-1.
+ * @return An array of selected integers.
+ */
+ public static int[] select(int m, int n, Random rand) {
+ if (m >= n) {
+ int[] ret = new int[n];
+ for (int i=0; i<n; ++i) {
+ ret[i] = i;
+ }
+ return ret;
+ }
+
+ Selector selector = new Selector(n, (float)m/n, rand);
+ int[] selected = new int[m];
+ for (int i=0; i<m; ++i) {
+ selected[i] = selector.next();
+ }
+ return selected;
+ }
+}
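
A brief usage sketch for the two public entry points added above, Selector and select(int, int, Random); the values are arbitrary and the driver class exists only for illustration:

    import java.util.Random;

    import org.apache.hadoop.mapred.gridmix.RandomAlgorithms;
    import org.apache.hadoop.mapred.gridmix.RandomAlgorithms.Selector;

    class RandomAlgorithmsSketch {
      public static void main(String[] args) {
        Random rand = new Random();

        // Draw 3 distinct integers from 0..9 in a single call.
        int[] picked = RandomAlgorithms.select(3, 10, rand);
        System.out.println(picked.length);

        // Or draw them one at a time; next() never repeats a value until reset().
        Selector selector = new Selector(10, 0.3, rand);
        for (int i = 0; i < 3; ++i) {
          System.out.println(selector.next());
        }
        selector.reset();   // start a fresh permutation over 0..9
      }
    }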
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java?rev=966365&r1=966364&r2=966365&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java Wed Jul 21 19:06:06 2010
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
@@ -33,6 +34,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapred.gridmix.RandomAlgorithms.Selector;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -53,16 +55,42 @@ import org.apache.commons.logging.LogFac
public class SleepJob extends GridmixJob {
public static final Log LOG = LogFactory.getLog(SleepJob.class);
-
+ private static final ThreadLocal <Random> rand =
+ new ThreadLocal <Random> () {
+ @Override protected Random initialValue() {
+ return new Random();
+ }
+ };
+
+ public static final String SLEEPJOB_MAPTASK_ONLY="gridmix.sleep.maptask-only";
+ private final boolean mapTasksOnly;
+ private final int fakeLocations;
+ private final String[] hosts;
+ private final Selector selector;
+
/**
* Interval at which to report progress, in seconds.
*/
public static final String GRIDMIX_SLEEP_INTERVAL = "gridmix.sleep.interval";
-
- public SleepJob(
- Configuration conf, long submissionMillis, JobStory jobdesc, Path outRoot,
- UserGroupInformation ugi, int seq) throws IOException {
+ public static final String GRIDMIX_SLEEP_MAX_MAP_TIME =
+ "gridmix.sleep.max-map-time";
+ public static final String GRIDMIX_SLEEP_MAX_REDUCE_TIME =
+ "gridmix.sleep.max-reduce-time";
+
+ private final long mapMaxSleepTime, reduceMaxSleepTime;
+
+ public SleepJob(Configuration conf, long submissionMillis, JobStory jobdesc,
+ Path outRoot, UserGroupInformation ugi, int seq, int numLocations,
+ String[] hosts) throws IOException {
super(conf, submissionMillis, jobdesc, outRoot, ugi, seq);
+ this.fakeLocations = numLocations;
+ this.hosts = hosts;
+ this.selector = (fakeLocations > 0)? new Selector(hosts.length, (float) fakeLocations
+ / hosts.length, rand.get()) : null;
+ this.mapTasksOnly = conf.getBoolean(SLEEPJOB_MAPTASK_ONLY, false);
+ mapMaxSleepTime = conf.getLong(GRIDMIX_SLEEP_MAX_MAP_TIME, Long.MAX_VALUE);
+ reduceMaxSleepTime = conf.getLong(GRIDMIX_SLEEP_MAX_REDUCE_TIME,
+ Long.MAX_VALUE);
}
@Override
@@ -74,7 +102,7 @@ public class SleepJob extends GridmixJob
throws IOException, ClassNotFoundException, InterruptedException {
job.setMapperClass(SleepMapper.class);
job.setReducerClass(SleepReducer.class);
- job.setNumReduceTasks(jobdesc.getNumberReduces());
+ job.setNumReduceTasks((mapTasksOnly) ? 0 : jobdesc.getNumberReduces());
job.setMapOutputKeyClass(GridmixKey.class);
job.setMapOutputValueClass(NullWritable.class);
job.setSortComparatorClass(GridmixKey.Comparator.class);
@@ -340,7 +368,7 @@ public class SleepJob extends GridmixJob
@Override
void buildSplits(FilePool inputDir) throws IOException {
final List<InputSplit> splits = new ArrayList<InputSplit>();
- final int reds = jobdesc.getNumberReduces();
+ final int reds = (mapTasksOnly) ? 0 : jobdesc.getNumberReduces();
final int maps = jobdesc.getNumberMaps();
for (int i = 0; i < maps; ++i) {
final int nSpec = reds / maps + ((reds % maps) > i ? 1 : 0);
@@ -350,7 +378,8 @@ public class SleepJob extends GridmixJob
(ReduceTaskAttemptInfo) getSuccessfulAttemptInfo(TaskType.REDUCE,
i + j * maps);
// Include only merge/reduce time
- redDurations[j] = info.getMergeRuntime() + info.getReduceRuntime();
+ redDurations[j] = Math.min(reduceMaxSleepTime, info.getMergeRuntime()
+ + info.getReduceRuntime());
if (LOG.isDebugEnabled()) {
LOG.debug(
String.format(
@@ -359,9 +388,19 @@ public class SleepJob extends GridmixJob
}
}
final TaskAttemptInfo info = getSuccessfulAttemptInfo(TaskType.MAP, i);
- splits.add(new SleepSplit(i, info.getRuntime(), redDurations, maps,
- new String[0]));
- }
+ ArrayList<String> locations = new ArrayList<String>(fakeLocations);
+ if (fakeLocations > 0) {
+ selector.reset();
+ }
+ for (int k=0; k<fakeLocations; ++k) {
+ int index = selector.next();
+ if (index < 0) break;
+ locations.add(hosts[index]);
+ }
+
+ splits.add(new SleepSplit(i,
+ Math.min(info.getRuntime(), mapMaxSleepTime), redDurations, maps,
+ locations.toArray(new String[locations.size()])));
+ }
pushDescription(id(), splits);
}
}
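
The sleep job now honors four new settings: gridmix.sleep.maptask-only, gridmix.sleep.fake-locations, gridmix.sleep.max-map-time and gridmix.sleep.max-reduce-time (the last two in milliseconds, applied as caps via Math.min above). A minimal configuration sketch, with example values only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.gridmix.JobCreator;
    import org.apache.hadoop.mapred.gridmix.SleepJob;

    class SleepJobConfSketch {
      static Configuration configure() {
        Configuration conf = new Configuration();
        conf.setBoolean(SleepJob.SLEEPJOB_MAPTASK_ONLY, true);        // drop all reduce tasks
        conf.setInt(JobCreator.SLEEPJOB_RANDOM_LOCATIONS, 3);         // fake locations per map split
        conf.setLong(SleepJob.GRIDMIX_SLEEP_MAX_MAP_TIME, 10000L);    // cap map sleep at 10s
        conf.setLong(SleepJob.GRIDMIX_SLEEP_MAX_REDUCE_TIME, 20000L); // cap merge+reduce sleep at 20s
        return conf;
      }
    }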
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java?rev=966365&r1=966364&r2=966365&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java Wed Jul 21 19:06:06 2010
@@ -44,7 +44,10 @@ public class StressJobFactory extends Jo
* overloaded. For running maps, we only count them partially. Namely, a 40%
* completed map is counted as 0.6 map tasks in our calculation.
*/
- static final float OVERLOAD_MAPTASK_MAPSLOT_RATIO = 2.0f;
+ private static final float OVERLOAD_MAPTASK_MAPSLOT_RATIO = 2.0f;
+ public static final String CONF_OVERLOAD_MAPTASK_MAPSLOT_RATIO=
+ "gridmix.throttle.maps.task-to-slot-ratio";
+ final float overloadMapTaskMapSlotRatio;
/**
* The minimum ratio between pending+running reduce tasks (aka. incomplete
@@ -53,19 +56,37 @@ public class StressJobFactory extends Jo
* Namely, a 40% completed reduce is counted as 0.6 reduce tasks in our
* calculation.
*/
- static final float OVERLOAD_REDUCETASK_REDUCESLOT_RATIO = 2.5f;
+ private static final float OVERLOAD_REDUCETASK_REDUCESLOT_RATIO = 2.5f;
+ public static final String CONF_OVERLOAD_REDUCETASK_REDUCESLOT_RATIO=
+ "gridmix.throttle.reduces.task-to-slot-ratio";
+ final float overloadReduceTaskReduceSlotRatio;
/**
* The maximum share of the cluster's mapslot capacity that can be counted
* toward a job's incomplete map tasks in overload calculation.
*/
- static final float MAX_MAPSLOT_SHARE_PER_JOB=0.1f;
-
+ private static final float MAX_MAPSLOT_SHARE_PER_JOB=0.1f;
+ public static final String CONF_MAX_MAPSLOT_SHARE_PER_JOB=
+ "gridmix.throttle.maps.max-slot-share-per-job";
+ final float maxMapSlotSharePerJob;
+
/**
* The maximum share of the cluster's reduceslot capacity that can be counted
* toward a job's incomplete reduce tasks in overload calculation.
*/
- static final float MAX_REDUCESLOT_SHARE_PER_JOB=0.1f;
+ private static final float MAX_REDUCESLOT_SHARE_PER_JOB=0.1f;
+ public static final String CONF_MAX_REDUCESLOT_SHARE_PER_JOB=
+ "gridmix.throttle.reducess.max-slot-share-per-job";
+ final float maxReduceSlotSharePerJob;
+
+ /**
+ * The ratio of the maximum number of pending+running jobs over the number of
+ * task trackers.
+ */
+ private static final float MAX_JOB_TRACKER_RATIO=1.0f;
+ public static final String CONF_MAX_JOB_TRACKER_RATIO=
+ "gridmix.throttle.jobs-to-tracker-ratio";
+ final float maxJobTrackerRatio;
/**
* Creating a new instance does not start the thread.
@@ -84,6 +105,17 @@ public class StressJobFactory extends Jo
throws IOException {
super(
submitter, jobProducer, scratch, conf, startFlag, resolver);
+ overloadMapTaskMapSlotRatio = conf.getFloat(
+ CONF_OVERLOAD_MAPTASK_MAPSLOT_RATIO, OVERLOAD_MAPTASK_MAPSLOT_RATIO);
+ overloadReduceTaskReduceSlotRatio = conf.getFloat(
+ CONF_OVERLOAD_REDUCETASK_REDUCESLOT_RATIO,
+ OVERLOAD_REDUCETASK_REDUCESLOT_RATIO);
+ maxMapSlotSharePerJob = conf.getFloat(
+ CONF_MAX_MAPSLOT_SHARE_PER_JOB, MAX_MAPSLOT_SHARE_PER_JOB);
+ maxReduceSlotSharePerJob = conf.getFloat(
+ CONF_MAX_REDUCESLOT_SHARE_PER_JOB, MAX_REDUCESLOT_SHARE_PER_JOB);
+ maxJobTrackerRatio = conf.getFloat(
+ CONF_MAX_JOB_TRACKER_RATIO, MAX_JOB_TRACKER_RATIO);
}
public Thread createReaderThread() {
@@ -194,19 +226,19 @@ public class StressJobFactory extends Jo
}
}
- static float calcEffectiveIncompleteMapTasks(int mapSlotCapacity,
+ float calcEffectiveIncompleteMapTasks(int mapSlotCapacity,
int numMaps, float mapProgress) {
- float maxEffIncompleteMapTasks =
- Math.max(1.0f, mapSlotCapacity * MAX_MAPSLOT_SHARE_PER_JOB);
+ float maxEffIncompleteMapTasks = Math.max(1.0f, mapSlotCapacity
+ * maxMapSlotSharePerJob);
float mapProgressAdjusted = Math.max(Math.min(mapProgress, 1.0f), 0.0f);
return Math.min(maxEffIncompleteMapTasks,
numMaps * (1.0f - mapProgressAdjusted));
}
- static float calcEffectiveIncompleteReduceTasks(int reduceSlotCapacity,
+ float calcEffectiveIncompleteReduceTasks(int reduceSlotCapacity,
int numReduces, float reduceProgress) {
- float maxEffIncompleteReduceTasks =
- Math.max(1.0f, reduceSlotCapacity * MAX_REDUCESLOT_SHARE_PER_JOB);
+ float maxEffIncompleteReduceTasks = Math.max(1.0f, reduceSlotCapacity
+ * maxReduceSlotSharePerJob);
float reduceProgressAdjusted =
Math.max(Math.min(reduceProgress, 1.0f), 0.0f);
return Math.min(maxEffIncompleteReduceTasks,
@@ -227,7 +259,8 @@ public class StressJobFactory extends Jo
loadStatus.numJobsBackfill =
- clusterStatus.getTaskTrackers() - stats.getNumRunningJob();
+ (int) (maxJobTrackerRatio * clusterStatus.getTaskTrackers())
+ - stats.getNumRunningJob();
if (loadStatus.numJobsBackfill <= 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(System.currentTimeMillis() + " Overloaded is "
@@ -246,7 +279,7 @@ public class StressJobFactory extends Jo
clusterStatus.getMaxMapTasks(), noOfMaps, mapProgress);
}
loadStatus.mapSlotsBackfill =
- (int) ((OVERLOAD_MAPTASK_MAPSLOT_RATIO * clusterStatus.getMaxMapTasks())
+ (int) ((overloadMapTaskMapSlotRatio * clusterStatus.getMaxMapTasks())
- incompleteMapTasks);
if (loadStatus.mapSlotsBackfill <= 0) {
if (LOG.isDebugEnabled()) {
@@ -268,7 +301,7 @@ public class StressJobFactory extends Jo
}
}
loadStatus.reduceSlotsBackfill =
- (int) ((OVERLOAD_REDUCETASK_REDUCESLOT_RATIO * clusterStatus.getMaxReduceTasks())
+ (int) ((overloadReduceTaskReduceSlotRatio * clusterStatus.getMaxReduceTasks())
- incompleteReduceTasks);
if (loadStatus.reduceSlotsBackfill <= 0) {
if (LOG.isDebugEnabled()) {
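
The former hard-coded overload thresholds in StressJobFactory are now read from configuration, with the old constants kept as defaults. A sketch of tuning them (example values only; omitting a property keeps the previous behavior):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.gridmix.StressJobFactory;

    class StressTuningSketch {
      static Configuration tune() {
        Configuration conf = new Configuration();
        conf.setFloat(StressJobFactory.CONF_MAX_JOB_TRACKER_RATIO, 1.5f);          // jobs per task tracker
        conf.setFloat(StressJobFactory.CONF_OVERLOAD_MAPTASK_MAPSLOT_RATIO, 3.0f); // pending maps per map slot
        conf.setFloat(StressJobFactory.CONF_OVERLOAD_REDUCETASK_REDUCESLOT_RATIO, 3.0f);
        conf.setFloat(StressJobFactory.CONF_MAX_MAPSLOT_SHARE_PER_JOB, 0.2f);      // per-job map-slot share
        conf.setFloat(StressJobFactory.CONF_MAX_REDUCESLOT_SHARE_PER_JOB, 0.2f);
        return conf;
      }
    }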
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomAlgorithm.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomAlgorithm.java?rev=966365&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomAlgorithm.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomAlgorithm.java Wed Jul 21 19:06:06 2010
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.junit.Test;
+
+public class TestRandomAlgorithm {
+ private static final int[][] parameters = new int[][] {
+ {5, 1, 1},
+ {10, 1, 2},
+ {10, 2, 2},
+ {20, 1, 3},
+ {20, 2, 3},
+ {20, 3, 3},
+ {100, 3, 10},
+ {100, 3, 100},
+ {100, 3, 1000},
+ {100, 3, 10000},
+ {100, 3, 100000},
+ {100, 3, 1000000}
+ };
+
+ private List<Integer> convertIntArray(int[] from) {
+ List<Integer> ret = new ArrayList<Integer>(from.length);
+ for (int v : from) {
+ ret.add(v);
+ }
+ return ret;
+ }
+
+ private void testRandomSelectSelector(int niter, int m, int n) {
+ RandomAlgorithms.Selector selector = new RandomAlgorithms.Selector(n,
+ (double) m / n, new Random());
+ Map<List<Integer>, Integer> results = new HashMap<List<Integer>, Integer>(
+ niter);
+ for (int i = 0; i < niter; ++i, selector.reset()) {
+ int[] result = new int[m];
+ for (int j = 0; j < m; ++j) {
+ int v = selector.next();
+ if (v < 0)
+ break;
+ result[j]=v;
+ }
+ Arrays.sort(result);
+ List<Integer> resultAsList = convertIntArray(result);
+ Integer count = results.get(resultAsList);
+ if (count == null) {
+ results.put(resultAsList, 1);
+ } else {
+ results.put(resultAsList, ++count);
+ }
+ }
+
+ verifyResults(results, m, n);
+ }
+
+ private void testRandomSelect(int niter, int m, int n) {
+ Random random = new Random();
+ Map<List<Integer>, Integer> results = new HashMap<List<Integer>, Integer>(
+ niter);
+ for (int i = 0; i < niter; ++i) {
+ int[] result = RandomAlgorithms.select(m, n, random);
+ Arrays.sort(result);
+ List<Integer> resultAsList = convertIntArray(result);
+ Integer count = results.get(resultAsList);
+ if (count == null) {
+ results.put(resultAsList, 1);
+ } else {
+ results.put(resultAsList, ++count);
+ }
+ }
+
+ verifyResults(results, m, n);
+ }
+
+ private void verifyResults(Map<List<Integer>, Integer> results, int m, int n) {
+ if (n>=10) {
+ assertTrue(results.size() >= Math.min(m, 2));
+ }
+ for (List<Integer> result : results.keySet()) {
+ assertEquals(m, result.size());
+ Set<Integer> seen = new HashSet<Integer>();
+ for (int v : result) {
+ System.out.printf("%d ", v);
+ assertTrue((v >= 0) && (v < n));
+ assertTrue(seen.add(v));
+ }
+ System.out.printf(" ==> %d\n", results.get(result));
+ }
+ System.out.println("====");
+ }
+
+ @Test
+ public void testRandomSelect() {
+ for (int[] param : parameters) {
+ testRandomSelect(param[0], param[1], param[2]);
+ }
+ }
+
+ @Test
+ public void testRandomSelectSelector() {
+ for (int[] param : parameters) {
+ testRandomSelectSelector(param[0], param[1], param[2]);
+ }
+ }
+}
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java?rev=966365&r1=966364&r2=966365&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java Wed Jul 21 19:06:06 2010
@@ -24,7 +24,11 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Level;
@@ -34,6 +38,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
@@ -128,6 +133,31 @@ public class TestSleepJob {
}
@Test
+ public void testRandomLocationSubmit() throws Exception {
+ policy = GridmixJobSubmissionPolicy.STRESS;
+ System.out.println(" Random locations started at " + System.currentTimeMillis());
+ doSubmission("-D"+JobCreator.SLEEPJOB_RANDOM_LOCATIONS+"=3");
+ System.out.println(" Random locations ended at " + System.currentTimeMillis());
+ }
+
+ @Test
+ public void testMapTasksOnlySubmit() throws Exception {
+ policy = GridmixJobSubmissionPolicy.STRESS;
+ System.out.println(" Map tasks only at " + System.currentTimeMillis());
+ doSubmission("-D"+SleepJob.SLEEPJOB_MAPTASK_ONLY+"=true");
+ System.out.println(" Map tasks only ended at " + System.currentTimeMillis());
+ }
+
+ @Test
+ public void testLimitTaskSleepTimeSubmit() throws Exception {
+ policy = GridmixJobSubmissionPolicy.STRESS;
+ System.out.println(" Limit sleep time only at " + System.currentTimeMillis());
+ doSubmission("-D" + SleepJob.GRIDMIX_SLEEP_MAX_MAP_TIME + "=100", "-D"
+ + SleepJob.GRIDMIX_SLEEP_MAX_REDUCE_TIME + "=200");
+ System.out.println(" Limit sleep time ended at " + System.currentTimeMillis());
+ }
+
+ @Test
public void testStressSubmit() throws Exception {
policy = GridmixJobSubmissionPolicy.STRESS;
System.out.println(" Stress started at " + System.currentTimeMillis());
@@ -143,25 +173,83 @@ public class TestSleepJob {
System.out.println("Serial ended at " + System.currentTimeMillis());
}
-
- private void doSubmission() throws Exception {
+ @Test
+ public void testRandomLocation() throws Exception {
+ UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+ // testRandomLocation(0, 10, ugi);
+ testRandomLocation(1, 10, ugi);
+ testRandomLocation(2, 10, ugi);
+ }
+
+ private void testRandomLocation(int locations, int njobs, UserGroupInformation ugi) throws Exception {
+ Configuration conf = new Configuration();
+ conf.setInt(JobCreator.SLEEPJOB_RANDOM_LOCATIONS, locations);
+ DebugJobProducer jobProducer = new DebugJobProducer(njobs, conf);
+ JobConf jconf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
+ JobStory story;
+ int seq=1;
+ while ((story = jobProducer.getNextJob()) != null) {
+ GridmixJob gridmixJob = JobCreator.SLEEPJOB.createGridmixJob(jconf, 0,
+ story, new Path("ignored"), ugi, seq++);
+ gridmixJob.buildSplits(null);
+ List<InputSplit> splits = new SleepJob.SleepInputFormat()
+ .getSplits(gridmixJob.getJob());
+ for (InputSplit split : splits) {
+ assertEquals(locations, split.getLocations().length);
+ }
+ }
+ }
+
+ @Test
+ public void testMapTasksOnlySleepJobs()
+ throws Exception {
+ Configuration conf = new Configuration();
+ conf.setBoolean(SleepJob.SLEEPJOB_MAPTASK_ONLY, true);
+ DebugJobProducer jobProducer = new DebugJobProducer(5, conf);
+ JobConf jconf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
+ UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+ JobStory story;
+ int seq = 1;
+ while ((story = jobProducer.getNextJob()) != null) {
+ GridmixJob gridmixJob = JobCreator.SLEEPJOB.createGridmixJob(jconf, 0,
+ story, new Path("ignored"), ugi, seq++);
+ gridmixJob.buildSplits(null);
+ Job job = gridmixJob.call();
+ assertEquals(0, job.getNumReduceTasks());
+ }
+ }
+
+ private void doSubmission(String...optional) throws Exception {
final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs);
final Path out = GridmixTestUtils.DEST.makeQualified(GridmixTestUtils.dfs);
final Path root = new Path("/user");
Configuration conf = null;
try {
- final String[] argv = {"-D" + FilePool.GRIDMIX_MIN_FILE + "=0",
- "-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out,
- "-D" + Gridmix.GRIDMIX_USR_RSV + "="
- + EchoUserResolver.class.getName(),
- "-D" + JobCreator.GRIDMIX_JOB_TYPE + "="
- + JobCreator.SLEEPJOB.name(),
- "-D" + SleepJob.GRIDMIX_SLEEP_INTERVAL +"=" +"10",
- "-generate",
- String.valueOf(GENDATA) + "m", in.toString(),
- "-"
- // ignored by DebugGridmix
+ // required options
+ final String[] required = {
+ "-D" + FilePool.GRIDMIX_MIN_FILE + "=0",
+ "-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out,
+ "-D" + Gridmix.GRIDMIX_USR_RSV + "=" + EchoUserResolver.class.getName(),
+ "-D" + JobCreator.GRIDMIX_JOB_TYPE + "=" + JobCreator.SLEEPJOB.name(),
+ "-D" + SleepJob.GRIDMIX_SLEEP_INTERVAL + "=" + "10"
+ };
+ // mandatory arguments
+ final String[] mandatory = {
+ "-generate",String.valueOf(GENDATA) + "m", in.toString(), "-"
+ // ignored by DebugGridmix
};
+
+ ArrayList<String> argv = new ArrayList<String>(required.length+optional.length+mandatory.length);
+ for (String s : required) {
+ argv.add(s);
+ }
+ for (String s : optional) {
+ argv.add(s);
+ }
+ for (String s : mandatory) {
+ argv.add(s);
+ }
+
DebugGridmix client = new DebugGridmix();
conf = new Configuration();
conf.setEnum(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy);
@@ -169,7 +257,12 @@ public class TestSleepJob {
// allow synthetic users to create home directories
GridmixTestUtils.dfs.mkdirs(root, new FsPermission((short) 0777));
GridmixTestUtils.dfs.setPermission(root, new FsPermission((short) 0777));
- int res = ToolRunner.run(conf, client, argv);
+ String[] args = argv.toArray(new String[argv.size()]);
+ System.out.println("Command line arguments:");
+ for (int i=0; i<args.length; ++i) {
+ System.out.printf(" [%d] %s\n", i, args[i]);
+ }
+ int res = ToolRunner.run(conf, client, args);
assertEquals("Client exited with nonzero status", 0, res);
client.checkMonitor();
} catch (Exception e) {