You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:09:33 UTC
svn commit: r1077383 - in
/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src:
java/org/apache/hadoop/mapred/gridmix/ test/org/apache/hadoop/mapred/gridmix/
Author: omalley
Date: Fri Mar 4 04:09:33 2011
New Revision: 1077383
URL: http://svn.apache.org/viewvc?rev=1077383&view=rev
Log:
commit 5f9d982a5ed9525686877955b5dbed450468a746
Author: Rahul Kumar Singh <rk...@yahoo-inc.com>
Date: Sat Apr 10 17:19:45 2010 +0530
MAPREDUCE:1526 from https://issues.apache.org/jira/secure/attachment/12441333/1526-yhadoop-20-101-4.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1526. Cache the job related information while submitting the job
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/UserResolver.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=1077383&r1=1077382&r2=1077383&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Fri Mar 4 04:09:33 2011
@@ -159,7 +159,7 @@ public class Gridmix extends Configured
GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(
conf, GridmixJobSubmissionPolicy.STRESS);
LOG.info(" Submission policy is " + policy.name());
- statistics = new Statistics(conf, policy.getPollingInterval(), startFlag,userResolver);
+ statistics = new Statistics(conf, policy.getPollingInterval(), startFlag);
monitor = createJobMonitor(statistics);
int noOfSubmitterThreads = (policy == GridmixJobSubmissionPolicy.SERIAL) ? 1
: Runtime.getRuntime().availableProcessors() + 1;
@@ -168,7 +168,7 @@ public class Gridmix extends Configured
monitor, conf.getInt(
GRIDMIX_SUB_THR, noOfSubmitterThreads), conf.getInt(
GRIDMIX_QUE_DEP, 5), new FilePool(
- conf, ioPath), userResolver);
+ conf, ioPath), userResolver,statistics);
factory = createJobFactory(
submitter, traceIn, scratchDir, conf, startFlag, userResolver);
@@ -190,9 +190,10 @@ public class Gridmix extends Configured
return new JobMonitor(stats);
}
- protected JobSubmitter createJobSubmitter(JobMonitor monitor, int threads,
- int queueDepth, FilePool pool,UserResolver resolver) throws IOException {
- return new JobSubmitter(monitor, threads, queueDepth, pool, resolver);
+ protected JobSubmitter createJobSubmitter(
+ JobMonitor monitor, int threads, int queueDepth, FilePool pool,
+ UserResolver resolver, Statistics statistics) throws IOException {
+ return new JobSubmitter(monitor, threads, queueDepth, pool, statistics);
}
protected JobFactory createJobFactory(
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java?rev=1077383&r1=1077382&r2=1077383&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java Fri Mar 4 04:09:33 2011
@@ -75,6 +75,7 @@ abstract class GridmixJob implements Cal
protected final long submissionTimeNanos;
private static final ConcurrentHashMap<Integer,List<InputSplit>> descCache =
new ConcurrentHashMap<Integer,List<InputSplit>>();
+ protected static final String GRIDMIX_JOB_SEQ = "gridmix.job.seq";
public GridmixJob(
final Configuration conf, long submissionMillis, final JobStory jobdesc,
@@ -89,7 +90,7 @@ abstract class GridmixJob implements Cal
public Job run() throws IOException {
Job ret = new Job(conf, nameFormat.get().format("%05d", seq)
.toString());
- ret.getConfiguration().setInt("gridmix.job.seq", seq);
+ ret.getConfiguration().setInt(GRIDMIX_JOB_SEQ, seq);
ret.getConfiguration().set(ORIGNAME,
null == jobdesc.getJobID() ? "<unknown>" : jobdesc.getJobID()
.toString());
@@ -160,8 +161,7 @@ abstract class GridmixJob implements Cal
}
static List<InputSplit> pullDescription(JobContext jobCtxt) {
- return pullDescription(jobCtxt.getConfiguration().getInt(
- "gridmix.job.seq", -1));
+ return pullDescription(GridmixJob.getJobSeqId(jobCtxt));
}
static List<InputSplit> pullDescription(int seq) {
@@ -211,6 +211,10 @@ abstract class GridmixJob implements Cal
return id();
}
+ static int getJobSeqId(JobContext job) {
+ return job.getConfiguration().getInt(GRIDMIX_JOB_SEQ,-1);
+ }
+
public static class DraftPartitioner<V> extends Partitioner<GridmixKey,V> {
public int getPartition(GridmixKey key, V value, int numReduceTasks) {
return key.getPartition();
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java?rev=1077383&r1=1077382&r2=1077383&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java Fri Mar 4 04:09:33 2011
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.mapred.gridmix;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;
import java.util.concurrent.ExecutorService;
@@ -25,12 +28,6 @@ import java.util.concurrent.RejectedExec
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.security.PrivilegedExceptionAction;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.security.UserGroupInformation;
/**
* Component accepting deserialized job traces, computing split data, and
@@ -43,12 +40,12 @@ class JobSubmitter implements Gridmix.Co
public static final Log LOG = LogFactory.getLog(JobSubmitter.class);
- final Semaphore sem;
+ private final Semaphore sem;
private final FilePool inputDir;
private final JobMonitor monitor;
+ private final Statistics statistics;
private final ExecutorService sched;
private volatile boolean shutdown = false;
- private final UserResolver resolver;
/**
* Initialize the submission component with downstream monitor and pool of
@@ -59,16 +56,18 @@ class JobSubmitter implements Gridmix.Co
* @param queueDepth Max depth of pending work queue
* See {@link Gridmix#GRIDMIX_QUE_DEP}.
* @param inputDir Set of files from which split data may be mined for
- * synthetic jobs.
+ * synthetic job
+ * @param statistics
*/
- public JobSubmitter(JobMonitor monitor, int threads, int queueDepth,
- FilePool inputDir, UserResolver resolver) {
+ public JobSubmitter(
+ JobMonitor monitor, int threads, int queueDepth, FilePool inputDir,
+ Statistics statistics) {
sem = new Semaphore(queueDepth);
sched = new ThreadPoolExecutor(threads, threads, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
this.inputDir = inputDir;
this.monitor = monitor;
- this.resolver = resolver;
+ this.statistics = statistics;
}
/**
@@ -106,6 +105,7 @@ class JobSubmitter implements Gridmix.Co
try {
// submit job
monitor.add(job.call());
+ statistics.addJobStats(job.getJob(), job.getJobDesc());
LOG.debug("SUBMIT " + job + "@" + System.currentTimeMillis() +
" (" + job.getJob().getJobID() + ")");
} catch (IOException e) {
@@ -132,7 +132,7 @@ class JobSubmitter implements Gridmix.Co
monitor.submissionFailed(job.getJob());
} finally {
sem.release();
- }
+ }
}
}
@@ -154,6 +154,7 @@ class JobSubmitter implements Gridmix.Co
/**
* (Re)scan the set of input files from which splits are derived.
+ * @throws java.io.IOException
*/
public void refreshFilePool() throws IOException {
inputDir.refresh();
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java?rev=1077383&r1=1077382&r2=1077383&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java Fri Mar 4 04:09:33 2011
@@ -17,29 +17,28 @@
*/
package org.apache.hadoop.mapred.gridmix;
-import org.apache.hadoop.mapred.gridmix.Gridmix.Component;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobClient;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.gridmix.Gridmix.Component;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapred.JobStatus;
-import org.apache.hadoop.mapred.TaskReport;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.tools.rumen.JobStory;
+import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;
-import java.io.IOException;
+import java.util.concurrent.locks.ReentrantLock;
/**
* Component collecting the stats required by other components
@@ -63,6 +62,10 @@ public class Statistics implements Compo
private final List<StatListener<JobStats>> jobStatListeners =
new ArrayList<StatListener<JobStats>>();
+ //List of jobids and noofMaps for each job
+ private static final Map<Integer, JobStats> jobMaps =
+ new ConcurrentHashMap<Integer,JobStats>();
+
private int completedJobsInCurrentInterval = 0;
private final int jtPollingInterval;
private volatile boolean shutdown = false;
@@ -72,37 +75,42 @@ public class Statistics implements Compo
private final ReentrantLock lock = new ReentrantLock();
private final Condition jobCompleted = lock.newCondition();
private final CountDownLatch startFlag;
- private final UserResolver userResolver;
- private static Map<JobID, TaskReport[]> jobTaskReports =
- new ConcurrentHashMap<JobID, TaskReport[]>();
public Statistics(
- final Configuration conf, int pollingInterval, CountDownLatch startFlag,UserResolver userResolver)
- throws IOException {
+ final Configuration conf, int pollingInterval, CountDownLatch startFlag)
+ throws IOException, InterruptedException {
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
- this.userResolver = userResolver;
- try {
this.cluster = ugi.doAs(new PrivilegedExceptionAction<JobClient>(){
- public JobClient run() {
- try {
- return new JobClient(new JobConf(conf));
- } catch (IOException e) {
- LOG.error(" error while createing job client " + e.getMessage());
- }
- return null;
+ public JobClient run() throws IOException {
+ return new JobClient(new JobConf(conf));
}
});
- } catch (InterruptedException e) {
- LOG.error(" Exception in statisitics " + e.getMessage());
- } catch (IOException e) {
- LOG.error("Exception in statistics " + e.getMessage());
- }
+
this.jtPollingInterval = pollingInterval;
maxJobCompletedInInterval = conf.getInt(
MAX_JOBS_COMPLETED_IN_POLL_INTERVAL_KEY, 1);
this.startFlag = startFlag;
}
+ public void addJobStats(Job job, JobStory jobdesc) {
+ int seq = GridmixJob.getJobSeqId(job);
+ if (seq < 0) {
+ LOG.info("Not tracking job " + job.getJobName()
+ + " as seq id is less than zero: " + seq);
+ return;
+ }
+
+ int maps = 0;
+ if (jobdesc == null) {
+ throw new IllegalArgumentException(
+ " JobStory not available for job " + job.getJobName());
+ } else {
+ maps = jobdesc.getNumberMaps();
+ }
+ JobStats stats = new JobStats(maps,job);
+ jobMaps.put(seq,stats);
+ }
+
/**
* Used by JobMonitor to add the completed job.
*/
@@ -114,27 +122,25 @@ public class Statistics implements Compo
if(!statistics.isAlive()) {
return;
}
+ JobStats stat = jobMaps.remove(GridmixJob.getJobSeqId(job));
+
+ if (stat == null) return;
+
completedJobsInCurrentInterval++;
- if (job.getJobID() != null) {
- jobTaskReports.remove(job.getJobID());
- }
+
//check if we have reached the maximum level of job completions.
if (completedJobsInCurrentInterval >= maxJobCompletedInInterval) {
if (LOG.isDebugEnabled()) {
LOG.debug(
- " Reached maximum limit of jobs in a polling interval " +
+ "Reached maximum limit of jobs in a polling interval " +
completedJobsInCurrentInterval);
}
completedJobsInCurrentInterval = 0;
lock.lock();
try {
//Job is completed notify all the listeners.
- if (jobStatListeners.size() > 0) {
- for (StatListener<JobStats> l : jobStatListeners) {
- JobStats stats = new JobStats();
- stats.setCompleteJob(job);
- l.update(stats);
- }
+ for (StatListener<JobStats> l : jobStatListeners) {
+ l.update(stat);
}
this.jobCompleted.signalAll();
} finally {
@@ -145,7 +151,6 @@ public class Statistics implements Compo
//TODO: We have just 2 types of listeners as of now . If no of listeners
//increase then we should move to map kind of model.
-
public void addClusterStatsObservers(StatListener<ClusterStats> listener) {
clusterStatlisteners.add(listener);
}
@@ -197,93 +202,26 @@ public class Statistics implements Compo
if (clusterStatlisteners.size() > 0) {
try {
ClusterStatus clusterStatus = cluster.getClusterStatus();
- JobStatus[] allJobs = cluster.getAllJobs();
- List<JobStatus> runningWaitingJobs = getRunningWaitingJobs(allJobs);
- getJobReports(runningWaitingJobs);
updateAndNotifyClusterStatsListeners(
- clusterStatus, runningWaitingJobs);
+ clusterStatus);
} catch (IOException e) {
LOG.error(
"Statistics io exception while polling JT ", e);
return;
- } catch (InterruptedException e) {
- LOG.error(
- "Statistics interrupt exception while polling JT ", e);
- return;
}
}
}
}
private void updateAndNotifyClusterStatsListeners(
- ClusterStatus clusterStatus, List<JobStatus> runningWaitingJobs) {
+ ClusterStatus clusterStatus) {
ClusterStats stats = ClusterStats.getClusterStats();
stats.setClusterMetric(clusterStatus);
- stats.setRunningWaitingJobs(runningWaitingJobs);
for (StatListener<ClusterStats> listener : clusterStatlisteners) {
listener.update(stats);
}
}
- private void getJobReports(List<JobStatus> jobs) throws IOException {
- for (final JobStatus job : jobs) {
-
- final UserGroupInformation user = userResolver.getTargetUgi(
- UserGroupInformation.createRemoteUser(job.getUsername()));
- try {
- user.doAs(
- new PrivilegedExceptionAction<Void>() {
- public Void run() {
- JobID id = job.getJobID();
- if (!jobTaskReports.containsKey(id)) {
- try {
- jobTaskReports.put(
- id, cluster.getMapTaskReports(
- org.apache.hadoop.mapred.JobID.downgrade(id)));
- } catch (IOException e) {
- LOG.error(
- " Couldnt get the MapTaskResports for " + job.getJobId() +
- " job username "+ job.getUsername() +" cause " + user);
- }
- }
- return null;
- }
- });
- } catch (InterruptedException e) {
- LOG.error(
- " Could nt get information for user " + user + " and job " +
- job.getJobId());
- } catch (IOException e) {
- LOG.error(
- " Could nt get information for user " + user + " and job " +
- job.getJobId());
- throw new IOException(e);
- }
- }
- }
-
- /**
- * From the list of Jobs , give the list of jobs whoes state is eigther
- * PREP or RUNNING.
- *
- * @param allJobs
- * @return
- * @throws java.io.IOException
- * @throws InterruptedException
- */
- private List<JobStatus> getRunningWaitingJobs(JobStatus[] allJobs)
- throws IOException, InterruptedException {
- List<JobStatus> result = new ArrayList<JobStatus>();
- for (JobStatus job : allJobs) {
- //TODO Check if job.getStatus() makes a rpc call
- int state = job.getRunState();
- if (JobStatus.PREP == state || JobStatus.RUNNING == state) {
- result.add(job);
- }
- }
- return result;
- }
-
}
/**
@@ -298,7 +236,7 @@ public class Statistics implements Compo
@Override
public void shutdown() {
shutdown = true;
- jobTaskReports.clear();
+ jobMaps.clear();
clusterStatlisteners.clear();
jobStatListeners.clear();
statistics.interrupt();
@@ -307,7 +245,7 @@ public class Statistics implements Compo
@Override
public void abort() {
shutdown = true;
- jobTaskReports.clear();
+ jobMaps.clear();
clusterStatlisteners.clear();
jobStatListeners.clear();
statistics.interrupt();
@@ -319,21 +257,31 @@ public class Statistics implements Compo
* TODO: In future we need to extend this to send more information.
*/
static class JobStats {
- private Job completedJob;
+ private int noOfMaps;
+ private Job job;
- public Job getCompleteJob() {
- return completedJob;
+ public JobStats(int noOfMaps,Job job){
+ this.job = job;
+ this.noOfMaps = noOfMaps;
+ }
+ public int getNoOfMaps() {
+ return noOfMaps;
}
- public void setCompleteJob(Job job) {
- this.completedJob = job;
+ /**
+ * Returns the job ,
+ * We should not use job.getJobID it returns null in 20.1xx.
+ * Use (GridmixJob.getJobSeqId(job)) instead
+ * @return job
+ */
+ public Job getJob() {
+ return job;
}
}
static class ClusterStats {
private ClusterStatus status = null;
private static ClusterStats stats = new ClusterStats();
- private List<JobStatus> runningWaitingJobs;
private ClusterStats() {
@@ -360,20 +308,15 @@ public class Statistics implements Compo
return status;
}
+ int getNumRunningJob() {
+ return jobMaps.size();
+ }
+
/**
* @return runningWatitingJobs
*/
- public List<JobStatus> getRunningWaitingJobs() {
- return runningWaitingJobs;
- }
-
- public void setRunningWaitingJobs(List<JobStatus> runningWaitingJobs) {
- this.runningWaitingJobs = runningWaitingJobs;
+ static Collection<JobStats> getRunningJobStats() {
+ return jobMaps.values();
}
-
- public Map<JobID, TaskReport[]> getJobReports() {
- return jobTaskReports;
- }
-
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java?rev=1077383&r1=1077382&r2=1077383&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java Fri Mar 4 04:09:33 2011
@@ -17,27 +17,26 @@
*/
package org.apache.hadoop.mapred.gridmix;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats;
+import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.JobStoryProducer;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
-import java.util.List;
public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
public static final Log LOG = LogFactory.getLog(StressJobFactory.class);
private LoadStatus loadStatus = new LoadStatus();
- private List<JobStatus> runningWaitingJobs;
private final Condition overloaded = this.lock.newCondition();
/**
* The minimum ratio between pending+running map tasks (aka. incomplete map
@@ -45,7 +44,7 @@ public class StressJobFactory extends Jo
* overloaded. For running maps, we only count them partially. Namely, a 40%
* completed map is counted as 0.6 map tasks in our calculation.
*/
- static final float OVERLAOD_MAPTASK_MAPSLOT_RATIO = 2.0f;
+ static final float OVERLOAD_MAPTASK_MAPSLOT_RATIO = 2.0f;
/**
* Creating a new instance does not start the thread.
@@ -115,7 +114,7 @@ public class StressJobFactory extends Jo
}
int noOfSlotsAvailable = loadStatus.numSlotsBackfill;
- LOG.info(" No of slots to be backfilled are " + noOfSlotsAvailable);
+ LOG.info("No of slots to be backfilled are " + noOfSlotsAvailable);
for (int i = 0; i < noOfSlotsAvailable; i++) {
try {
@@ -133,7 +132,7 @@ public class StressJobFactory extends Jo
UserGroupInformation.createRemoteUser(
job.getUser())), sequence.getAndIncrement()));
} catch (IOException e) {
- LOG.error(" EXCEPTOIN in availableSlots ", e);
+ LOG.error("Error while submitting the job ", e);
error = e;
return;
}
@@ -152,7 +151,6 @@ public class StressJobFactory extends Jo
}
/**
- * <p/>
* STRESS Once you get the notification from StatsCollector.Collect the
* clustermetrics. Update current loadStatus with new load status of JT.
*
@@ -163,11 +161,11 @@ public class StressJobFactory extends Jo
lock.lock();
try {
ClusterStatus clusterMetrics = item.getStatus();
- LoadStatus newStatus;
- runningWaitingJobs = item.getRunningWaitingJobs();
- newStatus = checkLoadAndGetSlotsToBackfill(item, clusterMetrics);
- loadStatus.isOverloaded = newStatus.isOverloaded;
- loadStatus.numSlotsBackfill = newStatus.numSlotsBackfill;
+ try {
+ checkLoadAndGetSlotsToBackfill(item,clusterMetrics);
+ } catch (IOException e) {
+ LOG.error("Couldn't get the new Status",e);
+ }
overloaded.signalAll();
} finally {
lock.unlock();
@@ -178,47 +176,44 @@ public class StressJobFactory extends Jo
* We try to use some light-weight mechanism to determine cluster load.
*
* @param stats
- * @param clusterStatus
- * @return Whether, from job client perspective, the cluster is overloaded.
+ * @param clusterStatus Cluster status
+ * @throws java.io.IOException
*/
- private LoadStatus checkLoadAndGetSlotsToBackfill(
- Statistics.ClusterStats stats, ClusterStatus clusterStatus) {
- LoadStatus loadStatus = new LoadStatus();
+ private void checkLoadAndGetSlotsToBackfill(
+ ClusterStats stats, ClusterStatus clusterStatus) throws IOException {
// If there are more jobs than number of task trackers, we assume the
- // cluster is overloaded. This is to bound the memory usage of the
- // simulator job tracker, in situations where we have jobs with small
- // number of map tasks and large number of reduce tasks.
- if (runningWaitingJobs.size() >= clusterStatus.getTaskTrackers()) {
+ // cluster is overloaded.
+ if (stats.getNumRunningJob() >= clusterStatus.getTaskTrackers()) {
if (LOG.isDebugEnabled()) {
LOG.debug(
System.currentTimeMillis() + " Overloaded is " +
Boolean.TRUE.toString() + " #runningJobs >= taskTrackerCount (" +
- runningWaitingJobs.size() + " >= " +
+ stats.getNumRunningJob() + " >= " +
clusterStatus.getTaskTrackers() + " )\n");
}
loadStatus.isOverloaded = true;
loadStatus.numSlotsBackfill = 0;
- return loadStatus;
+ return;
}
float incompleteMapTasks = 0; // include pending & running map tasks.
- for (JobStatus job : runningWaitingJobs) {
- incompleteMapTasks += (1 - Math.min(
- job.mapProgress(), 1.0)) * stats.getJobReports().get(
- job.getJobID()).length;
+ for (JobStats job : ClusterStats.getRunningJobStats()) {
+ float mapProgress = job.getJob().mapProgress();
+ int noOfMaps = job.getNoOfMaps();
+ incompleteMapTasks += (1 - Math.min(mapProgress,1.0))* noOfMaps;
}
float overloadedThreshold =
- OVERLAOD_MAPTASK_MAPSLOT_RATIO * clusterStatus.getMaxMapTasks();
+ OVERLOAD_MAPTASK_MAPSLOT_RATIO * clusterStatus.getMaxMapTasks();
boolean overloaded = incompleteMapTasks > overloadedThreshold;
String relOp = (overloaded) ? ">" : "<=";
if (LOG.isDebugEnabled()) {
- LOG.info(
+ LOG.debug(
System.currentTimeMillis() + " Overloaded is " + Boolean.toString(
overloaded) + " incompleteMapTasks " + relOp + " " +
- OVERLAOD_MAPTASK_MAPSLOT_RATIO + "*mapSlotCapacity" + "(" +
+ OVERLOAD_MAPTASK_MAPSLOT_RATIO + "*mapSlotCapacity" + "(" +
incompleteMapTasks + " " + relOp + " " +
- OVERLAOD_MAPTASK_MAPSLOT_RATIO + "*" +
+ OVERLOAD_MAPTASK_MAPSLOT_RATIO + "*" +
clusterStatus.getMaxMapTasks() + ")");
}
if (overloaded) {
@@ -233,12 +228,11 @@ public class StressJobFactory extends Jo
if (LOG.isDebugEnabled()) {
LOG.debug("Current load Status is " + loadStatus);
}
- return loadStatus;
}
static class LoadStatus {
- volatile boolean isOverloaded = false;
- volatile int numSlotsBackfill = -1;
+ boolean isOverloaded = false;
+ int numSlotsBackfill = -1;
public String toString() {
return " is Overloaded " + isOverloaded + " no of slots available " +
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/UserResolver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/UserResolver.java?rev=1077383&r1=1077382&r2=1077383&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/UserResolver.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/UserResolver.java Fri Mar 4 04:09:33 2011
@@ -39,8 +39,7 @@ public interface UserResolver {
* Configure the user map given the URI and configuration. The resolver's
* contract will define how the resource will be interpreted, but the default
* will typically interpret the URI as a {@link org.apache.hadoop.fs.Path}
- * listing target users. The format of this file is defined by {@link
- * #parseUserList}.
+ * listing target users.
* @param userdesc URI (possibly null) from which user information may be
* loaded per the subclass contract.
* @param conf The tool configuration.
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=1077383&r1=1077382&r2=1077383&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Fri Mar 4 04:09:33 2011
@@ -352,7 +352,6 @@ public class TestGridmixSubmission {
conf = new Configuration();
conf.setEnum(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY,policy);
conf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
-// GridmixTestUtils.createHomeAndStagingDirectory((JobConf)conf);
// allow synthetic users to create home directories
GridmixTestUtils.dfs.mkdirs(root, new FsPermission((short)0777));
GridmixTestUtils.dfs.setPermission(root, new FsPermission((short)0777));