You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by am...@apache.org on 2012/02/23 11:41:08 UTC
svn commit: r1292736 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/
src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/
src/docs/src/documentation/content/xdocs/
Author: amarrk
Date: Thu Feb 23 10:41:07 2012
New Revision: 1292736
URL: http://svn.apache.org/viewvc?rev=1292736&view=rev
Log:
MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for faster job submission. (amarrk)
Added:
hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixStatistics.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java
hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java
hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java
hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java
hadoop/common/trunk/hadoop-mapreduce-project/src/docs/src/documentation/content/xdocs/gridmix.xml
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1292736&r1=1292735&r2=1292736&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Feb 23 10:41:07 2012
@@ -14,6 +14,8 @@ Trunk (unreleased changes)
(Plamen Jeliazkov via shv)
IMPROVEMENTS
+ MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for
+ faster job submission. (amarrk)
MAPREDUCE-3481. [Gridmix] Improve Gridmix STRESS mode. (amarrk)
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java?rev=1292736&r1=1292735&r2=1292736&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java Thu Feb 23 10:41:07 2012
@@ -53,6 +53,7 @@ class ExecutionSummarizer implements Sta
private int numJobsInInputTrace;
private int totalSuccessfulJobs;
private int totalFailedJobs;
+ private int totalLostJobs;
private int totalMapTasksLaunched;
private int totalReduceTasksLaunched;
private long totalSimulationTime;
@@ -90,31 +91,32 @@ class ExecutionSummarizer implements Sta
simulationStartTime = System.currentTimeMillis();
}
- private void processJobState(JobStats stats) throws Exception {
+ private void processJobState(JobStats stats) {
Job job = stats.getJob();
- if (job.isSuccessful()) {
- ++totalSuccessfulJobs;
- } else {
- ++totalFailedJobs;
+ try {
+ if (job.isSuccessful()) {
+ ++totalSuccessfulJobs;
+ } else {
+ ++totalFailedJobs;
+ }
+ } catch (Exception e) {
+ // this behavior is consistent with job-monitor which marks the job as
+ // complete (lost) if the status polling bails out
+ ++totalLostJobs;
}
}
- private void processJobTasks(JobStats stats) throws Exception {
+ private void processJobTasks(JobStats stats) {
totalMapTasksLaunched += stats.getNoOfMaps();
- Job job = stats.getJob();
- totalReduceTasksLaunched += job.getNumReduceTasks();
+ totalReduceTasksLaunched += stats.getNoOfReds();
}
private void process(JobStats stats) {
- try {
- // process the job run state
- processJobState(stats);
-
- // process the tasks information
- processJobTasks(stats);
- } catch (Exception e) {
- LOG.info("Error in processing job " + stats.getJob().getJobID() + ".");
- }
+ // process the job run state
+ processJobState(stats);
+
+ // process the tasks information
+ processJobTasks(stats);
}
@Override
@@ -191,6 +193,8 @@ class ExecutionSummarizer implements Sta
.append(getNumSuccessfulJobs());
builder.append("\nTotal number of failed jobs: ")
.append(getNumFailedJobs());
+ builder.append("\nTotal number of lost jobs: ")
+ .append(getNumLostJobs());
builder.append("\nTotal number of map tasks launched: ")
.append(getNumMapTasksLaunched());
builder.append("\nTotal number of reduce task launched: ")
@@ -266,8 +270,12 @@ class ExecutionSummarizer implements Sta
return totalFailedJobs;
}
+ protected int getNumLostJobs() {
+ return totalLostJobs;
+ }
+
protected int getNumSubmittedJobs() {
- return totalSuccessfulJobs + totalFailedJobs;
+ return totalSuccessfulJobs + totalFailedJobs + totalLostJobs;
}
protected int getNumMapTasksLaunched() {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=1292736&r1=1292735&r2=1292736&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Thu Feb 23 10:41:07 2012
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;
+import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
@@ -94,6 +95,31 @@ public class Gridmix extends Configured
public static final String GRIDMIX_USR_RSV = "gridmix.user.resolve.class";
/**
+ * The configuration key which determines the duration for which the
+ * job-monitor sleeps while polling for job status.
+ * This value should be specified in milliseconds.
+ */
+ public static final String GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS =
+ "gridmix.job-monitor.sleep-time-ms";
+
+ /**
+ * Default value for {@link #GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS}.
+ */
+ public static final int GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS_DEFAULT = 500;
+
+ /**
+ * The configuration key which determines the total number of job-status
+ * monitoring threads.
+ */
+ public static final String GRIDMIX_JOBMONITOR_THREADS =
+ "gridmix.job-monitor.thread-count";
+
+ /**
+ * Default value for {@link #GRIDMIX_JOBMONITOR_THREADS}.
+ */
+ public static final int GRIDMIX_JOBMONITOR_THREADS_DEFAULT = 1;
+
+ /**
* Configuration property set in simulated job's configuration whose value is
* set to the corresponding original job's name. This is not configurable by
* gridmix user.
@@ -185,8 +211,13 @@ public class Gridmix extends Configured
submitter.add(job);
// TODO add listeners, use for job dependencies
- TimeUnit.SECONDS.sleep(10);
try {
+ while (!job.isSubmitted()) {
+ try {
+ Thread.sleep(100); // sleep
+ } catch (InterruptedException ie) {}
+ }
+ // wait for completion
job.getJob().waitForCompletion(false);
} catch (ClassNotFoundException e) {
throw new IOException("Internal error", e);
@@ -241,7 +272,7 @@ public class Gridmix extends Configured
GridmixJobSubmissionPolicy policy = getJobSubmissionPolicy(conf);
LOG.info(" Submission policy is " + policy.name());
statistics = new Statistics(conf, policy.getPollingInterval(), startFlag);
- monitor = createJobMonitor(statistics);
+ monitor = createJobMonitor(statistics, conf);
int noOfSubmitterThreads =
(policy == GridmixJobSubmissionPolicy.SERIAL)
? 1
@@ -276,8 +307,13 @@ public class Gridmix extends Configured
}
}
- protected JobMonitor createJobMonitor(Statistics stats) throws IOException {
- return new JobMonitor(stats);
+ protected JobMonitor createJobMonitor(Statistics stats, Configuration conf)
+ throws IOException {
+ int delay = conf.getInt(GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS,
+ GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS_DEFAULT);
+ int numThreads = conf.getInt(GRIDMIX_JOBMONITOR_THREADS,
+ GRIDMIX_JOBMONITOR_THREADS_DEFAULT);
+ return new JobMonitor(delay, TimeUnit.MILLISECONDS, stats, numThreads);
}
protected JobSubmitter createJobSubmitter(JobMonitor monitor, int threads,
@@ -571,12 +607,13 @@ public class Gridmix extends Configured
if (monitor == null) {
return;
}
- List<Job> remainingJobs = monitor.getRemainingJobs();
+ List<JobStats> remainingJobs = monitor.getRemainingJobs();
if (remainingJobs.isEmpty()) {
return;
}
LOG.info("Killing running jobs...");
- for (Job job : remainingJobs) {
+ for (JobStats stats : remainingJobs) {
+ Job job = stats.getJob();
try {
if (!job.isComplete()) {
job.killJob();
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java?rev=1292736&r1=1292735&r2=1292736&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java Thu Feb 23 10:41:07 2012
@@ -72,6 +72,7 @@ abstract class GridmixJob implements Cal
}
};
+ private boolean submitted;
protected final int seq;
protected final Path outdir;
protected final Job job;
@@ -412,6 +413,14 @@ abstract class GridmixJob implements Cal
return jobdesc;
}
+ void setSubmitted() {
+ submitted = true;
+ }
+
+ boolean isSubmitted() {
+ return submitted;
+ }
+
static void pushDescription(int seq, List<InputSplit> splits) {
if (null != descCache.putIfAbsent(seq, splits)) {
throw new IllegalArgumentException("Description exists for id " + seq);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java?rev=1292736&r1=1292735&r2=1292736&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java Thu Feb 23 10:41:07 2012
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.mapred.gridmix;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -35,6 +36,8 @@ import org.apache.hadoop.tools.rumen.Pre
import java.io.IOException;
import java.io.InputStream;
+import java.util.List;
+import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicInteger;
@@ -179,19 +182,33 @@ abstract class JobFactory<T> implements
protected JobStory getNextJobFiltered() throws IOException {
JobStory job = getNextJobFromTrace();
+ // filter out the following jobs
+ // - unsuccessful jobs
+ // - jobs with missing submit-time
+ // - reduce only jobs
+ // These jobs are not yet supported in Gridmix
while (job != null &&
(job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS ||
- job.getSubmissionTime() < 0)) {
+ job.getSubmissionTime() < 0 || job.getNumberMaps() == 0)) {
if (LOG.isDebugEnabled()) {
- String reason = null;
+ List<String> reason = new ArrayList<String>();
if (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS) {
- reason = "STATE (" + job.getOutcome().name() + ") ";
+ reason.add("STATE (" + job.getOutcome().name() + ")");
}
if (job.getSubmissionTime() < 0) {
- reason += "SUBMISSION-TIME (" + job.getSubmissionTime() + ")";
+ reason.add("SUBMISSION-TIME (" + job.getSubmissionTime() + ")");
}
+ if (job.getNumberMaps() == 0) {
+ reason.add("ZERO-MAPS-JOB");
+ }
+
+ // TODO This should never happen. Probably we missed something!
+ if (reason.size() == 0) {
+ reason.add("N/A");
+ }
+
LOG.debug("Ignoring job " + job.getJobID() + " from the input trace."
- + " Reason: " + reason == null ? "N/A" : reason);
+ + " Reason: " + StringUtils.join(reason, ","));
}
job = getNextJobFromTrace();
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java?rev=1292736&r1=1292735&r2=1292736&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java Thu Feb 23 10:41:07 2012
@@ -24,37 +24,47 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
/**
- * Component accepting submitted, running jobs and responsible for
- * monitoring jobs for success and failure. Once a job is submitted, it is
- * polled for status until complete. If a job is complete, then the monitor
- * thread returns immediately to the queue. If not, the monitor will sleep
- * for some duration.
+ * Component accepting submitted, running {@link Statistics.JobStats} and
+ * responsible for monitoring jobs for success and failure. Once a job is
+ * submitted, it is polled for status until complete. If a job is complete,
+ * then the monitor thread returns immediately to the queue. If not, the monitor
+ * will sleep for some duration.
+ *
+ * {@link JobMonitor} can be configured to use multiple threads for polling
+ * the job statuses. Use {@link Gridmix#GRIDMIX_JOBMONITOR_THREADS} to specify
+ * the total number of monitoring threads.
+ *
+ * The duration for which a monitoring thread sleeps if the first job in the
+ * queue is running can also be configured. Use
+ * {@link Gridmix#GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS} to specify a custom
+ * value.
*/
-class JobMonitor implements Gridmix.Component<Job> {
+class JobMonitor implements Gridmix.Component<JobStats> {
public static final Log LOG = LogFactory.getLog(JobMonitor.class);
- private final Queue<Job> mJobs;
- private final MonitorThread mThread;
- private final BlockingQueue<Job> runningJobs;
+ private final Queue<JobStats> mJobs;
+ private ExecutorService executor;
+ private int numPollingThreads;
+ private final BlockingQueue<JobStats> runningJobs;
private final long pollDelayMillis;
private Statistics statistics;
private boolean graceful = false;
private boolean shutdown = false;
- public JobMonitor(Statistics statistics) {
- this(5,TimeUnit.SECONDS, statistics);
- }
-
/**
* Create a JobMonitor that sleeps for the specified duration after
* polling a still-running job.
@@ -62,30 +72,37 @@ class JobMonitor implements Gridmix.Comp
* @param unit Time unit for pollDelaySec (rounded to milliseconds)
* @param statistics StatCollector , listener to job completion.
*/
- public JobMonitor(int pollDelay, TimeUnit unit, Statistics statistics) {
- mThread = new MonitorThread();
- runningJobs = new LinkedBlockingQueue<Job>();
- mJobs = new LinkedList<Job>();
+ public JobMonitor(int pollDelay, TimeUnit unit, Statistics statistics,
+ int numPollingThreads) {
+ executor = Executors.newCachedThreadPool();
+ this.numPollingThreads = numPollingThreads;
+ runningJobs = new LinkedBlockingQueue<JobStats>();
+ mJobs = new LinkedList<JobStats>();
this.pollDelayMillis = TimeUnit.MILLISECONDS.convert(pollDelay, unit);
this.statistics = statistics;
}
/**
- * Add a job to the polling queue.
+ * Add a running job's status to the polling queue.
*/
- public void add(Job job) throws InterruptedException {
- runningJobs.put(job);
+ public void add(JobStats job) throws InterruptedException {
+ synchronized (runningJobs) {
+ runningJobs.put(job);
+ }
}
/**
- * Add a submission failed job , such that it can be communicated
+ * Add a submission failed job's status, such that it can be communicated
* back to serial.
* TODO: Cleaner solution for this problem
* @param job
*/
- public void submissionFailed(Job job) {
- LOG.info("Job submission failed notification for job " + job.getJobID());
- this.statistics.add(job);
+ public void submissionFailed(JobStats job) {
+ String jobID = job.getJob().getConfiguration().get(Gridmix.ORIGINAL_JOB_ID);
+ LOG.info("Job submission failed notification for job " + jobID);
+ synchronized (statistics) {
+ this.statistics.add(job);
+ }
}
/**
@@ -108,12 +125,9 @@ class JobMonitor implements Gridmix.Comp
* @throws IllegalStateException If monitoring thread is still running.
* @return Any jobs submitted and not known to have completed.
*/
- List<Job> getRemainingJobs() {
- if (mThread.isAlive()) {
- LOG.warn("Internal error: Polling running monitor for jobs");
- }
+ List<JobStats> getRemainingJobs() {
synchronized (mJobs) {
- return new ArrayList<Job>(mJobs);
+ return new ArrayList<JobStats>(mJobs);
}
}
@@ -123,19 +137,8 @@ class JobMonitor implements Gridmix.Comp
*/
private class MonitorThread extends Thread {
- public MonitorThread() {
- super("GridmixJobMonitor");
- }
-
- /**
- * Check a job for success or failure.
- */
- public void process(Job job) throws IOException, InterruptedException {
- if (job.isSuccessful()) {
- onSuccess(job);
- } else {
- onFailure(job);
- }
+ public MonitorThread(int i) {
+ super("GridmixJobMonitor-" + i);
}
@Override
@@ -144,10 +147,12 @@ class JobMonitor implements Gridmix.Comp
boolean shutdown;
while (true) {
try {
- synchronized (mJobs) {
- graceful = JobMonitor.this.graceful;
- shutdown = JobMonitor.this.shutdown;
- runningJobs.drainTo(mJobs);
+ synchronized (runningJobs) {
+ synchronized (mJobs) {
+ graceful = JobMonitor.this.graceful;
+ shutdown = JobMonitor.this.shutdown;
+ runningJobs.drainTo(mJobs);
+ }
}
// shutdown conditions; either shutdown requested and all jobs
@@ -155,26 +160,63 @@ class JobMonitor implements Gridmix.Comp
// submitted jobs not in the monitored set
if (shutdown) {
if (!graceful) {
- while (!runningJobs.isEmpty()) {
- synchronized (mJobs) {
- runningJobs.drainTo(mJobs);
+ synchronized (runningJobs) {
+ while (!runningJobs.isEmpty()) {
+ synchronized (mJobs) {
+ runningJobs.drainTo(mJobs);
+ }
}
}
break;
- } else if (mJobs.isEmpty()) {
- break;
}
- }
- while (!mJobs.isEmpty()) {
- Job job;
+
synchronized (mJobs) {
- job = mJobs.poll();
+ if (graceful && mJobs.isEmpty()) {
+ break;
+ }
}
+ }
+ JobStats jobStats = null;
+ synchronized (mJobs) {
+ jobStats = mJobs.poll();
+ }
+ while (jobStats != null) {
+ Job job = jobStats.getJob();
+
try {
- if (job.isComplete()) {
- process(job);
- statistics.add(job);
- continue;
+ // get the job status
+ long start = System.currentTimeMillis();
+ JobStatus status = job.getStatus(); // cache the job status
+ long end = System.currentTimeMillis();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Status polling for job " + job.getJobID() + " took "
+ + (end-start) + "ms.");
+ }
+
+ // update the job progress
+ jobStats.updateJobStatus(status);
+
+ // if the job is complete, let others know
+ if (status.isJobComplete()) {
+ if (status.getState() == JobStatus.State.SUCCEEDED) {
+ onSuccess(job);
+ } else {
+ onFailure(job);
+ }
+ synchronized (statistics) {
+ statistics.add(jobStats);
+ }
+ } else {
+ // add the running job back and break
+ synchronized (mJobs) {
+ if (!mJobs.offer(jobStats)) {
+ LOG.error("Lost job " + (null == job.getJobName()
+ ? "<unknown>" : job.getJobName())); // should never
+ // happen
+ }
+ }
+ break;
}
} catch (IOException e) {
if (e.getCause() instanceof ClosedByInterruptException) {
@@ -186,18 +228,19 @@ class JobMonitor implements Gridmix.Comp
} else {
LOG.warn("Lost job " + (null == job.getJobName()
? "<unknown>" : job.getJobName()), e);
- continue;
+ synchronized (statistics) {
+ statistics.add(jobStats);
+ }
}
}
+
+ // get the next job
synchronized (mJobs) {
- if (!mJobs.offer(job)) {
- LOG.error("Lost job " + (null == job.getJobName()
- ? "<unknown>" : job.getJobName())); // should never
- // happen
- }
+ jobStats = mJobs.poll();
}
- break;
}
+
+ // sleep for a while before checking again
try {
TimeUnit.MILLISECONDS.sleep(pollDelayMillis);
} catch (InterruptedException e) {
@@ -215,7 +258,9 @@ class JobMonitor implements Gridmix.Comp
* Start the internal, monitoring thread.
*/
public void start() {
- mThread.start();
+ for (int i = 0; i < numPollingThreads; ++i) {
+ executor.execute(new MonitorThread(i));
+ }
}
/**
@@ -224,7 +269,7 @@ class JobMonitor implements Gridmix.Comp
* if no form of shutdown has been requested.
*/
public void join(long millis) throws InterruptedException {
- mThread.join(millis);
+ executor.awaitTermination(millis, TimeUnit.MILLISECONDS);
}
/**
@@ -236,7 +281,7 @@ class JobMonitor implements Gridmix.Comp
graceful = false;
shutdown = true;
}
- mThread.interrupt();
+ executor.shutdown();
}
/**
@@ -248,7 +293,7 @@ class JobMonitor implements Gridmix.Comp
graceful = true;
shutdown = true;
}
- mThread.interrupt();
+ executor.shutdown();
}
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java?rev=1292736&r1=1292735&r2=1292736&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java Thu Feb 23 10:41:07 2012
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
/**
* Component accepting deserialized job traces, computing split data, and
@@ -46,6 +47,7 @@ class JobSubmitter implements Gridmix.Co
private final JobMonitor monitor;
private final ExecutorService sched;
private volatile boolean shutdown = false;
+ private final int queueDepth;
/**
* Initialize the submission component with downstream monitor and pool of
@@ -61,6 +63,7 @@ class JobSubmitter implements Gridmix.Co
*/
public JobSubmitter(JobMonitor monitor, int threads, int queueDepth,
FilePool inputDir, Statistics statistics) {
+ this.queueDepth = queueDepth;
sem = new Semaphore(queueDepth);
sched = new ThreadPoolExecutor(threads, threads, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
@@ -79,19 +82,25 @@ class JobSubmitter implements Gridmix.Co
this.job = job;
}
public void run() {
+ JobStats stats =
+ Statistics.generateJobStats(job.getJob(), job.getJobDesc());
try {
// pre-compute split information
try {
+ long start = System.currentTimeMillis();
job.buildSplits(inputDir);
+ long end = System.currentTimeMillis();
+ LOG.info("[JobSubmitter] Time taken to build splits for job "
+ + job.getJob().getJobID() + ": " + (end - start) + " ms.");
} catch (IOException e) {
LOG.warn("Failed to submit " + job.getJob().getJobName() + " as "
+ job.getUgi(), e);
- monitor.submissionFailed(job.getJob());
+ monitor.submissionFailed(stats);
return;
} catch (Exception e) {
LOG.warn("Failed to submit " + job.getJob().getJobName() + " as "
+ job.getUgi(), e);
- monitor.submissionFailed(job.getJob());
+ monitor.submissionFailed(stats);
return;
}
// Sleep until deadline
@@ -102,10 +111,28 @@ class JobSubmitter implements Gridmix.Co
}
try {
// submit job
- monitor.add(job.call());
- statistics.addJobStats(job.getJob(), job.getJobDesc());
- LOG.debug("SUBMIT " + job + "@" + System.currentTimeMillis() +
- " (" + job.getJob().getJobID() + ")");
+ long start = System.currentTimeMillis();
+ job.call();
+ long end = System.currentTimeMillis();
+ LOG.info("[JobSubmitter] Time taken to submit the job "
+ + job.getJob().getJobID() + ": " + (end - start) + " ms.");
+
+ // mark it as submitted
+ job.setSubmitted();
+
+ // add to the monitor
+ monitor.add(stats);
+
+ // add to the statistics
+ statistics.addJobStats(stats);
+ if (LOG.isDebugEnabled()) {
+ String jobID =
+ job.getJob().getConfiguration().get(Gridmix.ORIGINAL_JOB_ID);
+ LOG.debug("Original job '" + jobID + "' is being simulated as '"
+ + job.getJob().getJobID() + "'");
+ LOG.debug("SUBMIT " + job + "@" + System.currentTimeMillis()
+ + " (" + job.getJob().getJobID() + ")");
+ }
} catch (IOException e) {
LOG.warn("Failed to submit " + job.getJob().getJobName() + " as "
+ job.getUgi(), e);
@@ -113,21 +140,21 @@ class JobSubmitter implements Gridmix.Co
throw new InterruptedException("Failed to submit " +
job.getJob().getJobName());
}
- monitor.submissionFailed(job.getJob());
+ monitor.submissionFailed(stats);
} catch (ClassNotFoundException e) {
LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
- monitor.submissionFailed(job.getJob());
+ monitor.submissionFailed(stats);
}
} catch (InterruptedException e) {
// abort execution, remove splits if nesc
// TODO release ThdLoc
GridmixJob.pullDescription(job.id());
Thread.currentThread().interrupt();
- monitor.submissionFailed(job.getJob());
+ monitor.submissionFailed(stats);
} catch(Exception e) {
//Due to some exception job wasnt submitted.
LOG.info(" Job " + job.getJob().getJobID() + " submission failed " , e);
- monitor.submissionFailed(job.getJob());
+ monitor.submissionFailed(stats);
} finally {
sem.release();
}
@@ -141,6 +168,8 @@ class JobSubmitter implements Gridmix.Co
final boolean addToQueue = !shutdown;
if (addToQueue) {
final SubmitTask task = new SubmitTask(job);
+ LOG.info("Total number of queued jobs: "
+ + (queueDepth - sem.availablePermits()));
sem.acquire();
try {
sched.execute(task);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java?rev=1292736&r1=1292735&r2=1292736&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java Thu Feb 23 10:41:07 2012
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapred.JobClien
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.gridmix.Gridmix.Component;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.rumen.JobStory;
@@ -43,12 +44,12 @@ import java.util.concurrent.locks.Reentr
/**
* Component collecting the stats required by other components
* to make decisions.
- * Single thread Collector tries to collec the stats.
- * Each of thread poll updates certain datastructure(Currently ClusterStats).
- * Components interested in these datastructure, need to register.
- * StatsCollector notifies each of the listeners.
+ * Single thread collector tries to collect the stats (currently cluster stats)
+ * and caches it internally.
+ * Components interested in these stats need to register themselves and will get
+ * notified either on every job completion event or some fixed time interval.
*/
-public class Statistics implements Component<Job> {
+public class Statistics implements Component<Statistics.JobStats> {
public static final Log LOG = LogFactory.getLog(Statistics.class);
private final StatCollector statistics = new StatCollector();
@@ -62,10 +63,16 @@ public class Statistics implements Compo
private final List<StatListener<JobStats>> jobStatListeners =
new CopyOnWriteArrayList<StatListener<JobStats>>();
- //List of jobids and noofMaps for each job
- private static final Map<Integer, JobStats> jobMaps =
- new ConcurrentHashMap<Integer,JobStats>();
-
+ // A map of job-sequence-id to job-stats of submitted jobs
+ private static final Map<Integer, JobStats> submittedJobsMap =
+ new ConcurrentHashMap<Integer, JobStats>();
+
+ // total number of map tasks submitted
+ private static volatile int numMapsSubmitted = 0;
+
+ // total number of reduce tasks submitted
+ private static volatile int numReducesSubmitted = 0;
+
private int completedJobsInCurrentInterval = 0;
private final int jtPollingInterval;
private volatile boolean shutdown = false;
@@ -92,41 +99,65 @@ public class Statistics implements Compo
this.startFlag = startFlag;
}
- public void addJobStats(Job job, JobStory jobdesc) {
+ /**
+ * Generates a job stats.
+ */
+ public static JobStats generateJobStats(Job job, JobStory jobdesc) {
int seq = GridmixJob.getJobSeqId(job);
- if (seq < 0) {
- LOG.info("Not tracking job " + job.getJobName()
- + " as seq id is less than zero: " + seq);
- return;
+ // bail out if job description is missing for a job to be simulated
+ if (seq >= 0 && jobdesc == null) {
+ throw new IllegalArgumentException("JobStory not available for job "
+ + job.getJobID());
}
- int maps = 0;
- int reds = 0;
- if (jobdesc == null) {
- throw new IllegalArgumentException(
- " JobStory not available for job " + job.getJobName());
- } else {
+ int maps = -1;
+ int reds = -1;
+ if (jobdesc != null) {
+ // Note that the ZombieJob will return a >= 0 value
maps = jobdesc.getNumberMaps();
reds = jobdesc.getNumberReduces();
}
- JobStats stats = new JobStats(maps, reds, job);
- jobMaps.put(seq,stats);
+ return new JobStats(maps, reds, job);
+ }
+
+ /**
+ * Add a submitted job for monitoring.
+ */
+ public void addJobStats(JobStats stats) {
+ int seq = GridmixJob.getJobSeqId(stats.getJob());
+ if (seq < 0) {
+ LOG.info("Not tracking job " + stats.getJob().getJobName()
+ + " as seq id is less than zero: " + seq);
+ return;
+ }
+ submittedJobsMap.put(seq, stats);
+ numMapsSubmitted += stats.getNoOfMaps();
+ numReducesSubmitted += stats.getNoOfReds();
}
/**
* Used by JobMonitor to add the completed job.
*/
@Override
- public void add(Job job) {
- //This thread will be notified initially by jobmonitor incase of
+ public void add(Statistics.JobStats job) {
+    // This thread will be notified initially by the job-monitor in case of
+    // data generation. Ignore that notification, as it only indicates that
+    // the input data has been generated.
if (!statistics.isAlive()) {
return;
}
- JobStats stat = jobMaps.remove(GridmixJob.getJobSeqId(job));
-
- if (stat == null) return;
+ JobStats stat = submittedJobsMap.remove(GridmixJob.getJobSeqId(job.getJob()));
+
+ // stat cannot be null
+ if (stat == null) {
+ LOG.error("[Statistics] Missing entry for job "
+ + job.getJob().getJobID());
+ return;
+ }
+
+ // update the total number of submitted map/reduce task count
+ numMapsSubmitted -= stat.getNoOfMaps();
+ numReducesSubmitted -= stat.getNoOfReds();
completedJobsInCurrentInterval++;
//check if we have reached the maximum level of job completions.
@@ -238,7 +269,7 @@ public class Statistics implements Compo
@Override
public void shutdown() {
shutdown = true;
- jobMaps.clear();
+ submittedJobsMap.clear();
clusterStatlisteners.clear();
jobStatListeners.clear();
statistics.interrupt();
@@ -247,7 +278,7 @@ public class Statistics implements Compo
@Override
public void abort() {
shutdown = true;
- jobMaps.clear();
+ submittedJobsMap.clear();
clusterStatlisteners.clear();
jobStatListeners.clear();
statistics.interrupt();
@@ -259,9 +290,10 @@ public class Statistics implements Compo
* TODO: In future we need to extend this to send more information.
*/
static class JobStats {
- private int noOfMaps;
- private int noOfReds;
- private Job job;
+ private final int noOfMaps;
+ private final int noOfReds;
+ private JobStatus currentStatus;
+ private final Job job;
public JobStats(int noOfMaps,int numOfReds, Job job){
this.job = job;
@@ -284,6 +316,20 @@ public class Statistics implements Compo
public Job getJob() {
return job;
}
+
+ /**
+ * Update the job statistics.
+ */
+ public synchronized void updateJobStatus(JobStatus status) {
+ this.currentStatus = status;
+ }
+
+ /**
+ * Get the current job status.
+ */
+ public synchronized JobStatus getJobStatus() {
+ return currentStatus;
+ }
}
static class ClusterStats {
@@ -316,15 +362,28 @@ public class Statistics implements Compo
}
int getNumRunningJob() {
- return jobMaps.size();
+ return submittedJobsMap.size();
}
/**
* @return runningWatitingJobs
*/
static Collection<JobStats> getRunningJobStats() {
- return jobMaps.values();
+ return submittedJobsMap.values();
}
+ /**
+ * Returns the total number of submitted map tasks
+ */
+ static int getSubmittedMapTasks() {
+ return numMapsSubmitted;
+ }
+
+ /**
+ * Returns the total number of submitted reduce tasks
+ */
+ static int getSubmittedReduceTasks() {
+ return numReducesSubmitted;
+ }
}
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java?rev=1292736&r1=1292735&r2=1292736&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java Thu Feb 23 10:41:07 2012
@@ -25,11 +25,15 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats;
import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.JobStoryProducer;
import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -88,6 +92,13 @@ public class StressJobFactory extends Jo
final float maxJobTrackerRatio;
/**
+ * Represents a list of blacklisted jobs. Jobs are blacklisted when either
+ * they are complete or their status cannot be obtained. Stress mode will
+ * ignore blacklisted jobs from its overload computation.
+ */
+ private Set<JobID> blacklistedJobs = new HashSet<JobID>();
+
+ /**
* Creating a new instance does not start the thread.
*
* @param submitter Component to which deserialized jobs are passed
@@ -145,42 +156,66 @@ public class StressJobFactory extends Jo
try {
startFlag.await();
if (Thread.currentThread().isInterrupted()) {
+ LOG.warn("[STRESS] Interrupted before start!. Exiting..");
return;
}
LOG.info("START STRESS @ " + System.currentTimeMillis());
while (!Thread.currentThread().isInterrupted()) {
try {
while (loadStatus.overloaded()) {
+ // update the overload status
if (LOG.isDebugEnabled()) {
- LOG.debug("Cluster overloaded in run! Sleeping...");
+ LOG.debug("Updating the overload status.");
}
- // sleep
try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {
+ checkLoadAndGetSlotsToBackfill();
+ } catch (IOException ioe) {
+ LOG.warn("[STRESS] Check failed!", ioe);
return;
}
+
+ // if the cluster is still overloaded, then sleep
+ if (loadStatus.overloaded()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[STRESS] Cluster overloaded in run! Sleeping...");
+ }
+
+ // sleep
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {
+ LOG.warn("[STRESS] Interrupted while sleeping! Exiting.", ie);
+ return;
+ }
+ }
}
while (!loadStatus.overloaded()) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Cluster underloaded in run! Stressing...");
+ LOG.debug("[STRESS] Cluster underloaded in run! Stressing...");
}
try {
//TODO This in-line read can block submission for large jobs.
final JobStory job = getNextJobFiltered();
if (null == job) {
+ LOG.warn("[STRESS] Finished consuming the input trace. "
+ + "Exiting..");
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Job Selected: " + job.getJobID());
}
- submitter.add(
- jobCreator.createGridmixJob(
- conf, 0L, job, scratch,
- userResolver.getTargetUgi(
- UserGroupInformation.createRemoteUser(job.getUser())),
- sequence.getAndIncrement()));
+
+ UserGroupInformation ugi =
+ UserGroupInformation.createRemoteUser(job.getUser());
+ UserGroupInformation tgtUgi = userResolver.getTargetUgi(ugi);
+ GridmixJob tJob =
+ jobCreator.createGridmixJob(conf, 0L, job, scratch,
+ tgtUgi, sequence.getAndIncrement());
+
+ // submit the job
+ submitter.add(tJob);
+
// TODO: We need to take care of scenario when one map/reduce
// takes more than 1 slot.
@@ -198,7 +233,7 @@ public class StressJobFactory extends Jo
loadStatus.decrementJobLoad(1);
} catch (IOException e) {
- LOG.error("Error while submitting the job ", e);
+ LOG.error("[STRESS] Error while submitting the job ", e);
error = e;
return;
}
@@ -209,6 +244,7 @@ public class StressJobFactory extends Jo
}
}
} catch (InterruptedException e) {
+ LOG.error("[STRESS] Interrupted in the main block!", e);
return;
} finally {
IOUtils.cleanup(null, jobProducer);
@@ -224,9 +260,17 @@ public class StressJobFactory extends Jo
*/
@Override
public void update(Statistics.ClusterStats item) {
- ClusterStatus clusterMetrics = item.getStatus();
+ ClusterStatus clusterStatus = item.getStatus();
try {
- checkLoadAndGetSlotsToBackfill(item, clusterMetrics);
+ // update the max cluster map/reduce task capacity
+ loadStatus.updateMapCapacity(clusterStatus.getMaxMapTasks());
+
+ loadStatus.updateReduceCapacity(clusterStatus.getMaxReduceTasks());
+
+ int numTrackers = clusterStatus.getTaskTrackers();
+ int jobLoad =
+ (int) (maxJobTrackerRatio * numTrackers) - item.getNumRunningJob();
+ loadStatus.updateJobLoad(jobLoad);
} catch (Exception e) {
LOG.error("Couldn't get the new Status",e);
}
@@ -258,22 +302,8 @@ public class StressJobFactory extends Jo
* @param clusterStatus Cluster status
* @throws java.io.IOException
*/
- private void checkLoadAndGetSlotsToBackfill(
- ClusterStats stats, ClusterStatus clusterStatus) throws IOException, InterruptedException {
-
- // update the max cluster capacity incase its updated
- int mapCapacity = clusterStatus.getMaxMapTasks();
- loadStatus.updateMapCapacity(mapCapacity);
-
- int reduceCapacity = clusterStatus.getMaxReduceTasks();
-
- loadStatus.updateReduceCapacity(reduceCapacity);
-
- int numTrackers = clusterStatus.getTaskTrackers();
-
- int jobLoad =
- (int) (maxJobTrackerRatio * numTrackers) - stats.getNumRunningJob();
- loadStatus.updateJobLoad(jobLoad);
+ protected void checkLoadAndGetSlotsToBackfill()
+ throws IOException, InterruptedException {
if (loadStatus.getJobLoad() <= 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(System.currentTimeMillis() + " [JobLoad] Overloaded is "
@@ -283,17 +313,143 @@ public class StressJobFactory extends Jo
return; // stop calculation because we know it is overloaded.
}
- float incompleteMapTasks = 0; // include pending & running map tasks.
- for (JobStats job : ClusterStats.getRunningJobStats()) {
- float mapProgress = job.getJob().mapProgress();
- int noOfMaps = job.getNoOfMaps();
- incompleteMapTasks +=
- calcEffectiveIncompleteMapTasks(mapCapacity, noOfMaps, mapProgress);
+ int mapCapacity = loadStatus.getMapCapacity();
+ int reduceCapacity = loadStatus.getReduceCapacity();
+
+ // return if the cluster status is not set
+ if (mapCapacity < 0 || reduceCapacity < 0) {
+      // note that, by default, the overload status is true
+      // missing cluster status will result in blocking of job submission
+ return;
+ }
+
+ // Determine the max permissible map & reduce task load
+ int maxMapLoad = (int) (overloadMapTaskMapSlotRatio * mapCapacity);
+ int maxReduceLoad =
+ (int) (overloadReduceTaskReduceSlotRatio * reduceCapacity);
+
+ // compute the total number of map & reduce tasks submitted
+ int totalMapTasks = ClusterStats.getSubmittedMapTasks();
+ int totalReduceTasks = ClusterStats.getSubmittedReduceTasks();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Total submitted map tasks: " + totalMapTasks);
+ LOG.debug("Total submitted reduce tasks: " + totalReduceTasks);
+ LOG.debug("Max map load: " + maxMapLoad);
+ LOG.debug("Max reduce load: " + maxReduceLoad);
+ }
+
+ // generate a pessimistic bound on the max running+pending map tasks
+ // this check is to avoid the heavy-duty actual map load calculation
+ int mapSlotsBackFill = (int) (maxMapLoad - totalMapTasks);
+
+ // generate a pessimistic bound on the max running+pending reduce tasks
+ // this check is to avoid the heavy-duty actual reduce load calculation
+ int reduceSlotsBackFill = (int) (maxReduceLoad - totalReduceTasks);
+
+ // maintain a list of seen job ids
+ Set<JobID> seenJobIDs = new HashSet<JobID>();
+
+ // check if the total number of submitted map/reduce tasks exceeds the
+ // permissible limit
+ if (totalMapTasks > maxMapLoad || totalReduceTasks > maxReduceLoad) {
+ // if yes, calculate the real load
+ float incompleteMapTasks = 0; // include pending & running map tasks.
+ float incompleteReduceTasks = 0; // include pending & running reduce tasks
+
+ for (JobStats job : ClusterStats.getRunningJobStats()) {
+ JobID id = job.getJob().getJobID();
+ seenJobIDs.add(id);
+
+ // Note that this is a hack! Ideally, ClusterStats.getRunningJobStats()
+ // should be smart enough to take care of completed jobs.
+ if (blacklistedJobs.contains(id)) {
+ LOG.warn("Ignoring blacklisted job: " + id);
+ continue;
+ }
+
+ int noOfMaps = job.getNoOfMaps();
+ int noOfReduces = job.getNoOfReds();
+
+ // consider polling for jobs where maps>0 and reds>0
+        // TODO: What about setup/cleanup tasks for cases where m=0 and r=0?
+        //       How should such jobs be handled otherwise?
+ if (noOfMaps > 0 || noOfReduces > 0) {
+ // get the job's status
+ JobStatus status = job.getJobStatus();
+
+ // blacklist completed jobs and continue
+ if (status != null && status.isJobComplete()) {
+ LOG.warn("Blacklisting completed job: " + id);
+ blacklistedJobs.add(id);
+ continue;
+ }
+
+ // get the map and reduce tasks' progress
+ float mapProgress = 0f;
+ float reduceProgress = 0f;
+
+ // check if the status is missing (this can happen for unpolled jobs)
+ if (status != null) {
+ mapProgress = status.getMapProgress();
+ reduceProgress = status.getReduceProgress();
+ }
+
+ incompleteMapTasks +=
+ calcEffectiveIncompleteMapTasks(mapCapacity, noOfMaps, mapProgress);
+
+ // bail out early
+ int currentMapSlotsBackFill = (int) (maxMapLoad - incompleteMapTasks);
+ if (currentMapSlotsBackFill <= 0) {
+ // reset the reduce task load since we are bailing out
+ incompleteReduceTasks = totalReduceTasks;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Terminating overload check due to high map load.");
+ }
+ break;
+ }
+
+ // compute the real reduce load
+ if (noOfReduces > 0) {
+ incompleteReduceTasks +=
+ calcEffectiveIncompleteReduceTasks(reduceCapacity, noOfReduces,
+ reduceProgress);
+ }
+
+ // bail out early
+ int currentReduceSlotsBackFill =
+ (int) (maxReduceLoad - incompleteReduceTasks);
+ if (currentReduceSlotsBackFill <= 0) {
+ // reset the map task load since we are bailing out
+ incompleteMapTasks = totalMapTasks;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Terminating overload check due to high reduce load.");
+ }
+ break;
+ }
+ } else {
+ LOG.warn("Blacklisting empty job: " + id);
+ blacklistedJobs.add(id);
+ }
+ }
+
+ // calculate the real map load on the cluster
+ mapSlotsBackFill = (int) (maxMapLoad - incompleteMapTasks);
+
+ // calculate the real reduce load on the cluster
+ reduceSlotsBackFill = (int)(maxReduceLoad - incompleteReduceTasks);
+
+      // clean up the blacklisted set to keep the memory footprint minimal
+ // retain only the jobs that are seen in this cycle
+ blacklistedJobs.retainAll(seenJobIDs);
+ if (LOG.isDebugEnabled() && blacklistedJobs.size() > 0) {
+ LOG.debug("Blacklisted jobs count: " + blacklistedJobs.size());
+ }
}
- int mapSlotsBackFill =
- (int) ((overloadMapTaskMapSlotRatio * mapCapacity) - incompleteMapTasks);
- loadStatus.updateMapLoad(mapSlotsBackFill);
+ // update
+ loadStatus.updateMapLoad(mapSlotsBackFill);
+ loadStatus.updateReduceLoad(reduceSlotsBackFill);
if (loadStatus.getMapLoad() <= 0) {
if (LOG.isDebugEnabled()) {
@@ -303,23 +459,7 @@ public class StressJobFactory extends Jo
}
return; // stop calculation because we know it is overloaded.
}
-
- float incompleteReduceTasks = 0; // include pending & running reduce tasks.
- for (JobStats job : ClusterStats.getRunningJobStats()) {
- // Cached the num-reds value in JobStats
- int noOfReduces = job.getNoOfReds();
- if (noOfReduces > 0) {
- float reduceProgress = job.getJob().reduceProgress();
- incompleteReduceTasks +=
- calcEffectiveIncompleteReduceTasks(reduceCapacity, noOfReduces,
- reduceProgress);
- }
- }
- int reduceSlotsBackFill =
- (int)((overloadReduceTaskReduceSlotRatio * reduceCapacity)
- - incompleteReduceTasks);
- loadStatus.updateReduceLoad(reduceSlotsBackFill);
if (loadStatus.getReduceLoad() <= 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(System.currentTimeMillis() + " [REDUCE-LOAD] Overloaded is "
@@ -445,7 +585,7 @@ public class StressJobFactory extends Jo
|| (numJobsBackfill <= 0));
}
- public synchronized boolean overloaded() {
+ public boolean overloaded() {
return overloaded.get();
}
Added: hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixStatistics.java?rev=1292736&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixStatistics.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixStatistics.java Thu Feb 23 10:41:07 2012
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats;
+import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Test the Gridmix's {@link Statistics} class.
+ */
+public class TestGridmixStatistics {
+ /**
+ * Test {@link Statistics.JobStats}.
+ */
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testJobStats() throws Exception {
+ Job job = new Job() {};
+ JobStats stats = new JobStats(1, 2, job);
+ assertEquals("Incorrect num-maps", 1, stats.getNoOfMaps());
+ assertEquals("Incorrect num-reds", 2, stats.getNoOfReds());
+ assertTrue("Incorrect job", job == stats.getJob());
+ assertNull("Unexpected job status", stats.getJobStatus());
+
+ // add a new status
+ JobStatus status = new JobStatus();
+ stats.updateJobStatus(status);
+ assertNotNull("Missing job status", stats.getJobStatus());
+ assertTrue("Incorrect job status", status == stats.getJobStatus());
+ }
+
+ private static JobStory getCustomJobStory(final int numMaps,
+ final int numReds) {
+ return new JobStory() {
+ @Override
+ public InputSplit[] getInputSplits() {
+ return null;
+ }
+ @Override
+ public JobConf getJobConf() {
+ return null;
+ }
+ @Override
+ public JobID getJobID() {
+ return null;
+ }
+ @Override
+ public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int arg0, int arg1,
+ int arg2) {
+ return null;
+ }
+ @Override
+ public String getName() {
+ return null;
+ }
+ @Override
+ public int getNumberMaps() {
+ return numMaps;
+ }
+ @Override
+ public int getNumberReduces() {
+ return numReds;
+ }
+ @Override
+ public Values getOutcome() {
+ return null;
+ }
+ @Override
+ public String getQueueName() {
+ return null;
+ }
+ @Override
+ public long getSubmissionTime() {
+ return 0;
+ }
+ @Override
+ public TaskAttemptInfo getTaskAttemptInfo(TaskType arg0, int arg1,
+ int arg2) {
+ return null;
+ }
+ @Override
+ public TaskInfo getTaskInfo(TaskType arg0, int arg1) {
+ return null;
+ }
+ @Override
+ public String getUser() {
+ return null;
+ }
+ };
+ }
+
+ /**
+ * Test {@link Statistics}.
+ */
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testStatistics() throws Exception {
+ // test job stats generation
+ Configuration conf = new Configuration();
+
+ // test dummy jobs like data-generation etc
+ Job job = new Job(conf) {
+ };
+ JobStats stats = Statistics.generateJobStats(job, null);
+ testJobStats(stats, -1, -1, null, job);
+
+ // add a job desc with 2 map and 1 reduce task
+ conf.setInt(GridmixJob.GRIDMIX_JOB_SEQ, 1);
+
+ // test dummy jobs like data-generation etc
+ job = new Job(conf) {
+ };
+ JobStory zjob = getCustomJobStory(2, 1);
+ stats = Statistics.generateJobStats(job, zjob);
+ testJobStats(stats, 2, 1, null, job);
+
+ // add a job status
+ JobStatus jStatus = new JobStatus();
+ stats.updateJobStatus(jStatus);
+ testJobStats(stats, 2, 1, jStatus, job);
+
+
+ // start the statistics
+ CountDownLatch startFlag = new CountDownLatch(1); // prevents the collector
+ // thread from starting
+ Statistics statistics = new Statistics(new JobConf(), 0, startFlag);
+ statistics.start();
+
+ testClusterStats(0, 0, 0);
+
+ // add to the statistics object
+ statistics.addJobStats(stats);
+ testClusterStats(2, 1, 1);
+
+ // add another job
+ JobStory zjob2 = getCustomJobStory(10, 5);
+ conf.setInt(GridmixJob.GRIDMIX_JOB_SEQ, 2);
+ job = new Job(conf) {
+ };
+
+ JobStats stats2 = Statistics.generateJobStats(job, zjob2);
+ statistics.addJobStats(stats2);
+ testClusterStats(12, 6, 2);
+
+ // finish off one job
+ statistics.add(stats2);
+ testClusterStats(2, 1, 1);
+
+ // finish off the other job
+ statistics.add(stats);
+ testClusterStats(0, 0, 0);
+
+ statistics.shutdown();
+ }
+
+ // test the job stats
+ private static void testJobStats(JobStats stats, int numMaps, int numReds,
+ JobStatus jStatus, Job job) {
+ assertEquals("Incorrect num map tasks", numMaps, stats.getNoOfMaps());
+ assertEquals("Incorrect num reduce tasks", numReds, stats.getNoOfReds());
+
+ if (job != null) {
+ assertNotNull("Missing job", job);
+ }
+ // check running job
+ assertTrue("Incorrect job", job == stats.getJob());
+
+ if (jStatus != null) {
+ assertNotNull("Missing job status", jStatus);
+ }
+ // check job stats
+ assertTrue("Incorrect job status", jStatus == stats.getJobStatus());
+ }
+
+ // test the cluster stats
+ private static void testClusterStats(int numSubmittedMapTasks,
+ int numSubmittedReduceTasks,
+ int numSubmittedJobs) {
+ assertEquals("Incorrect count of total number of submitted map tasks",
+ numSubmittedMapTasks, ClusterStats.getSubmittedMapTasks());
+ assertEquals("Incorrect count of total number of submitted reduce tasks",
+ numSubmittedReduceTasks,
+ ClusterStats.getSubmittedReduceTasks());
+ assertEquals("Incorrect submitted jobs",
+ numSubmittedJobs, ClusterStats.getRunningJobStats().size());
+ }
+}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=1292736&r1=1292735&r2=1292736&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Thu Feb 23 10:41:07 2012
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
import org.apache.hadoop.conf.Configuration;
@@ -96,7 +97,7 @@ public class TestGridmixSubmission {
private final BlockingQueue<Job> retiredJobs;
public TestMonitor(int expected, Statistics stats) {
- super(stats);
+ super(5, TimeUnit.SECONDS, stats, 1);
this.expected = expected;
retiredJobs = new LinkedBlockingQueue<Job>();
}
@@ -349,7 +350,7 @@ public class TestGridmixSubmission {
}
@Override
- protected JobMonitor createJobMonitor(Statistics stats) {
+ protected JobMonitor createJobMonitor(Statistics stats, Configuration conf){
monitor = new TestMonitor(NJOBS + 1, stats);
return monitor;
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java?rev=1292736&r1=1292735&r2=1292736&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java Thu Feb 23 10:41:07 2012
@@ -193,7 +193,7 @@ public class TestGridmixSummary {
es.update(null);
assertEquals("ExecutionSummarizer init failed", 0,
es.getSimulationStartTime());
- testExecutionSummarizer(0, 0, 0, 0, 0, 0, es);
+ testExecutionSummarizer(0, 0, 0, 0, 0, 0, 0, es);
long simStartTime = System.currentTimeMillis();
es.start(null);
@@ -203,14 +203,24 @@ public class TestGridmixSummary {
es.getSimulationStartTime() <= System.currentTimeMillis());
// test with job stats
- JobStats stats = generateFakeJobStats(1, 10, true);
+ JobStats stats = generateFakeJobStats(1, 10, true, false);
es.update(stats);
- testExecutionSummarizer(1, 10, 0, 1, 1, 0, es);
+ testExecutionSummarizer(1, 10, 0, 1, 1, 0, 0, es);
// test with failed job
- stats = generateFakeJobStats(5, 1, false);
+ stats = generateFakeJobStats(5, 1, false, false);
es.update(stats);
- testExecutionSummarizer(6, 11, 0, 2, 1, 1, es);
+ testExecutionSummarizer(6, 11, 0, 2, 1, 1, 0, es);
+
+ // test with successful but lost job
+ stats = generateFakeJobStats(1, 1, true, true);
+ es.update(stats);
+ testExecutionSummarizer(7, 12, 0, 3, 1, 1, 1, es);
+
+ // test with failed but lost job
+ stats = generateFakeJobStats(2, 2, false, true);
+ es.update(stats);
+ testExecutionSummarizer(9, 14, 0, 4, 1, 1, 2, es);
// test finalize
// define a fake job factory
@@ -306,7 +316,7 @@ public class TestGridmixSummary {
// test the ExecutionSummarizer
private static void testExecutionSummarizer(int numMaps, int numReds,
int totalJobsInTrace, int totalJobSubmitted, int numSuccessfulJob,
- int numFailedJobs, ExecutionSummarizer es) {
+ int numFailedJobs, int numLostJobs, ExecutionSummarizer es) {
assertEquals("ExecutionSummarizer test failed [num-maps]",
numMaps, es.getNumMapTasksLaunched());
assertEquals("ExecutionSummarizer test failed [num-reducers]",
@@ -319,12 +329,14 @@ public class TestGridmixSummary {
numSuccessfulJob, es.getNumSuccessfulJobs());
assertEquals("ExecutionSummarizer test failed [num-failed jobs]",
numFailedJobs, es.getNumFailedJobs());
+ assertEquals("ExecutionSummarizer test failed [num-lost jobs]",
+ numLostJobs, es.getNumLostJobs());
}
// generate fake job stats
@SuppressWarnings("deprecation")
private static JobStats generateFakeJobStats(final int numMaps,
- final int numReds, final boolean isSuccessful)
+ final int numReds, final boolean isSuccessful, final boolean lost)
throws IOException {
// A fake job
Job fakeJob = new Job() {
@@ -335,6 +347,9 @@ public class TestGridmixSummary {
@Override
public boolean isSuccessful() throws IOException, InterruptedException {
+ if (lost) {
+ throw new IOException("Test failure!");
+ }
return isSuccessful;
};
};
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java?rev=1292736&r1=1292735&r2=1292736&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java Thu Feb 23 10:41:07 2012
@@ -42,6 +42,7 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
@@ -74,7 +75,7 @@ public class TestSleepJob {
private final int expected;
public TestMonitor(int expected, Statistics stats) {
- super(stats);
+ super(5, TimeUnit.SECONDS, stats, 1);
this.expected = expected;
retiredJobs = new LinkedBlockingQueue<Job>();
}
@@ -102,7 +103,7 @@ public class TestSleepJob {
private TestMonitor monitor;
@Override
- protected JobMonitor createJobMonitor(Statistics stats) {
+ protected JobMonitor createJobMonitor(Statistics stats, Configuration c) {
monitor = new TestMonitor(NJOBS + 1, stats);
return monitor;
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/docs/src/documentation/content/xdocs/gridmix.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/docs/src/documentation/content/xdocs/gridmix.xml?rev=1292736&r1=1292735&r2=1292736&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/docs/src/documentation/content/xdocs/gridmix.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/docs/src/documentation/content/xdocs/gridmix.xml Thu Feb 23 10:41:07 2012
@@ -206,6 +206,22 @@ hadoop jar <gridmix-jar> org.apach
options using the values obtained from the original task (i.e via
trace).
</td>
+ </tr>
+ <tr>
+ <td>
+ <code>gridmix.job-monitor.thread-count</code>
+ </td>
+ <td>Total number of threads to use for polling for jobs' status. The
+ default value is 1.
+ </td>
+ </tr>
+ <tr>
+ <td>
+ <code>gridmix.job-monitor.sleep-time-ms</code>
+ </td>
+ <td>The time each Gridmix status poller thread will sleep before
+ starting the next cycle. The default value is 500 milliseconds.
+ </td>
</tr>
</table>
</section>