Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2010/07/14 11:22:30 UTC
svn commit: r963986 [2/2] - in /hadoop/mapreduce/trunk: ./
src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/
src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java?rev=963986&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java Wed Jul 14 09:22:29 2010
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.ReduceTaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class SleepJob extends GridmixJob {
+ public static final Log LOG = LogFactory.getLog(SleepJob.class);
+
+ /**
+ * Interval at which to report progress, in seconds.
+ */
+ public static final String GRIDMIX_SLEEP_INTERVAL = "gridmix.sleep.interval";
+
+ public SleepJob(
+ Configuration conf, long submissionMillis, JobStory jobdesc, Path outRoot,
+ UserGroupInformation ugi, int seq) throws IOException {
+ super(conf, submissionMillis, jobdesc, outRoot, ugi, seq);
+ }
+
+ @Override
+ public Job call()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ ugi.doAs(
+ new PrivilegedExceptionAction<Job>() {
+ public Job run()
+ throws IOException, ClassNotFoundException, InterruptedException {
+ job.setMapperClass(SleepMapper.class);
+ job.setReducerClass(SleepReducer.class);
+ job.setNumReduceTasks(jobdesc.getNumberReduces());
+ job.setMapOutputKeyClass(GridmixKey.class);
+ job.setMapOutputValueClass(NullWritable.class);
+ job.setSortComparatorClass(GridmixKey.Comparator.class);
+ job.setGroupingComparatorClass(SpecGroupingComparator.class);
+ job.setInputFormatClass(SleepInputFormat.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+ job.setPartitionerClass(DraftPartitioner.class);
+ job.setJarByClass(SleepJob.class);
+ job.getConfiguration().setBoolean(Job.USED_GENERIC_PARSER, true);
+ job.submit();
+ return job;
+
+ }
+ });
+
+ return job;
+ }
+
+ public static class SleepMapper
+ extends Mapper<LongWritable, LongWritable, GridmixKey, NullWritable> {
+
+ @Override
+ public void map(LongWritable key, LongWritable value, Context context)
+ throws IOException, InterruptedException {
+ context.setStatus("Sleeping... " + value.get() + " ms left");
+ long now = System.currentTimeMillis();
+ if (now < key.get()) {
+ TimeUnit.MILLISECONDS.sleep(key.get() - now);
+ }
+ }
+
+ @Override
+ public void cleanup(Context context)
+ throws IOException, InterruptedException {
+ final int nReds = context.getNumReduceTasks();
+ if (nReds > 0) {
+ final SleepSplit split = (SleepSplit) context.getInputSplit();
+ int id = split.getId();
+ final int nMaps = split.getNumMaps();
+ //This is a hack to pass the sleep duration via the Gridmix key
+ //TODO: We need to come up with a better solution for this.
+
+ final GridmixKey key = new GridmixKey(GridmixKey.REDUCE_SPEC, 0, 0L);
+ for (int i = id, idx = 0; i < nReds; i += nMaps) {
+ key.setPartition(i);
+ key.setReduceOutputBytes(split.getReduceDurations(idx++));
+ id += nReds;
+ context.write(key, NullWritable.get());
+ }
+ }
+ }
+
+ }
+
+ public static class SleepReducer
+ extends Reducer<GridmixKey, NullWritable, NullWritable, NullWritable> {
+
+ private long duration = 0L;
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ if (!context.nextKey() ||
+ context.getCurrentKey().getType() != GridmixKey.REDUCE_SPEC) {
+ throw new IOException("Missing reduce spec");
+ }
+ for (NullWritable ignored : context.getValues()) {
+ final GridmixKey spec = context.getCurrentKey();
+ duration += spec.getReduceOutputBytes();
+ }
+ long sleepInterval =
+ context.getConfiguration().getLong(GRIDMIX_SLEEP_INTERVAL, 5);
+ final long RINTERVAL =
+ TimeUnit.MILLISECONDS.convert(sleepInterval, TimeUnit.SECONDS);
+ //Re-derive the elapsed time from the clock on each iteration so that
+ //deviation from the expected sleep time does not accumulate.
+ long start = System.currentTimeMillis();
+ long slept = 0L;
+ long sleep = 0L;
+ while (slept < duration) {
+ final long rem = duration - slept;
+ sleep = Math.min(rem, RINTERVAL);
+ context.setStatus("Sleeping... " + rem + " ms left");
+ TimeUnit.MILLISECONDS.sleep(sleep);
+ slept = System.currentTimeMillis() - start;
+ }
+ }
+
+ @Override
+ protected void cleanup(Context context)
+ throws IOException, InterruptedException {
+ final String msg = "Slept for " + duration;
+ LOG.info(msg);
+ context.setStatus(msg);
+ }
+ }
+
+ public static class SleepInputFormat
+ extends InputFormat<LongWritable, LongWritable> {
+
+ @Override
+ public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
+ return pullDescription(jobCtxt);
+ }
+
+ @Override
+ public RecordReader<LongWritable, LongWritable> createRecordReader(
+ InputSplit split, final TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ final long duration = split.getLength();
+ long sleepInterval =
+ context.getConfiguration().getLong(GRIDMIX_SLEEP_INTERVAL, 5);
+ final long RINTERVAL =
+ TimeUnit.MILLISECONDS.convert(sleepInterval, TimeUnit.SECONDS);
+ if (RINTERVAL <= 0) {
+ throw new IOException(
+ "Invalid " + GRIDMIX_SLEEP_INTERVAL + ": " + RINTERVAL);
+ }
+ return new RecordReader<LongWritable, LongWritable>() {
+ long start = -1;
+ long slept = 0L;
+ long sleep = 0L;
+ final LongWritable key = new LongWritable();
+ final LongWritable val = new LongWritable();
+
+ @Override
+ public boolean nextKeyValue() throws IOException {
+ if (start == -1) {
+ start = System.currentTimeMillis();
+ }
+ slept += sleep;
+ sleep = Math.min(duration - slept, RINTERVAL);
+ key.set(slept + sleep + start);
+ val.set(duration - slept);
+ return slept < duration;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return slept / ((float) duration);
+ }
+
+ @Override
+ public LongWritable getCurrentKey() {
+ return key;
+ }
+
+ @Override
+ public LongWritable getCurrentValue() {
+ return val;
+ }
+
+ @Override
+ public void close() throws IOException {
+ final String msg = "Slept for " + duration;
+ LOG.info(msg);
+ }
+
+ public void initialize(InputSplit split, TaskAttemptContext ctxt) {
+ }
+ };
+ }
+ }
+
+ public static class SleepSplit extends InputSplit implements Writable {
+ private int id;
+ private int nSpec;
+ private int nMaps;
+ private long sleepDuration;
+ private long[] reduceDurations = new long[0];
+ private String[] locations = new String[0];
+
+ public SleepSplit() {
+ }
+
+ public SleepSplit(
+ int id, long sleepDuration, long[] reduceDurations, int nMaps,
+ String[] locations) {
+ this.id = id;
+ this.sleepDuration = sleepDuration;
+ nSpec = reduceDurations.length;
+ this.reduceDurations = reduceDurations;
+ this.nMaps = nMaps;
+ this.locations = locations;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeVInt(out, id);
+ WritableUtils.writeVLong(out, sleepDuration);
+ WritableUtils.writeVInt(out, nMaps);
+ WritableUtils.writeVInt(out, nSpec);
+ for (int i = 0; i < nSpec; ++i) {
+ WritableUtils.writeVLong(out, reduceDurations[i]);
+ }
+ WritableUtils.writeVInt(out, locations.length);
+ for (int i = 0; i < locations.length; ++i) {
+ Text.writeString(out, locations[i]);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ id = WritableUtils.readVInt(in);
+ sleepDuration = WritableUtils.readVLong(in);
+ nMaps = WritableUtils.readVInt(in);
+ nSpec = WritableUtils.readVInt(in);
+ if (reduceDurations.length < nSpec) {
+ reduceDurations = new long[nSpec];
+ }
+ for (int i = 0; i < nSpec; ++i) {
+ reduceDurations[i] = WritableUtils.readVLong(in);
+ }
+ final int nLoc = WritableUtils.readVInt(in);
+ if (nLoc != locations.length) {
+ locations = new String[nLoc];
+ }
+ for (int i = 0; i < nLoc; ++i) {
+ locations[i] = Text.readString(in);
+ }
+ }
+
+ @Override
+ public long getLength() {
+ return sleepDuration;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public int getNumMaps() {
+ return nMaps;
+ }
+
+ public long getReduceDurations(int i) {
+ return reduceDurations[i];
+ }
+
+ @Override
+ public String[] getLocations() {
+ return locations;
+ }
+ }
+
+ private TaskAttemptInfo getSuccessfulAttemptInfo(TaskType type, int task) {
+ TaskAttemptInfo ret;
+ for (int i = 0; true; ++i) {
+ // Rumen should synthesize an attempt if one is missing; otherwise this
+ // loop will never terminate.
+ ret = jobdesc.getTaskAttemptInfo(type, task, i);
+ if (ret.getRunState() == TaskStatus.State.SUCCEEDED) {
+ break;
+ }
+ }
+ if (ret.getRunState() != TaskStatus.State.SUCCEEDED) {
+ LOG.warn("No successful attempts for task type " + type + " task " + task);
+ }
+
+ return ret;
+ }
+
+ @Override
+ void buildSplits(FilePool inputDir) throws IOException {
+ final List<InputSplit> splits = new ArrayList<InputSplit>();
+ final int reds = jobdesc.getNumberReduces();
+ final int maps = jobdesc.getNumberMaps();
+ for (int i = 0; i < maps; ++i) {
+ final int nSpec = reds / maps + ((reds % maps) > i ? 1 : 0);
+ final long[] redDurations = new long[nSpec];
+ for (int j = 0; j < nSpec; ++j) {
+ final ReduceTaskAttemptInfo info =
+ (ReduceTaskAttemptInfo) getSuccessfulAttemptInfo(TaskType.REDUCE,
+ i + j * maps);
+ // Include only merge/reduce time
+ redDurations[j] = info.getMergeRuntime() + info.getReduceRuntime();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ String.format(
+ "SPEC(%d) %d -> %d %d/%d", id(), i, i + j * maps, redDurations[j],
+ info.getRuntime()));
+ }
+ }
+ final TaskAttemptInfo info = getSuccessfulAttemptInfo(TaskType.MAP, i);
+ splits.add(new SleepSplit(i, info.getRuntime(), redDurations, maps,
+ new String[0]));
+ }
+ pushDescription(id(), splits);
+ }
+}
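
For context on the pacing used by SleepReducer and SleepInputFormat above: each
iteration re-derives the elapsed time from the wall clock instead of summing the
requested sleeps, so per-slice scheduler jitter and status-update overhead do
not accumulate into the total. A minimal standalone sketch of the same scheme
(hypothetical class, not part of this patch):

    import java.util.concurrent.TimeUnit;

    public class PacedSleep {
      // Sleep ~duration ms in slices of at most interval ms, re-deriving
      // elapsed time from the clock so per-slice error does not accumulate.
      static void sleep(long duration, long interval)
          throws InterruptedException {
        final long start = System.currentTimeMillis();
        long slept = 0L;
        while (slept < duration) {
          TimeUnit.MILLISECONDS.sleep(Math.min(duration - slept, interval));
          slept = System.currentTimeMillis() - start; // wall clock, not a sum
        }
      }

      public static void main(String[] args) throws InterruptedException {
        final long t0 = System.currentTimeMillis();
        sleep(1200, 500); // slices of 500, 500, ~200 ms
        System.out.println("elapsed ms: " + (System.currentTimeMillis() - t0));
      }
    }
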
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java?rev=963986&r1=963985&r2=963986&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java Wed Jul 14 09:22:29 2010
@@ -20,14 +20,21 @@ package org.apache.hadoop.mapred.gridmix
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.gridmix.Gridmix.Component;
-import org.apache.hadoop.mapreduce.Cluster;
-import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.rumen.JobStory;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
@@ -45,7 +52,7 @@ public class Statistics implements Compo
public static final Log LOG = LogFactory.getLog(Statistics.class);
private final StatCollector statistics = new StatCollector();
- private final Cluster cluster;
+ private JobClient cluster;
//List of cluster status listeners.
private final List<StatListener<ClusterStats>> clusterStatlisteners =
@@ -55,6 +62,10 @@ public class Statistics implements Compo
private final List<StatListener<JobStats>> jobStatListeners =
new ArrayList<StatListener<JobStats>>();
+ //Map from job seq id to the JobStats of each running job
+ private static final Map<Integer, JobStats> jobMaps =
+ new ConcurrentHashMap<Integer, JobStats>();
+
private int completedJobsInCurrentInterval = 0;
private final int jtPollingInterval;
private volatile boolean shutdown = false;
@@ -66,15 +77,40 @@ public class Statistics implements Compo
private final CountDownLatch startFlag;
public Statistics(
- Configuration conf, int pollingInterval, CountDownLatch startFlag)
- throws IOException {
- this.cluster = new Cluster(conf);
+ final Configuration conf, int pollingInterval, CountDownLatch startFlag)
+ throws IOException, InterruptedException {
+ UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+ this.cluster = ugi.doAs(new PrivilegedExceptionAction<JobClient>() {
+ public JobClient run() throws IOException {
+ return new JobClient(new JobConf(conf));
+ }
+ });
+
this.jtPollingInterval = pollingInterval;
maxJobCompletedInInterval = conf.getInt(
MAX_JOBS_COMPLETED_IN_POLL_INTERVAL_KEY, 1);
this.startFlag = startFlag;
}
+ public void addJobStats(Job job, JobStory jobdesc) {
+ int seq = GridmixJob.getJobSeqId(job);
+ if (seq < 0) {
+ LOG.info("Not tracking job " + job.getJobName()
+ + " as seq id is less than zero: " + seq);
+ return;
+ }
+
+ int maps = 0;
+ if (jobdesc == null) {
+ throw new IllegalArgumentException(
+ " JobStory not available for job " + job.getJobName());
+ } else {
+ maps = jobdesc.getNumberMaps();
+ }
+ JobStats stats = new JobStats(maps, job);
+ jobMaps.put(seq, stats);
+ }
+
/**
* Used by JobMonitor to add the completed job.
*/
@@ -86,6 +122,10 @@ public class Statistics implements Compo
if (!statistics.isAlive()) {
return;
}
+ JobStats stat = jobMaps.remove(GridmixJob.getJobSeqId(job));
+
+ if (stat == null) return;
+
completedJobsInCurrentInterval++;
//check if we have reached the maximum level of job completions.
if (completedJobsInCurrentInterval >= maxJobCompletedInInterval) {
@@ -98,12 +138,8 @@ public class Statistics implements Compo
lock.lock();
try {
//Job is completed notify all the listeners.
- if (jobStatListeners.size() > 0) {
- for (StatListener<JobStats> l : jobStatListeners) {
- JobStats stats = new JobStats();
- stats.setCompleteJob(job);
- l.update(stats);
- }
+ for (StatListener<JobStats> l : jobStatListeners) {
+ l.update(stat);
}
this.jobCompleted.signalAll();
} finally {
@@ -165,58 +201,26 @@ public class Statistics implements Compo
// only if there are clusterStats listener.
if (clusterStatlisteners.size() > 0) {
try {
- ClusterMetrics clusterStatus = cluster.getClusterStatus();
- Job[] allJobs = cluster.getAllJobs();
- List<Job> runningWaitingJobs = getRunningWaitingJobs(allJobs);
- updateAndNotifyClusterStatsListeners(
- clusterStatus, runningWaitingJobs);
+ ClusterStatus clusterStatus = cluster.getClusterStatus();
+ updateAndNotifyClusterStatsListeners(clusterStatus);
} catch (IOException e) {
LOG.error(
"Statistics io exception while polling JT ", e);
return;
- } catch (InterruptedException e) {
- LOG.error(
- "Statistics interrupt exception while polling JT ", e);
- return;
}
}
}
}
private void updateAndNotifyClusterStatsListeners(
- ClusterMetrics clusterMetrics, List<Job> runningWaitingJobs) {
+ ClusterStatus clusterStatus) {
ClusterStats stats = ClusterStats.getClusterStats();
- stats.setClusterMetric(clusterMetrics);
- stats.setRunningWaitingJobs(runningWaitingJobs);
+ stats.setClusterMetric(clusterStatus);
for (StatListener<ClusterStats> listener : clusterStatlisteners) {
listener.update(stats);
}
}
-
- /**
- * From the list of Jobs , give the list of jobs whoes state is eigther
- * PREP or RUNNING.
- *
- * @param allJobs
- * @return
- * @throws java.io.IOException
- * @throws InterruptedException
- */
- private List<Job> getRunningWaitingJobs(Job[] allJobs)
- throws IOException, InterruptedException {
- List<Job> result = new ArrayList<Job>();
- for (Job job : allJobs) {
- //TODO Check if job.getStatus() makes a rpc call
- org.apache.hadoop.mapreduce.JobStatus.State state =
- job.getStatus().getState();
- if (org.apache.hadoop.mapreduce.JobStatus.State.PREP.equals(state) ||
- org.apache.hadoop.mapreduce.JobStatus.State.RUNNING.equals(state)) {
- result.add(job);
- }
- }
- return result;
- }
}
/**
@@ -231,6 +235,7 @@ public class Statistics implements Compo
@Override
public void shutdown() {
shutdown = true;
+ jobMaps.clear();
clusterStatlisteners.clear();
jobStatListeners.clear();
statistics.interrupt();
@@ -239,6 +244,7 @@ public class Statistics implements Compo
@Override
public void abort() {
shutdown = true;
+ jobMaps.clear();
clusterStatlisteners.clear();
jobStatListeners.clear();
statistics.interrupt();
@@ -250,21 +256,31 @@ public class Statistics implements Compo
* TODO: In future we need to extend this to send more information.
*/
static class JobStats {
- private Job completedJob;
+ private int noOfMaps;
+ private Job job;
- public Job getCompleteJob() {
- return completedJob;
+ public JobStats(int noOfMaps, Job job) {
+ this.job = job;
+ this.noOfMaps = noOfMaps;
+ }
+ public int getNoOfMaps() {
+ return noOfMaps;
}
- public void setCompleteJob(Job job) {
- this.completedJob = job;
+ /**
+ * Returns the job.
+ * Do not use job.getJobID(); it returns null in 20.1xx.
+ * Use GridmixJob.getJobSeqId(job) instead.
+ * @return job
+ */
+ public Job getJob() {
+ return job;
}
}
static class ClusterStats {
- private ClusterMetrics status = null;
+ private ClusterStatus status = null;
private static ClusterStats stats = new ClusterStats();
- private List<Job> runningWaitingJobs;
private ClusterStats() {
@@ -280,26 +296,26 @@ public class Statistics implements Compo
/**
* @param metrics
*/
- void setClusterMetric(ClusterMetrics metrics) {
+ void setClusterMetric(ClusterStatus metrics) {
this.status = metrics;
}
/**
* @return metrics
*/
- public ClusterMetrics getStatus() {
+ public ClusterStatus getStatus() {
return status;
}
+ int getNumRunningJob() {
+ return jobMaps.size();
+ }
+
/**
* @return stats of the currently running jobs
*/
- public List<Job> getRunningWaitingJobs() {
- return runningWaitingJobs;
- }
-
- public void setRunningWaitingJobs(List<Job> runningWaitingJobs) {
- this.runningWaitingJobs = runningWaitingJobs;
+ static Collection<JobStats> getRunningJobStats() {
+ return jobMaps.values();
}
}
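
With this change, Statistics keeps one JobStats entry per running job, keyed by
the Gridmix sequence id: addJobStats registers the job and its map count at
submission, add(Job) removes the entry and forwards it to listeners at
completion, and ClusterStats exposes the live view through getNumRunningJob and
getRunningJobStats. A reduced sketch of that lifecycle (hypothetical names; the
real key comes from GridmixJob.getJobSeqId):

    import java.util.Collection;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class RunningJobRegistry {
      static final class Stats {
        final int maps;
        Stats(int maps) { this.maps = maps; }
      }

      // seq id -> stats of running jobs, mirroring Statistics.jobMaps
      private final Map<Integer, Stats> running =
          new ConcurrentHashMap<Integer, Stats>();

      void onSubmit(int seq, int maps) {    // cf. addJobStats(Job, JobStory)
        if (seq < 0) return;                // jobs without a seq id are ignored
        running.put(seq, new Stats(maps));
      }

      Stats onComplete(int seq) {           // cf. add(Job)
        return running.remove(seq);         // null => job was never tracked
      }

      int numRunning() {                    // cf. ClusterStats.getNumRunningJob
        return running.size();
      }

      Collection<Stats> liveStats() {       // cf. getRunningJobStats
        return running.values();
      }
    }
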
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java?rev=963986&r1=963985&r2=963986&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java Wed Jul 14 09:22:29 2010
@@ -22,30 +22,50 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.ClusterMetrics;
-import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats;
+import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.JobStoryProducer;
import java.io.IOException;
-import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
public static final Log LOG = LogFactory.getLog(StressJobFactory.class);
- private LoadStatus loadStatus = new LoadStatus();
- private List<Job> runningWaitingJobs;
- private final Condition overloaded = this.lock.newCondition();
+ private final LoadStatus loadStatus = new LoadStatus();
+ private final Condition condUnderloaded = this.lock.newCondition();
/**
* The minimum ratio between pending+running map tasks (aka. incomplete map
* tasks) and cluster map slot capacity for us to consider the cluster is
* overloaded. For running maps, we only count them partially. Namely, a 40%
* completed map is counted as 0.6 map tasks in our calculation.
*/
- static final float OVERLAOD_MAPTASK_MAPSLOT_RATIO = 2.0f;
+ static final float OVERLOAD_MAPTASK_MAPSLOT_RATIO = 2.0f;
+
+ /**
+ * The minimum ratio between pending+running reduce tasks (aka. incomplete
+ * reduce tasks) and cluster reduce slot capacity for us to consider the
+ * cluster is overloaded. For running reduces, we only count them partially.
+ * Namely, a 40% completed reduce is counted as 0.6 reduce tasks in our
+ * calculation.
+ */
+ static final float OVERLOAD_REDUCETASK_REDUCESLOT_RATIO = 2.5f;
+
+ /**
+ * The maximum share of the cluster's mapslot capacity that can be counted
+ * toward a job's incomplete map tasks in overload calculation.
+ */
+ static final float MAX_MAPSLOT_SHARE_PER_JOB = 0.1f;
+
+ /**
+ * The maximum share of the cluster's reduceslot capacity that can be counted
+ * toward a job's incomplete reduce tasks in overload calculation.
+ */
+ static final float MAX_REDUCESLOT_SHARE_PER_JOB = 0.1f;
/**
* Creating a new instance does not start the thread.
@@ -60,14 +80,10 @@ public class StressJobFactory extends Jo
*/
public StressJobFactory(
JobSubmitter submitter, JobStoryProducer jobProducer, Path scratch,
- Configuration conf, CountDownLatch startFlag) throws IOException {
+ Configuration conf, CountDownLatch startFlag, UserResolver resolver)
+ throws IOException {
super(
- submitter, jobProducer, scratch, conf, startFlag);
-
- //Setting isOverloaded as true , now JF would wait for atleast first
- //set of ClusterStats based on which it can decide how many job it has
- //to submit.
- this.loadStatus.isOverloaded = true;
+ submitter, jobProducer, scratch, conf, startFlag, resolver);
}
public Thread createReaderThread() {
@@ -104,33 +120,39 @@ public class StressJobFactory extends Jo
while (!Thread.currentThread().isInterrupted()) {
lock.lock();
try {
- while (loadStatus.isOverloaded) {
+ while (loadStatus.overloaded()) {
//Wait while JT is overloaded.
try {
- overloaded.await();
+ condUnderloaded.await();
} catch (InterruptedException ie) {
return;
}
}
- int noOfSlotsAvailable = loadStatus.numSlotsBackfill;
- LOG.info(" No of slots to be backfilled are " + noOfSlotsAvailable);
-
- for (int i = 0; i < noOfSlotsAvailable; i++) {
+ while (!loadStatus.overloaded()) {
try {
final JobStory job = getNextJobFiltered();
if (null == job) {
return;
}
- //TODO: We need to take care of scenario when one map takes more
- //than 1 slot.
- i += job.getNumberMaps();
-
submitter.add(
- new GridmixJob(
- conf, 0L, job, scratch, sequence.getAndIncrement()));
+ jobCreator.createGridmixJob(
+ conf, 0L, job, scratch,
+ userResolver.getTargetUgi(
+ UserGroupInformation.createRemoteUser(job.getUser())),
+ sequence.getAndIncrement()));
+ // TODO: We need to take care of scenario when one map/reduce
+ // takes more than 1 slot.
+ loadStatus.mapSlotsBackfill -=
+ calcEffectiveIncompleteMapTasks(
+ loadStatus.mapSlotCapacity, job.getNumberMaps(), 0.0f);
+ loadStatus.reduceSlotsBackfill -=
+ calcEffectiveIncompleteReduceTasks(
+ loadStatus.reduceSlotCapacity, job.getNumberReduces(),
+ 0.0f);
+ --loadStatus.numJobsBackfill;
} catch (IOException e) {
- LOG.error(" EXCEPTOIN in availableSlots ", e);
+ LOG.error("Error while submitting the job ", e);
error = e;
return;
}
@@ -149,7 +171,6 @@ public class StressJobFactory extends Jo
}
/**
- * <p/>
* STRESS: once the notification arrives from StatsCollector, collect the
* cluster metrics and update the current loadStatus with the JT's new load.
*
@@ -159,98 +180,145 @@ public class StressJobFactory extends Jo
public void update(Statistics.ClusterStats item) {
lock.lock();
try {
- ClusterMetrics clusterMetrics = item.getStatus();
- LoadStatus newStatus;
- runningWaitingJobs = item.getRunningWaitingJobs();
- newStatus = checkLoadAndGetSlotsToBackfill(clusterMetrics);
- loadStatus.isOverloaded = newStatus.isOverloaded;
- loadStatus.numSlotsBackfill = newStatus.numSlotsBackfill;
- overloaded.signalAll();
+ ClusterStatus clusterMetrics = item.getStatus();
+ try {
+ checkLoadAndGetSlotsToBackfill(item, clusterMetrics);
+ } catch (Exception e) {
+ LOG.error("Couldn't get the new cluster status", e);
+ }
+ if (!loadStatus.overloaded()) {
+ condUnderloaded.signalAll();
+ }
} finally {
lock.unlock();
}
}
+ static float calcEffectiveIncompleteMapTasks(int mapSlotCapacity,
+ int numMaps, float mapProgress) {
+ float maxEffIncompleteMapTasks =
+ Math.max(1.0f, mapSlotCapacity * MAX_MAPSLOT_SHARE_PER_JOB);
+ float mapProgressAdjusted = Math.max(Math.min(mapProgress, 1.0f), 0.0f);
+ return Math.min(maxEffIncompleteMapTasks,
+ numMaps * (1.0f - mapProgressAdjusted));
+ }
+
+ static float calcEffectiveIncompleteReduceTasks(int reduceSlotCapacity,
+ int numReduces, float reduceProgress) {
+ float maxEffIncompleteReduceTasks =
+ Math.max(1.0f, reduceSlotCapacity * MAX_REDUCESLOT_SHARE_PER_JOB);
+ float reduceProgressAdjusted =
+ Math.max(Math.min(reduceProgress, 1.0f), 0.0f);
+ return Math.min(maxEffIncompleteReduceTasks,
+ numReduces * (1.0f - reduceProgressAdjusted));
+ }
+
/**
* We try to use some light-weight mechanism to determine cluster load.
*
- * @param clusterStatus
- * @return Whether, from job client perspective, the cluster is overloaded.
+ * @param stats
+ * @param clusterStatus Cluster status
+ * @throws java.io.IOException
*/
- private LoadStatus checkLoadAndGetSlotsToBackfill(
- ClusterMetrics clusterStatus) {
- LoadStatus loadStatus = new LoadStatus();
- // If there are more jobs than number of task trackers, we assume the
- // cluster is overloaded. This is to bound the memory usage of the
- // simulator job tracker, in situations where we have jobs with small
- // number of map tasks and large number of reduce tasks.
- if (runningWaitingJobs.size() >= clusterStatus.getTaskTrackerCount()) {
+ private void checkLoadAndGetSlotsToBackfill(
+ ClusterStats stats, ClusterStatus clusterStatus) throws IOException, InterruptedException {
+ loadStatus.mapSlotCapacity = clusterStatus.getMaxMapTasks();
+ loadStatus.reduceSlotCapacity = clusterStatus.getMaxReduceTasks();
+
+ loadStatus.numJobsBackfill =
+ clusterStatus.getTaskTrackers() - stats.getNumRunningJob();
+ if (loadStatus.numJobsBackfill <= 0) {
if (LOG.isDebugEnabled()) {
- LOG.debug(
- System.currentTimeMillis() + " Overloaded is " +
- Boolean.TRUE.toString() + " #runningJobs >= taskTrackerCount (" +
- runningWaitingJobs.size() + " >= " +
- clusterStatus.getTaskTrackerCount() + " )\n");
- }
- loadStatus.isOverloaded = true;
- loadStatus.numSlotsBackfill = 0;
- return loadStatus;
+ LOG.debug(System.currentTimeMillis() + " Overloaded is "
+ + Boolean.TRUE.toString() + " NumJobsBackfill is "
+ + loadStatus.numJobsBackfill);
+ }
+ return; // stop calculation because we know it is overloaded.
}
float incompleteMapTasks = 0; // include pending & running map tasks.
- for (Job job : runningWaitingJobs) {
- try{
- incompleteMapTasks += (1 - Math.min(
- job.getStatus().getMapProgress(), 1.0)) *
- ((JobConf) job.getConfiguration()).getNumMapTasks();
- }catch(IOException io) {
- //There might be issues with this current job
- //try others
- LOG.error(" Error while calculating load " , io);
- continue;
- }catch(InterruptedException ie) {
- //There might be issues with this current job
- //try others
- LOG.error(" Error while calculating load " , ie);
- continue;
+ for (JobStats job : ClusterStats.getRunningJobStats()) {
+ float mapProgress = job.getJob().mapProgress();
+ int noOfMaps = job.getNoOfMaps();
+ incompleteMapTasks +=
+ calcEffectiveIncompleteMapTasks(
+ clusterStatus.getMaxMapTasks(), noOfMaps, mapProgress);
+ }
+ loadStatus.mapSlotsBackfill =
+ (int) ((OVERLOAD_MAPTASK_MAPSLOT_RATIO * clusterStatus.getMaxMapTasks())
+ - incompleteMapTasks);
+ if (loadStatus.mapSlotsBackfill <= 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(System.currentTimeMillis() + " Overloaded is "
+ + Boolean.TRUE.toString() + " MapSlotsBackfill is "
+ + loadStatus.mapSlotsBackfill);
}
+ return; // stop calculation because we know it is overloaded.
}
- float overloadedThreshold =
- OVERLAOD_MAPTASK_MAPSLOT_RATIO * clusterStatus.getMapSlotCapacity();
- boolean overloaded = incompleteMapTasks > overloadedThreshold;
- String relOp = (overloaded) ? ">" : "<=";
- if (LOG.isDebugEnabled()) {
- LOG.info(
- System.currentTimeMillis() + " Overloaded is " + Boolean.toString(
- overloaded) + " incompleteMapTasks " + relOp + " " +
- OVERLAOD_MAPTASK_MAPSLOT_RATIO + "*mapSlotCapacity" + "(" +
- incompleteMapTasks + " " + relOp + " " +
- OVERLAOD_MAPTASK_MAPSLOT_RATIO + "*" +
- clusterStatus.getMapSlotCapacity() + ")");
- }
- if (overloaded) {
- loadStatus.isOverloaded = true;
- loadStatus.numSlotsBackfill = 0;
- } else {
- loadStatus.isOverloaded = false;
- loadStatus.numSlotsBackfill =
- (int) (overloadedThreshold - incompleteMapTasks);
+ float incompleteReduceTasks = 0; // include pending & running reduce tasks.
+ for (JobStats job : ClusterStats.getRunningJobStats()) {
+ int noOfReduces = job.getJob().getNumReduceTasks();
+ if (noOfReduces > 0) {
+ float reduceProgress = job.getJob().reduceProgress();
+ incompleteReduceTasks +=
+ calcEffectiveIncompleteReduceTasks(
+ clusterStatus.getMaxReduceTasks(), noOfReduces, reduceProgress);
+ }
+ }
+ loadStatus.reduceSlotsBackfill =
+ (int) ((OVERLOAD_REDUCETASK_REDUCESLOT_RATIO * clusterStatus.getMaxReduceTasks())
+ - incompleteReduceTasks);
+ if (loadStatus.reduceSlotsBackfill <= 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(System.currentTimeMillis() + " Overloaded is "
+ + Boolean.TRUE.toString() + " ReduceSlotsBackfill is "
+ + loadStatus.reduceSlotsBackfill);
+ }
+ return; // stop calculation because we know it is overloaded.
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Current load Status is " + loadStatus);
+ LOG.debug(System.currentTimeMillis() + " Overloaded is "
+ + Boolean.FALSE.toString() + ". Current load status is "
+ + loadStatus);
}
- return loadStatus;
}
static class LoadStatus {
- volatile boolean isOverloaded = false;
- volatile int numSlotsBackfill = -1;
+ int mapSlotsBackfill;
+ int mapSlotCapacity;
+ int reduceSlotsBackfill;
+ int reduceSlotCapacity;
+ int numJobsBackfill;
+ /**
+ * Construct the LoadStatus in an unknown state, assuming the cluster is
+ * overloaded by setting all backfill counters to 0.
+ */
+ LoadStatus() {
+ mapSlotsBackfill = 0;
+ reduceSlotsBackfill = 0;
+ numJobsBackfill = 0;
+
+ mapSlotCapacity = -1;
+ reduceSlotCapacity = -1;
+ }
+
+ public boolean overloaded() {
+ return (mapSlotsBackfill <= 0) || (reduceSlotsBackfill <= 0)
+ || (numJobsBackfill <= 0);
+ }
+
public String toString() {
- return " is Overloaded " + isOverloaded + " no of slots available " +
- numSlotsBackfill;
+ // TODO Use StringBuilder instead
+ return " Overloaded = " + overloaded()
+ + ", MapSlotBackfill = " + mapSlotsBackfill
+ + ", MapSlotCapacity = " + mapSlotCapacity
+ + ", ReduceSlotBackfill = " + reduceSlotsBackfill
+ + ", ReduceSlotCapacity = " + reduceSlotCapacity
+ + ", NumJobsBackfill = " + numJobsBackfill;
}
}
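
A worked example of the map-side overload test above, using the constants in
this patch: with 100 map slots, MAX_MAPSLOT_SHARE_PER_JOB caps a single job's
contribution at max(1, 100 * 0.1) = 10 effective tasks, so a job with 40 maps
at 40% progress counts as min(10, 40 * 0.6) = 10 rather than 24; the map side
is overloaded once the summed contributions reach
OVERLOAD_MAPTASK_MAPSLOT_RATIO * 100 = 200. A self-contained check of that
arithmetic (standalone sketch mirroring calcEffectiveIncompleteMapTasks):

    public class OverloadMath {
      static final float RATIO = 2.0f;      // OVERLOAD_MAPTASK_MAPSLOT_RATIO
      static final float MAX_SHARE = 0.1f;  // MAX_MAPSLOT_SHARE_PER_JOB

      // Same formula as StressJobFactory.calcEffectiveIncompleteMapTasks.
      static float effIncompleteMaps(int slots, int maps, float progress) {
        float cap = Math.max(1.0f, slots * MAX_SHARE);
        float p = Math.max(Math.min(progress, 1.0f), 0.0f);
        return Math.min(cap, maps * (1.0f - p));
      }

      public static void main(String[] args) {
        final int slots = 100;
        float incomplete = effIncompleteMaps(slots, 40, 0.4f); // -> 10.0
        int backfill = (int) (RATIO * slots - incomplete);     // -> 190
        System.out.println("incomplete=" + incomplete
            + " backfill=" + backfill
            + " overloaded=" + (backfill <= 0));               // false
      }
    }
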
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SubmitterUserResolver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SubmitterUserResolver.java?rev=963986&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SubmitterUserResolver.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SubmitterUserResolver.java Wed Jul 14 09:22:29 2010
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Resolves all UGIs to the submitting user.
+ */
+public class SubmitterUserResolver implements UserResolver {
+ public static final Log LOG = LogFactory.getLog(SubmitterUserResolver.class);
+
+ private UserGroupInformation ugi = null;
+
+ public SubmitterUserResolver() {
+ LOG.info(" Current user resolver is SubmitterUserResolver ");
+ }
+
+ public synchronized boolean setTargetUsers(URI userdesc, Configuration conf)
+ throws IOException {
+ ugi = UserGroupInformation.getLoginUser();
+ return false;
+ }
+
+ public synchronized UserGroupInformation getTargetUgi(
+ UserGroupInformation ugi) {
+ return this.ugi;
+ }
+
+}
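
A usage sketch for the resolver (hypothetical driver class, assuming the Hadoop
jars on the classpath; the createRemoteUser/getTargetUgi pattern mirrors the
StressJobFactory call site earlier in this message):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.gridmix.SubmitterUserResolver;
    import org.apache.hadoop.mapred.gridmix.UserResolver;
    import org.apache.hadoop.security.UserGroupInformation;

    public class ResolveExample {
      public static void main(String[] args) throws Exception {
        UserResolver resolver = new SubmitterUserResolver();
        // SubmitterUserResolver ignores the URI; every trace user maps to
        // the login user running the tool.
        resolver.setTargetUsers(null, new Configuration());
        UserGroupInformation ugi = resolver.getTargetUgi(
            UserGroupInformation.createRemoteUser("traceUserFromRumen"));
        System.out.println("submitting as: " + ugi.getUserName());
      }
    }
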
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/UserResolver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/UserResolver.java?rev=963986&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/UserResolver.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/UserResolver.java Wed Jul 14 09:22:29 2010
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.LineReader;
+
+/**
+ * Maps users in the trace to a set of valid target users on the test cluster.
+ */
+public interface UserResolver {
+
+ /**
+ * Configure the user map given the URI and configuration. The resolver's
+ * contract will define how the resource will be interpreted, but the default
+ * will typically interpret the URI as a {@link org.apache.hadoop.fs.Path}
+ * listing target users.
+ * @param userdesc URI (possibly null) from which user information may be
+ * loaded per the subclass contract.
+ * @param conf The tool configuration.
+ * @return true if the resource provided was used in building the list of
+ * target users
+ */
+ public boolean setTargetUsers(URI userdesc, Configuration conf)
+ throws IOException;
+
+ /**
+ * Map the given UGI to another per the subclass contract.
+ * @param ugi User information from the trace.
+ */
+ public UserGroupInformation getTargetUgi(UserGroupInformation ugi);
+
+}
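
The interface leaves interpretation of the URI to each implementation. As an
illustration only (not part of this commit), a hypothetical resolver that
round-robins trace users onto a fixed set of accounts could look like:

    package org.apache.hadoop.mapred.gridmix;

    import java.io.IOException;
    import java.net.URI;
    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;

    /** Hypothetical: maps each trace user onto the next fixed account. */
    public class RoundRobinUserResolver implements UserResolver {
      private final String[] targets = { "gridmix0", "gridmix1" };
      private final AtomicInteger next = new AtomicInteger();

      public synchronized boolean setTargetUsers(URI userdesc,
          Configuration conf) throws IOException {
        // A fuller implementation would read the target list from userdesc.
        return false; // the resource was not consumed
      }

      public synchronized UserGroupInformation getTargetUgi(
          UserGroupInformation ugi) {
        final String user = targets[next.getAndIncrement() % targets.length];
        return UserGroupInformation.createRemoteUser(user);
      }
    }
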
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java?rev=963986&r1=963985&r2=963986&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java Wed Jul 14 09:22:29 2010
@@ -37,18 +37,18 @@ class DebugJobFactory {
public static JobFactory getFactory(
JobSubmitter submitter, Path scratch, int numJobs, Configuration conf,
- CountDownLatch startFlag) throws IOException {
+ CountDownLatch startFlag, UserResolver resolver) throws IOException {
GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(
conf, GridmixJobSubmissionPolicy.STRESS);
- if (policy.name().equalsIgnoreCase("REPLAY")) {
+ if (policy == GridmixJobSubmissionPolicy.REPLAY) {
return new DebugReplayJobFactory(
- submitter, scratch, numJobs, conf, startFlag);
- } else if (policy.name().equalsIgnoreCase("STRESS")) {
+ submitter, scratch, numJobs, conf, startFlag, resolver);
+ } else if (policy == GridmixJobSubmissionPolicy.STRESS) {
return new DebugStressJobFactory(
- submitter, scratch, numJobs, conf, startFlag);
- } else if (policy.name().equalsIgnoreCase("SERIAL")) {
+ submitter, scratch, numJobs, conf, startFlag, resolver);
+ } else if (policy == GridmixJobSubmissionPolicy.SERIAL) {
return new DebugSerialJobFactory(
- submitter, scratch, numJobs, conf, startFlag);
+ submitter, scratch, numJobs, conf, startFlag, resolver);
}
return null;
@@ -58,10 +58,10 @@ class DebugJobFactory {
implements Debuggable {
public DebugReplayJobFactory(
JobSubmitter submitter, Path scratch, int numJobs, Configuration conf,
- CountDownLatch startFlag) throws IOException {
+ CountDownLatch startFlag, UserResolver resolver) throws IOException {
super(
submitter, new DebugJobProducer(numJobs, conf), scratch, conf,
- startFlag);
+ startFlag, resolver);
}
@Override
@@ -75,10 +75,10 @@ class DebugJobFactory {
implements Debuggable {
public DebugSerialJobFactory(
JobSubmitter submitter, Path scratch, int numJobs, Configuration conf,
- CountDownLatch startFlag) throws IOException {
+ CountDownLatch startFlag, UserResolver resolver) throws IOException {
super(
submitter, new DebugJobProducer(numJobs, conf), scratch, conf,
- startFlag);
+ startFlag, resolver);
}
@Override
@@ -91,10 +91,10 @@ class DebugJobFactory {
implements Debuggable {
public DebugStressJobFactory(
JobSubmitter submitter, Path scratch, int numJobs, Configuration conf,
- CountDownLatch startFlag) throws IOException {
+ CountDownLatch startFlag, UserResolver resolver) throws IOException {
super(
submitter, new DebugJobProducer(numJobs, conf), scratch, conf,
- startFlag);
+ startFlag, resolver);
}
@Override
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java?rev=963986&r1=963985&r2=963986&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java Wed Jul 14 09:22:29 2010
@@ -17,8 +17,11 @@
*/
package org.apache.hadoop.mapred.gridmix;
+import org.apache.hadoop.mapred.TaskStatus.State;
import org.apache.hadoop.tools.rumen.JobStoryProducer;
import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.MapTaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.ReduceTaskAttemptInfo;
import org.apache.hadoop.tools.rumen.TaskInfo;
import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
@@ -100,6 +103,7 @@ public class DebugJobProducer implements
static final int VAR_BYTES = 4 << 20;
static final int MAX_MAP = 5;
static final int MAX_RED = 3;
+ final Configuration conf;
static void initDist(
Random r, double min, int[] recs, long[] bytes, long tot_recs,
@@ -143,6 +147,7 @@ public class DebugJobProducer implements
id = seq.getAndIncrement();
name = String.format("MOCKJOB%05d", id);
+ this.conf = conf;
LOG.info(name + " (" + seed + ")");
submitTime = timestamp.addAndGet(
TimeUnit.MILLISECONDS.convert(
@@ -204,7 +209,9 @@ public class DebugJobProducer implements
@Override
public String getUser() {
- return "FOOBAR";
+ String s = String.format("foobar%d", id);
+ GridmixTestUtils.createHomeAndStagingDirectory(s, (JobConf) conf);
+ return s;
}
@Override
@@ -254,6 +261,23 @@ public class DebugJobProducer implements
@Override
public TaskAttemptInfo getTaskAttemptInfo(
TaskType taskType, int taskNumber, int taskAttemptNumber) {
+ switch (taskType) {
+ case MAP:
+ return new MapTaskAttemptInfo(
+ State.SUCCEEDED,
+ new TaskInfo(
+ m_bytesIn[taskNumber], m_recsIn[taskNumber],
+ m_bytesOut[taskNumber], m_recsOut[taskNumber], -1),
+ 100);
+
+ case REDUCE:
+ return new ReduceTaskAttemptInfo(
+ State.SUCCEEDED,
+ new TaskInfo(
+ r_bytesIn[taskNumber], r_recsIn[taskNumber],
+ r_bytesOut[taskNumber], r_recsOut[taskNumber], -1),
+ 100, 100, 100);
+ }
throw new UnsupportedOperationException();
}
@@ -270,7 +294,8 @@ public class DebugJobProducer implements
@Override
public String getQueueName() {
- return JobConf.DEFAULT_QUEUE_NAME;
+ String qName = "q" + ((id % 2) + 1);
+ return qName;
}
public static void reset() {
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/GridmixTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/GridmixTestUtils.java?rev=963986&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/GridmixTestUtils.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/GridmixTestUtils.java Wed Jul 14 09:22:29 2010
@@ -0,0 +1,92 @@
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
+import org.apache.hadoop.security.Groups;
+
+import java.io.IOException;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class GridmixTestUtils {
+ static final Path DEST = new Path("/gridmix");
+ static FileSystem dfs = null;
+ static MiniDFSCluster dfsCluster = null;
+ static MiniMRCluster mrCluster = null;
+
+ public static void initCluster() throws IOException {
+ Configuration conf = new Configuration();
+ conf.set("mapred.queue.names", "default,q1,q2");
+ dfsCluster = new MiniDFSCluster(conf, 3, true, null);
+ dfs = dfsCluster.getFileSystem();
+ conf.set(JTConfig.JT_RETIREJOBS, "false");
+ mrCluster = new MiniMRCluster(3, dfs.getUri().toString(), 1, null, null,
+ new JobConf(conf));
+ }
+
+ public static void shutdownCluster() throws IOException {
+ if (mrCluster != null) {
+ mrCluster.shutdown();
+ }
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ }
+ }
+
+ /**
+ * Creates the home and staging directories for a synthetic user.
+ * @param user the synthetic user name
+ * @param conf the job configuration
+ */
+ public static void createHomeAndStagingDirectory(String user, JobConf conf) {
+ try {
+ FileSystem fs = dfsCluster.getFileSystem();
+ String path = "/user/" + user;
+ Path homeDirectory = new Path(path);
+ if (fs.exists(homeDirectory)) {
+ fs.delete(homeDirectory, true);
+ }
+ TestGridmixSubmission.LOG.info(
+ "Creating home directory: " + homeDirectory);
+ fs.mkdirs(homeDirectory);
+ changePermission(user, homeDirectory, fs);
+ Path stagingArea =
+ new Path(conf.get("mapreduce.jobtracker.staging.root.dir",
+ "/tmp/hadoop/mapred/staging"));
+ TestGridmixSubmission.LOG.info(
+ "Creating Staging root directory : " + stagingArea);
+ fs.mkdirs(stagingArea);
+ fs.setPermission(stagingArea, new FsPermission((short) 0777));
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ }
+ }
+
+ static void changePermission(String user, Path homeDirectory, FileSystem fs)
+ throws IOException {
+ fs.setOwner(homeDirectory, user, "");
+ }
+}
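
Expected usage from a test is to bracket the run with initCluster and
shutdownCluster and create home directories for each synthetic user, as
TestGridmixSubmission does below. A minimal skeleton (hypothetical test class,
assumed to live in the org.apache.hadoop.mapred.gridmix package so it can reach
the package-private mrCluster field):

    import java.io.IOException;
    import org.apache.hadoop.mapred.JobConf;
    import org.junit.AfterClass;
    import org.junit.BeforeClass;
    import org.junit.Test;

    public class GridmixClusterTest {
      @BeforeClass
      public static void up() throws IOException {
        GridmixTestUtils.initCluster(); // MiniDFS + MiniMR, queues default,q1,q2
      }

      @AfterClass
      public static void down() throws IOException {
        GridmixTestUtils.shutdownCluster();
      }

      @Test
      public void homeDirForSyntheticUser() throws IOException {
        JobConf conf = GridmixTestUtils.mrCluster.createJobConf();
        GridmixTestUtils.createHomeAndStagingDirectory("foobar1", conf);
        // /user/foobar1 now exists on the mini DFS, owned by foobar1
      }
    }
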
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=963986&r1=963985&r2=963986&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Wed Jul 14 09:22:29 2010
@@ -28,68 +28,59 @@ import java.util.concurrent.LinkedBlocki
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskCounter;
-import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.TaskInfo;
import org.apache.hadoop.util.ToolRunner;
-import static org.apache.hadoop.mapreduce.TaskCounter.*;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
+import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.log4j.Level;
+import static org.apache.hadoop.mapreduce.TaskCounter.MAP_INPUT_RECORDS;
+import static org.apache.hadoop.mapreduce.TaskCounter.MAP_OUTPUT_BYTES;
+import static org.apache.hadoop.mapreduce.TaskCounter.MAP_OUTPUT_RECORDS;
+import static org.apache.hadoop.mapreduce.TaskCounter.REDUCE_INPUT_RECORDS;
+import static org.apache.hadoop.mapreduce.TaskCounter.REDUCE_OUTPUT_RECORDS;
+import static org.apache.hadoop.mapreduce.TaskCounter.REDUCE_SHUFFLE_BYTES;
+import static org.apache.hadoop.mapreduce.TaskCounter.SPLIT_RAW_BYTES;
+
public class TestGridmixSubmission {
static GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.REPLAY;
+ public static final Log LOG = LogFactory.getLog(Gridmix.class);
{
((Log4JLogger)LogFactory.getLog("org.apache.hadoop.mapred.gridmix")
).getLogger().setLevel(Level.DEBUG);
}
- private static FileSystem dfs = null;
- private static MiniDFSCluster dfsCluster = null;
- private static MiniMRCluster mrCluster = null;
-
- private static final int NJOBS = 2;
- private static final long GENDATA = 50; // in megabytes
+ private static final int NJOBS = 3;
+ private static final long GENDATA = 30; // in megabytes
private static final int GENSLOP = 100 * 1024; // +/- 100k for logs
@BeforeClass
- public static void initCluster() throws IOException {
- Configuration conf = new Configuration();
- conf.setBoolean(JTConfig.JT_RETIREJOBS, false);
- conf.setInt(JTConfig.JT_RETIREJOB_CACHE_SIZE, 1000);
- conf.setBoolean(JTConfig.JT_PERSIST_JOBSTATUS, true);
- conf.setInt(JTConfig.JT_PERSIST_JOBSTATUS_HOURS, 1);
- dfsCluster = new MiniDFSCluster(conf, 3, true, null);
- dfs = dfsCluster.getFileSystem();
- mrCluster = new MiniMRCluster(3, dfs.getUri().toString(), 1, null, null,
- new JobConf(conf));
+ public static void init() throws IOException {
+ GridmixTestUtils.initCluster();
}
@AfterClass
- public static void shutdownCluster() throws IOException {
- if (mrCluster != null) {
- mrCluster.shutdown();
- }
- if (dfsCluster != null) {
- dfsCluster.shutdown();
- }
+ public static void shutDown() throws IOException {
+ GridmixTestUtils.shutdownCluster();
}
static class TestMonitor extends JobMonitor {
@@ -109,36 +100,72 @@ public class TestGridmixSubmission {
assertEquals("Bad job count", expected, retiredJobs.drainTo(succeeded));
final HashMap<String,JobStory> sub = new HashMap<String,JobStory>();
for (JobStory spec : submitted) {
- sub.put(spec.getName(), spec);
+ sub.put(spec.getJobID().toString(), spec);
}
+ final JobClient client = new JobClient(
+ GridmixTestUtils.mrCluster.createJobConf());
for (Job job : succeeded) {
final String jobname = job.getJobName();
if ("GRIDMIX_GENDATA".equals(jobname)) {
- final Path in = new Path("foo").makeQualified(dfs);
- final Path out = new Path("/gridmix").makeQualified(dfs);
- final ContentSummary generated = dfs.getContentSummary(in);
+ if (!job.getConfiguration().getBoolean(
+ GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, true)) {
+ assertEquals(" Improper queue for " + job.getJobName(),
+ job.getConfiguration().get("mapred.job.queue.name"),
+ "q1");
+ } else {
+ assertEquals(" Improper queue for " + job.getJobName(),
+ job.getConfiguration().get("mapred.job.queue.name"),
+ "default");
+ }
+ final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs);
+ final Path out = new Path("/gridmix").makeQualified(GridmixTestUtils.dfs);
+ final ContentSummary generated = GridmixTestUtils.dfs.getContentSummary(in);
assertTrue("Mismatched data gen", // +/- 100k for logs
(GENDATA << 20) < generated.getLength() + GENSLOP ||
(GENDATA << 20) > generated.getLength() - GENSLOP);
- FileStatus[] outstat = dfs.listStatus(out);
+ FileStatus[] outstat = GridmixTestUtils.dfs.listStatus(out);
assertEquals("Mismatched job count", NJOBS, outstat.length);
continue;
}
+
+ if (!job.getConfiguration().getBoolean(
+ GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, true)) {
+ assertEquals(" Improper queue for " + job.getJobName() + " " ,
+ job.getConfiguration().get("mapred.job.queue.name"),"q1" );
+ } else {
+ assertEquals(" Improper queue for " + job.getJobName() + " ",
+ job.getConfiguration().get("mapred.job.queue.name"),
+ sub.get(job.getConfiguration().get(GridmixJob.ORIGNAME))
+ .getQueueName());
+ }
+
final JobStory spec =
- sub.get(job.getJobName().replace("GRIDMIX", "MOCKJOB"));
+ sub.get(job.getConfiguration().get(GridmixJob.ORIGNAME));
assertNotNull("No spec for " + job.getJobName(), spec);
assertNotNull("No counters for " + job.getJobName(), job.getCounters());
+ final String specname = spec.getName();
+ final FileStatus stat =
+ GridmixTestUtils.dfs.getFileStatus(
+ new Path(GridmixTestUtils.DEST,
+ "" + Integer.valueOf(specname.substring(specname.length() - 5))));
+ assertEquals("Wrong owner for " + job.getJobName(), spec.getUser(),
+ stat.getOwner());
final int nMaps = spec.getNumberMaps();
final int nReds = spec.getNumberReduces();
+ // TODO Blocked by MAPREDUCE-118
+ if (true) return;
+ // TODO
System.out.println(jobname + ": " + nMaps + "/" + nReds);
- final TaskReport[] mReports = job.getTaskReports(TaskType.MAP);
+ final TaskReport[] mReports =
+ client.getMapTaskReports(JobID.downgrade(job.getJobID()));
assertEquals("Mismatched map count", nMaps, mReports.length);
check(TaskType.MAP, job, spec, mReports,
0, 0, SLOPBYTES, nReds);
- final TaskReport[] rReports = job.getTaskReports(TaskType.REDUCE);
+ final TaskReport[] rReports =
+ client.getReduceTaskReports(JobID.downgrade(job.getJobID()));
assertEquals("Mismatched reduce count", nReds, rReports.length);
check(TaskType.REDUCE, job, spec, rReports,
nMaps * SLOPBYTES, 2 * nMaps, 0, 0);
@@ -161,12 +188,12 @@ public class TestGridmixSubmission {
for (int i = 0; i < runTasks.length; ++i) {
final TaskInfo specInfo;
- final Counters counters = runTasks[i].getTaskCounters();
+ final Counters counters = runTasks[i].getCounters();
switch (type) {
case MAP:
runInputBytes[i] = counters.findCounter("FileSystemCounters",
"HDFS_BYTES_READ").getValue() -
- counters.findCounter(TaskCounter.SPLIT_RAW_BYTES).getValue();
+ counters.findCounter(SPLIT_RAW_BYTES).getValue();
runInputRecords[i] =
(int)counters.findCounter(MAP_INPUT_RECORDS).getValue();
runOutputBytes[i] =
@@ -287,8 +314,6 @@ public class TestGridmixSubmission {
@Override
protected JobMonitor createJobMonitor(Statistics stats) {
- Configuration conf = new Configuration();
- conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy.name());
monitor = new TestMonitor(NJOBS + 1, stats);
return monitor;
}
@@ -296,9 +321,10 @@ public class TestGridmixSubmission {
@Override
protected JobFactory createJobFactory(JobSubmitter submitter,
String traceIn, Path scratchDir, Configuration conf,
- CountDownLatch startFlag) throws IOException {
+ CountDownLatch startFlag, UserResolver userResolver)
+ throws IOException {
factory = DebugJobFactory.getFactory(
- submitter, scratchDir, NJOBS, conf, startFlag);
+ submitter, scratchDir, NJOBS, conf, startFlag, userResolver);
return factory;
}
}
@@ -307,7 +333,7 @@ public class TestGridmixSubmission {
public void testReplaySubmit() throws Exception {
policy = GridmixJobSubmissionPolicy.REPLAY;
System.out.println(" Replay started at " + System.currentTimeMillis());
- doSubmission();
+ doSubmission(false);
System.out.println(" Replay ended at " + System.currentTimeMillis());
}
@@ -315,35 +341,55 @@ public class TestGridmixSubmission {
public void testStressSubmit() throws Exception {
policy = GridmixJobSubmissionPolicy.STRESS;
System.out.println(" Stress started at " + System.currentTimeMillis());
- doSubmission();
+ doSubmission(false);
System.out.println(" Stress ended at " + System.currentTimeMillis());
}
@Test
+ public void testStressSubmitWithDefaultQueue() throws Exception {
+ policy = GridmixJobSubmissionPolicy.STRESS;
+ System.out.println(" Stress with default q started at "
+ + System.currentTimeMillis());
+ doSubmission(true);
+ System.out.println(" Stress with default q ended at "
+ + System.currentTimeMillis());
+ }
+
+ @Test
public void testSerialSubmit() throws Exception {
policy = GridmixJobSubmissionPolicy.SERIAL;
System.out.println("Serial started at " + System.currentTimeMillis());
- doSubmission();
+ doSubmission(false);
System.out.println("Serial ended at " + System.currentTimeMillis());
}
- public void doSubmission() throws Exception {
- final Path in = new Path("foo").makeQualified(dfs);
- final Path out = new Path("/gridmix").makeQualified(dfs);
+ private void doSubmission(boolean useDefaultQueue) throws Exception {
+ final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs);
+ final Path out = GridmixTestUtils.DEST.makeQualified(GridmixTestUtils.dfs);
final Path root = new Path("/user");
Configuration conf = null;
try{
final String[] argv = {
"-D" + FilePool.GRIDMIX_MIN_FILE + "=0",
"-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out,
+ "-D" + Gridmix.GRIDMIX_USR_RSV + "=" + EchoUserResolver.class.getName(),
"-generate", String.valueOf(GENDATA) + "m",
in.toString(),
"-" // ignored by DebugGridmix
};
DebugGridmix client = new DebugGridmix();
- conf = mrCluster.createJobConf();
- conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY,policy.name());
- //conf.setInt(Gridmix.GRIDMIX_KEY_LEN, 2);
+ conf = new Configuration();
+ conf.setEnum(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy);
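+ // useDefaultQueue exercises the GRIDMIX_USE_QUEUE_IN_TRACE=false path:
+ // trace queues are ignored and every job is forced into queue "q1".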
+ if (useDefaultQueue) {
+ conf.setBoolean(GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, false);
+ conf.set(GridmixJob.GRIDMIX_DEFAULT_QUEUE, "q1");
+ } else {
+ conf.setBoolean(GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, true);
+ }
+ conf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
+ // allow synthetic users to create home directories
+ GridmixTestUtils.dfs.mkdirs(root, new FsPermission((short)0777));
+ GridmixTestUtils.dfs.setPermission(root, new FsPermission((short)0777));
int res = ToolRunner.run(conf, client, argv);
assertEquals("Client exited with nonzero status", 0, res);
client.checkMonitor();
@@ -352,7 +398,7 @@ public class TestGridmixSubmission {
} finally {
in.getFileSystem(conf).delete(in, true);
out.getFileSystem(conf).delete(out, true);
- root.getFileSystem(conf).delete(root, true);
+ root.getFileSystem(conf).delete(root, true);
}
}
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java?rev=963986&r1=963985&r2=963986&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java Wed Jul 14 09:22:29 2010
@@ -40,7 +40,9 @@ public class TestRecordFactory {
final RecordFactory f = new AvgRecordFactory(targetBytes, targetRecs, conf);
targetRecs = targetRecs <= 0 && targetBytes >= 0
? Math.max(1,
- targetBytes / conf.getInt("gridmix.missing.rec.size", 64 * 1024))
+ targetBytes
+ / conf.getInt(AvgRecordFactory.GRIDMIX_MISSING_REC_SIZE,
+ 64 * 1024))
: targetRecs;
long records = 0L;
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java?rev=963986&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java Wed Jul 14 09:22:29 2010
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.Assert.*;
+
+public class TestSleepJob {
+
+ public static final Log LOG = LogFactory.getLog(Gridmix.class);
+
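+ // Instance initializer: raise gridmix package logging to DEBUG for
+ // these tests.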
+ {
+ ((Log4JLogger) LogFactory.getLog("org.apache.hadoop.mapred.gridmix"))
+ .getLogger().setLevel(Level.DEBUG);
+ }
+
+ static GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.REPLAY;
+ private static final int NJOBS = 2;
+ private static final long GENDATA = 50; // in megabytes
+
+
+ @BeforeClass
+ public static void init() throws IOException {
+ GridmixTestUtils.initCluster();
+ }
+
+ @AfterClass
+ public static void shutDown() throws IOException {
+ GridmixTestUtils.shutdownCluster();
+ }
+
+ static class TestMonitor extends JobMonitor {
+ private final BlockingQueue<Job> retiredJobs;
+ private final int expected;
+
+ public TestMonitor(int expected, Statistics stats) {
+ super(stats);
+ this.expected = expected;
+ retiredJobs = new LinkedBlockingQueue<Job>();
+ }
+
+ @Override
+ protected void onSuccess(Job job) {
+ System.out.println(" Job Sucess " + job);
+ retiredJobs.add(job);
+ }
+
+ @Override
+ protected void onFailure(Job job) {
+ fail("Job failure: " + job);
+ }
+
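+ // Only the retired-job count is verified: NJOBS sleep jobs plus the
+ // data-generation job (hence NJOBS + 1 in createJobMonitor).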
+ public void verify(ArrayList<JobStory> submitted) throws Exception {
+ assertEquals("Bad job count", expected, retiredJobs.size());
+ }
+ }
+
+
+ static class DebugGridmix extends Gridmix {
+
+ private JobFactory factory;
+ private TestMonitor monitor;
+
+ @Override
+ protected JobMonitor createJobMonitor(Statistics stats) {
+ monitor = new TestMonitor(NJOBS + 1, stats);
+ return monitor;
+ }
+
+ @Override
+ protected JobFactory createJobFactory(
+ JobSubmitter submitter, String traceIn, Path scratchDir,
+ Configuration conf, CountDownLatch startFlag, UserResolver userResolver)
+ throws IOException {
+ factory =
+ DebugJobFactory.getFactory(submitter, scratchDir, NJOBS, conf,
+ startFlag, userResolver);
+ return factory;
+ }
+
+ public void checkMonitor() throws Exception {
+ monitor.verify(((DebugJobFactory.Debuggable) factory).getSubmitted());
+ }
+ }
+
+
+ @Test
+ public void testReplaySubmit() throws Exception {
+ policy = GridmixJobSubmissionPolicy.REPLAY;
+ System.out.println(" Replay started at " + System.currentTimeMillis());
+ doSubmission();
+ System.out.println(" Replay ended at " + System.currentTimeMillis());
+ }
+
+ @Test
+ public void testStressSubmit() throws Exception {
+ policy = GridmixJobSubmissionPolicy.STRESS;
+ System.out.println(" Stress started at " + System.currentTimeMillis());
+ doSubmission();
+ System.out.println(" Stress ended at " + System.currentTimeMillis());
+ }
+
+ @Test
+ public void testSerialSubmit() throws Exception {
+ policy = GridmixJobSubmissionPolicy.SERIAL;
+ System.out.println("Serial started at " + System.currentTimeMillis());
+ doSubmission();
+ System.out.println("Serial ended at " + System.currentTimeMillis());
+ }
+
+
+ private void doSubmission() throws Exception {
+ final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs);
+ final Path out = GridmixTestUtils.DEST.makeQualified(GridmixTestUtils.dfs);
+ final Path root = new Path("/user");
+ Configuration conf = null;
+ try {
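+ // Flags: admit every pool file (min size 0), write under the gridmix
+ // output dir, resolve users with EchoUserResolver, emulate each trace
+ // job as a sleep job reporting progress every 10 seconds, and generate
+ // 50 MB of input data first.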
+ final String[] argv = {"-D" + FilePool.GRIDMIX_MIN_FILE + "=0",
+ "-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out,
+ "-D" + Gridmix.GRIDMIX_USR_RSV + "="
+ + EchoUserResolver.class.getName(),
+ "-D" + JobCreator.GRIDMIX_JOB_TYPE + "="
+ + JobCreator.SLEEPJOB.name(),
+ "-D" + SleepJob.GRIDMIX_SLEEP_INTERVAL +"=" +"10",
+ "-generate",
+ String.valueOf(GENDATA) + "m", in.toString(),
+ "-"
+ // ignored by DebugGridmix
+ };
+ DebugGridmix client = new DebugGridmix();
+ conf = new Configuration();
+ conf.setEnum(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy);
+ conf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
+ // allow synthetic users to create home directories
+ GridmixTestUtils.dfs.mkdirs(root, new FsPermission((short) 0777));
+ GridmixTestUtils.dfs.setPermission(root, new FsPermission((short) 0777));
+ int res = ToolRunner.run(conf, client, argv);
+ assertEquals("Client exited with nonzero status", 0, res);
+ client.checkMonitor();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ in.getFileSystem(conf).delete(in, true);
+ out.getFileSystem(conf).delete(out, true);
+ root.getFileSystem(conf).delete(root, true);
+ }
+ }
+
+}
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestUserResolve.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestUserResolve.java?rev=963986&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestUserResolve.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestUserResolve.java Wed Jul 14 09:22:29 2010
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class TestUserResolve {
+
+ static Path userlist;
+
+ @BeforeClass
+ public static void writeUserList() throws IOException {
+ final Configuration conf = new Configuration();
+ final FileSystem fs = FileSystem.getLocal(conf);
+ final Path wd =
+ new Path(new Path(System.getProperty("test.build.data", "/tmp"))
+ .makeQualified(fs),
+ "gridmixUserResolve");
+ userlist = new Path(wd, "users");
+ FSDataOutputStream out = null;
+ try {
+ out = fs.create(userlist, true);
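+ // One synthetic user per line, "user,group1[,group2...]"; the
+ // round-robin resolver cycles through the user column.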
+ out.writeBytes("user0,groupA,groupB,groupC\n");
+ out.writeBytes("user1,groupA,groupC\n");
+ out.writeBytes("user2,groupB\n");
+ out.writeBytes("user3,groupA,groupB,groupC\n");
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+ }
+
+ @Test
+ public void testRoundRobinResolver() throws Exception {
+ final Configuration conf = new Configuration();
+ final UserResolver rslv = new RoundRobinUserResolver();
+
+ boolean fail = false;
+ try {
+ rslv.setTargetUsers(null, conf);
+ } catch (IOException e) {
+ fail = true;
+ }
+ assertTrue("User list required for RoundRobinUserResolver", fail);
+
+ rslv.setTargetUsers(new URI(userlist.toString()), conf);
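+ // Distinct trace users are assigned list entries in round-robin order;
+ // a repeated lookup (hfre0) keeps returning its first assignment.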
+ UserGroupInformation ugi1 = UserGroupInformation.createRemoteUser("hfre0");
+ assertEquals("user0", rslv.getTargetUgi(ugi1).getUserName());
+ assertEquals("user1",
+ rslv.getTargetUgi(UserGroupInformation.createRemoteUser("hfre1"))
+ .getUserName());
+ assertEquals("user2",
+ rslv.getTargetUgi(UserGroupInformation.createRemoteUser("hfre2"))
+ .getUserName());
+ assertEquals("user0", rslv.getTargetUgi(ugi1).getUserName());
+ assertEquals("user3",
+ rslv.getTargetUgi(UserGroupInformation.createRemoteUser("hfre3"))
+ .getUserName());
+ assertEquals("user0", rslv.getTargetUgi(ugi1).getUserName());
+ }
+
+ @Test
+ public void testSubmitterResolver() throws Exception {
+ final Configuration conf = new Configuration();
+ final UserResolver rslv = new SubmitterUserResolver();
+ rslv.setTargetUsers(null, conf);
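+ // SubmitterUserResolver needs no user list: every trace user resolves
+ // to the JVM's current (submitting) user.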
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ assertEquals(ugi, rslv.getTargetUgi((UserGroupInformation)null));
+ System.out.println(" Submitter current user " + ugi);
+ System.out.println(" Target ugi "
+ + rslv.getTargetUgi((UserGroupInformation) null));
+ }
+}