You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:34:26 UTC
svn commit: r1077034 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
contrib/fairscheduler/
contrib/fairscheduler/src/test/org/apache/hadoop/mapred/
mapred/org/apache/hadoop/m...
Author: omalley
Date: Fri Mar 4 03:34:25 2011
New Revision: 1077034
URL: http://svn.apache.org/viewvc?rev=1077034&view=rev
Log:
commit 3b6c557e3e1f3d03df503114b0123263b1a56e9e
Author: Hemanth Yamijala <yh...@apache.org>
Date: Sat Oct 24 17:42:59 2009 +0530
MAPREDUCE-1103 from https://issues.apache.org/jira/secure/attachment/12423030/1103_v5_yahoo_1.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1103. Added more metrics to Jobtracker. (sharad)
+
Added:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/ivy.xml
hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Mar 4 03:34:25 2011
@@ -166,8 +166,9 @@ public class TestCapacityScheduler exten
private int speculativeMapTaskCounter = 0;
private int speculativeReduceTaskCounter = 0;
public FakeJobInProgress(JobID jId, JobConf jobConf,
- FakeTaskTrackerManager taskTrackerManager, String user) {
- super(jId, jobConf);
+ FakeTaskTrackerManager taskTrackerManager, String user,
+ JobTracker jt) {
+ super(jId, jobConf, jt);
this.taskTrackerManager = taskTrackerManager;
this.startTime = System.currentTimeMillis();
this.status = new JobStatus(jId, 0f, 0f, JobStatus.PREP);
@@ -310,8 +311,9 @@ public class TestCapacityScheduler exten
static class FakeFailingJobInProgress extends FakeJobInProgress {
public FakeFailingJobInProgress(JobID id, JobConf jobConf,
- FakeTaskTrackerManager taskTrackerManager, String user) {
- super(id, jobConf, taskTrackerManager, user);
+ FakeTaskTrackerManager taskTrackerManager, String user,
+ JobTracker jt) {
+ super(id, jobConf, taskTrackerManager, user, jt);
}
@Override
@@ -765,7 +767,7 @@ public class TestCapacityScheduler exten
FakeJobInProgress job =
new FakeJobInProgress(new JobID("test", ++jobCounter),
(jobConf == null ? new JobConf(conf) : jobConf), taskTrackerManager,
- jobConf.getUser());
+ jobConf.getUser(), UtilsForTests.getJobTracker());
job.getStatus().setRunState(state);
taskTrackerManager.submitJob(job);
return job;
@@ -2588,7 +2590,8 @@ public class TestCapacityScheduler exten
//Submit a job whose initialization would fail always.
FakeJobInProgress job =
new FakeFailingJobInProgress(new JobID("test", ++jobCounter),
- new JobConf(), taskTrackerManager,"u1");
+ new JobConf(), taskTrackerManager,"u1",
+ UtilsForTests.getJobTracker());
job.getStatus().setRunState(JobStatus.PREP);
taskTrackerManager.submitJob(job);
//check if job is present in waiting list.
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/ivy.xml?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/ivy.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/ivy.xml Fri Mar 4 03:34:25 2011
@@ -38,5 +38,21 @@
name="junit"
rev="${junit.version}"
conf="common->default"/>
+ <dependency org="org.mortbay.jetty"
+ name="jetty-util"
+ rev="${jetty-util.version}"
+ conf="common->master"/>
+ <dependency org="org.mortbay.jetty"
+ name="jetty"
+ rev="${jetty.version}"
+ conf="common->master"/>
+ <dependency org="org.mortbay.jetty"
+ name="jsp-api-2.1"
+ rev="${jetty.version}"
+ conf="common->master"/>
+ <dependency org="org.mortbay.jetty"
+ name="jsp-2.1"
+ rev="${jetty.version}"
+ conf="common->master"/>
</dependencies>
</ivy-module>
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Fri Mar 4 03:34:25 2011
@@ -51,8 +51,9 @@ public class TestFairScheduler extends T
private FakeTaskTrackerManager taskTrackerManager;
public FakeJobInProgress(JobConf jobConf,
- FakeTaskTrackerManager taskTrackerManager) throws IOException {
- super(new JobID("test", ++jobCounter), jobConf);
+ FakeTaskTrackerManager taskTrackerManager,
+ JobTracker jt) throws IOException {
+ super(new JobID("test", ++jobCounter), jobConf, jt);
this.taskTrackerManager = taskTrackerManager;
this.startTime = System.currentTimeMillis();
this.status = new JobStatus();
@@ -292,7 +293,8 @@ public class TestFairScheduler extends T
jobConf.setNumReduceTasks(reduces);
if (pool != null)
jobConf.set(POOL_PROPERTY, pool);
- JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager);
+ JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager,
+ UtilsForTests.getJobTracker());
job.getStatus().setRunState(state);
taskTrackerManager.submitJob(job);
job.startTime = clock.time;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 4 03:34:25 2011
@@ -258,14 +258,14 @@ class JobInProgress {
/**
* Create an almost empty JobInProgress, which can be used only for tests
*/
- protected JobInProgress(JobID jobid, JobConf conf) {
+ protected JobInProgress(JobID jobid, JobConf conf, JobTracker tracker) {
this.conf = conf;
this.jobId = jobid;
this.numMapTasks = conf.getNumMapTasks();
this.numReduceTasks = conf.getNumReduceTasks();
this.maxLevel = NetworkTopology.DEFAULT_HOST_LEVEL;
this.anyCacheLevel = this.maxLevel+1;
- this.jobtracker = null;
+ this.jobtracker = tracker;
this.restartCount = 0;
}
@@ -292,6 +292,7 @@ class JobInProgress {
+ jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid;
this.jobtracker = jobtracker;
this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
+ this.jobtracker.getInstrumentation().addPrepJob(conf, jobid);
this.startTime = System.currentTimeMillis();
status.setStartTime(startTime);
this.localFs = FileSystem.getLocal(default_conf);
@@ -1413,6 +1414,7 @@ class JobInProgress {
name = Values.CLEANUP.name();
} else if (tip.isMapTask()) {
++runningMapTasks;
+ metrics.addRunningMaps(jobId, 1);
name = Values.MAP.name();
counter = Counter.TOTAL_LAUNCHED_MAPS;
splits = tip.getSplitNodes();
@@ -1421,6 +1423,7 @@ class JobInProgress {
metrics.launchMap(id);
} else {
++runningReduceTasks;
+ metrics.addRunningReduces(jobId, 1);
name = Values.REDUCE.name();
counter = Counter.TOTAL_LAUNCHED_REDUCES;
if (tip.getActiveTasks().size() > 1)
@@ -1541,8 +1544,10 @@ class JobInProgress {
long now = System.currentTimeMillis();
FallowSlotInfo info = map.get(taskTracker);
+ int reservedSlots = 0;
if (info == null) {
info = new FallowSlotInfo(now, numSlots);
+ reservedSlots = numSlots;
} else {
// Increment metering info if the reservation is changing
if (info.getNumSlots() != numSlots) {
@@ -1554,11 +1559,18 @@ class JobInProgress {
jobCounters.incrCounter(counter, fallowSlotMillis);
// Update
+ reservedSlots = numSlots - info.getNumSlots();
info.setTimestamp(now);
info.setNumSlots(numSlots);
}
}
map.put(taskTracker, info);
+ if (type == TaskType.MAP) {
+ jobtracker.getInstrumentation().addReservedMapSlots(reservedSlots);
+ }
+ else {
+ jobtracker.getInstrumentation().addReservedReduceSlots(reservedSlots);
+ }
}
public synchronized void unreserveTaskTracker(TaskTracker taskTracker,
@@ -1584,6 +1596,13 @@ class JobInProgress {
jobCounters.incrCounter(counter, fallowSlotMillis);
map.remove(taskTracker);
+ if (type == TaskType.MAP) {
+ jobtracker.getInstrumentation().decReservedMapSlots(info.getNumSlots());
+ }
+ else {
+ jobtracker.getInstrumentation().decReservedReduceSlots(
+ info.getNumSlots());
+ }
}
public int getNumReservedTaskTrackersForMaps() {
@@ -2265,7 +2284,7 @@ class JobInProgress {
this.status.setSetupProgress(1.0f);
// move the job to running state if the job is in prep state
if (this.status.getRunState() == JobStatus.PREP) {
- this.status.setRunState(JobStatus.RUNNING);
+ changeStateTo(JobStatus.RUNNING);
JobHistory.JobInfo.logStarted(profile.getJobID());
}
} else if (tip.isJobCleanupTask()) {
@@ -2294,6 +2313,7 @@ class JobInProgress {
jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
} else if (tip.isMapTask()) {
runningMapTasks -= 1;
+ metrics.decRunningMaps(jobId, 1);
// check if this was a sepculative task
if (oldNumAttempts > 1) {
speculativeMapTasks -= (oldNumAttempts - newNumAttempts);
@@ -2307,6 +2327,7 @@ class JobInProgress {
}
} else {
runningReduceTasks -= 1;
+ metrics.decRunningReduces(jobId, 1);
if (oldNumAttempts > 1) {
speculativeReduceTasks -= (oldNumAttempts - newNumAttempts);
}
@@ -2323,6 +2344,31 @@ class JobInProgress {
}
/**
+ * Job state change must happen thru this call
+ */
+ private void changeStateTo(int newState) {
+ int oldState = this.status.getRunState();
+ if (oldState == newState) {
+ return; //old and new states are same
+ }
+ this.status.setRunState(newState);
+
+ //update the metrics
+ if (oldState == JobStatus.PREP) {
+ this.jobtracker.getInstrumentation().decPrepJob(conf, jobId);
+ } else if (oldState == JobStatus.RUNNING) {
+ this.jobtracker.getInstrumentation().decRunningJob(conf, jobId);
+ }
+
+ if (newState == JobStatus.PREP) {
+ this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
+ } else if (newState == JobStatus.RUNNING) {
+ this.jobtracker.getInstrumentation().addRunningJob(conf, jobId);
+ }
+
+ }
+
+ /**
* The job is done since all it's component tasks are either
* successful or have failed.
*/
@@ -2332,7 +2378,7 @@ class JobInProgress {
// All tasks are complete, then the job is done!
//
if (this.status.getRunState() == JobStatus.RUNNING ) {
- this.status.setRunState(JobStatus.SUCCEEDED);
+ changeStateTo(JobStatus.SUCCEEDED);
this.status.setCleanupProgress(1.0f);
if (maps.length == 0) {
this.status.setMapProgress(1.0f);
@@ -2371,7 +2417,7 @@ class JobInProgress {
this.status.setCleanupProgress(1.0f);
if (jobTerminationState == JobStatus.FAILED) {
- this.status.setRunState(JobStatus.FAILED);
+ changeStateTo(JobStatus.FAILED);
// Log the job summary
JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
@@ -2381,7 +2427,7 @@ class JobInProgress {
this.finishedMapTasks,
this.finishedReduceTasks);
} else {
- this.status.setRunState(JobStatus.KILLED);
+ changeStateTo(JobStatus.KILLED);
// Log the job summary
JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
@@ -2395,6 +2441,13 @@ class JobInProgress {
jobtracker.getInstrumentation().terminateJob(
this.conf, this.status.getJobID());
+ if (jobTerminationState == JobStatus.FAILED) {
+ jobtracker.getInstrumentation().failedJob(
+ this.conf, this.status.getJobID());
+ } else {
+ jobtracker.getInstrumentation().killedJob(
+ this.conf, this.status.getJobID());
+ }
}
}
@@ -2536,6 +2589,7 @@ class JobInProgress {
launchedSetup = false;
} else if (tip.isMapTask()) {
runningMapTasks -= 1;
+ metrics.decRunningMaps(jobId, 1);
metrics.failedMap(taskid);
// remove from the running queue and put it in the non-running cache
// if the tip is not complete i.e if the tip still needs to be run
@@ -2545,6 +2599,7 @@ class JobInProgress {
}
} else {
runningReduceTasks -= 1;
+ metrics.decRunningReduces(jobId, 1);
metrics.failedReduce(taskid);
// remove from the running queue and put in the failed queue if the tip
// is not complete
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 03:34:25 2011
@@ -405,10 +405,11 @@ public class JobTracker implements MRCon
// tracker is lost, and if it is blacklisted, remove
// it from the count of blacklisted trackers in the cluster
if (isBlacklisted(trackerName)) {
- faultyTrackers.numBlacklistedTrackers -= 1;
+ faultyTrackers.decrBlackListedTrackers(1);
}
updateTaskTrackerStatus(trackerName, null);
statistics.taskTrackerRemoved(trackerName);
+ getInstrumentation().decTrackers(1);
// remove the mapping from the hosts list
String hostname = newProfile.getHost();
hostnameToTaskTracker.get(hostname).remove(trackerName);
@@ -718,6 +719,16 @@ public class JobTracker implements MRCon
}
}
+ private void incrBlackListedTrackers(int count) {
+ numBlacklistedTrackers += count;
+ getInstrumentation().addBlackListedTrackers(count);
+ }
+
+ private void decrBlackListedTrackers(int count) {
+ numBlacklistedTrackers -= count;
+ getInstrumentation().decBlackListedTrackers(count);
+ }
+
private void blackListTracker(String hostName, String reason, ReasonForBlackListing rfb) {
FaultInfo fi = getFaultInfo(hostName, true);
boolean blackListed = fi.isBlacklisted();
@@ -875,7 +886,7 @@ public class JobTracker implements MRCon
getInstrumentation().addBlackListedReduceSlots(
reduceSlots);
}
- numBlacklistedTrackers += uniqueHostsMap.remove(hostName);
+ incrBlackListedTrackers(uniqueHostsMap.remove(hostName));
}
}
@@ -895,7 +906,7 @@ public class JobTracker implements MRCon
}
uniqueHostsMap.put(hostName,
numTrackersOnHost);
- numBlacklistedTrackers -= numTrackersOnHost;
+ decrBlackListedTrackers(numTrackersOnHost);
}
}
@@ -2754,6 +2765,7 @@ public class JobTracker implements MRCon
hostnameToTaskTracker.put(hostname, trackers);
}
statistics.taskTrackerAdded(status.getTrackerName());
+ getInstrumentation().addTrackers(1);
LOG.info("Adding tracker " + status.getTrackerName() + " to host "
+ hostname);
trackers.add(taskTracker);
@@ -3062,6 +3074,8 @@ public class JobTracker implements MRCon
if (oldStatus != null) {
totalMaps -= oldStatus.countMapTasks();
totalReduces -= oldStatus.countReduceTasks();
+ getInstrumentation().decOccupiedMapSlots(oldStatus.countOccupiedMapSlots());
+ getInstrumentation().decOccupiedReduceSlots(oldStatus.countOccupiedReduceSlots());
if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
int mapSlots = oldStatus.getMaxMapSlots();
totalMapTaskCapacity -= mapSlots;
@@ -3084,6 +3098,8 @@ public class JobTracker implements MRCon
if (status != null) {
totalMaps += status.countMapTasks();
totalReduces += status.countReduceTasks();
+ getInstrumentation().addOccupiedMapSlots(status.countOccupiedMapSlots());
+ getInstrumentation().addOccupiedReduceSlots(status.countOccupiedReduceSlots());
if (!faultyTrackers.isBlacklisted(status.getHost())) {
int mapSlots = status.getMaxMapSlots();
totalMapTaskCapacity += mapSlots;
@@ -3192,7 +3208,7 @@ public class JobTracker implements MRCon
// if this is lost tracker that came back now, and if it blacklisted
// increment the count of blacklisted trackers in the cluster
if (isBlacklisted(trackerName)) {
- faultyTrackers.numBlacklistedTrackers += 1;
+ faultyTrackers.incrBlackListedTrackers(1);
}
addNewTracker(taskTracker);
}
@@ -4234,12 +4250,13 @@ public class JobTracker implements MRCon
}
// main decommission
- private synchronized void decommissionNodes(Set<String> hosts)
+ synchronized void decommissionNodes(Set<String> hosts)
throws IOException {
LOG.info("Decommissioning " + hosts.size() + " nodes");
// create a list of tracker hostnames
synchronized (taskTrackers) {
synchronized (trackerExpiryQueue) {
+ int trackersDecommissioned = 0;
for (String host : hosts) {
LOG.info("Decommissioning host " + host);
Set<TaskTracker> trackers = hostnameToTaskTracker.remove(host);
@@ -4248,11 +4265,14 @@ public class JobTracker implements MRCon
LOG.info("Decommission: Losing tracker " + tracker +
" on host " + host);
lostTaskTracker(tracker); // lose the tracker
- updateTaskTrackerStatus(tracker.getStatus().getTrackerName(), null);
+ updateTaskTrackerStatus(
+ tracker.getStatus().getTrackerName(), null);
}
+ trackersDecommissioned += trackers.size();
}
LOG.info("Host " + host + " is ready for decommissioning");
}
+ getInstrumentation().setDecommissionedTrackers(trackersDecommissioned);
}
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java Fri Mar 4 03:34:25 2011
@@ -84,4 +84,80 @@ class JobTrackerInstrumentation {
public void decBlackListedReduceSlots(int slots)
{ }
+
+ public void addReservedMapSlots(int slots)
+ { }
+
+ public void decReservedMapSlots(int slots)
+ { }
+
+ public void addReservedReduceSlots(int slots)
+ { }
+
+ public void decReservedReduceSlots(int slots)
+ { }
+
+ public void addOccupiedMapSlots(int slots)
+ { }
+
+ public void decOccupiedMapSlots(int slots)
+ { }
+
+ public void addOccupiedReduceSlots(int slots)
+ { }
+
+ public void decOccupiedReduceSlots(int slots)
+ { }
+
+ public void failedJob(JobConf conf, JobID id)
+ { }
+
+ public void killedJob(JobConf conf, JobID id)
+ { }
+
+ public void addPrepJob(JobConf conf, JobID id)
+ { }
+
+ public void decPrepJob(JobConf conf, JobID id)
+ { }
+
+ public void addRunningJob(JobConf conf, JobID id)
+ { }
+
+ public void decRunningJob(JobConf conf, JobID id)
+ { }
+
+ public void addRunningMaps(JobID id, int task)
+ { }
+
+ public void decRunningMaps(JobID id, int task)
+ { }
+
+ public void addRunningReduces(JobID id, int task)
+ { }
+
+ public void decRunningReduces(JobID id, int task)
+ { }
+
+ public void killedMap(TaskAttemptID taskAttemptID)
+ { }
+
+ public void killedReduce(TaskAttemptID taskAttemptID)
+ { }
+
+ public void addTrackers(int trackers)
+ { }
+
+ public void decTrackers(int trackers)
+ { }
+
+ public void addBlackListedTrackers(int trackers)
+ { }
+
+ public void decBlackListedTrackers(int trackers)
+ { }
+
+ public void setDecommissionedTrackers(int trackers)
+ { }
+
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java Fri Mar 4 03:34:25 2011
@@ -22,8 +22,6 @@ import org.apache.hadoop.metrics.Metrics
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.metrics.jvm.JvmMetrics;
-import org.apache.hadoop.metrics.util.MetricsRegistry;
-import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater {
private final MetricsRecord metricsRecord;
@@ -45,6 +43,27 @@ class JobTrackerMetricsInst extends JobT
private int numBlackListedMapSlots = 0;
private int numBlackListedReduceSlots = 0;
+ private int numReservedMapSlots = 0;
+ private int numReservedReduceSlots = 0;
+ private int numOccupiedMapSlots = 0;
+ private int numOccupiedReduceSlots = 0;
+
+ private int numJobsFailed = 0;
+ private int numJobsKilled = 0;
+
+ private int numJobsPreparing = 0;
+ private int numJobsRunning = 0;
+
+ private int numRunningMaps = 0;
+ private int numRunningReduces = 0;
+
+ private int numMapTasksKilled = 0;
+ private int numReduceTasksKilled = 0;
+
+ private int numTrackers = 0;
+ private int numTrackersBlackListed = 0;
+ private int numTrackersDecommissioned = 0;
+
public JobTrackerMetricsInst(JobTracker tracker, JobConf conf) {
super(tracker, conf);
String sessionId = conf.getSessionId();
@@ -78,6 +97,28 @@ class JobTrackerMetricsInst extends JobT
metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
metricsRecord.incrMetric("waiting_maps", numWaitingMaps);
metricsRecord.incrMetric("waiting_reduces", numWaitingReduces);
+
+ metricsRecord.incrMetric("reserved_map_slots", numReservedMapSlots);
+ metricsRecord.incrMetric("reserved_reduce_slots", numReservedReduceSlots);
+ metricsRecord.incrMetric("occupied_map_slots", numOccupiedMapSlots);
+ metricsRecord.incrMetric("occupied_reduce_slots", numOccupiedReduceSlots);
+
+ metricsRecord.incrMetric("jobs_failed", numJobsFailed);
+ metricsRecord.incrMetric("jobs_killed", numJobsKilled);
+
+ metricsRecord.incrMetric("jobs_preparing", numJobsPreparing);
+ metricsRecord.incrMetric("jobs_running", numJobsRunning);
+
+ metricsRecord.incrMetric("running_maps", numRunningMaps);
+ metricsRecord.incrMetric("running_reduces", numRunningReduces);
+
+ metricsRecord.incrMetric("maps_killed", numMapTasksKilled);
+ metricsRecord.incrMetric("reduces_killed", numReduceTasksKilled);
+
+ metricsRecord.incrMetric("trackers", numTrackers);
+ metricsRecord.incrMetric("trackers_blacklisted", numTrackersBlackListed);
+ metricsRecord.setMetric("trackers_decommissioned",
+ numTrackersDecommissioned);
numMapTasksLaunched = 0;
numMapTasksCompleted = 0;
@@ -91,6 +132,26 @@ class JobTrackerMetricsInst extends JobT
numWaitingReduces = 0;
numBlackListedMapSlots = 0;
numBlackListedReduceSlots = 0;
+
+ numReservedMapSlots = 0;
+ numReservedReduceSlots = 0;
+ numOccupiedMapSlots = 0;
+ numOccupiedReduceSlots = 0;
+
+ numJobsFailed = 0;
+ numJobsKilled = 0;
+
+ numJobsPreparing = 0;
+ numJobsRunning = 0;
+
+ numRunningMaps = 0;
+ numRunningReduces = 0;
+
+ numMapTasksKilled = 0;
+ numReduceTasksKilled = 0;
+
+ numTrackers = 0;
+ numTrackersBlackListed = 0;
}
metricsRecord.update();
@@ -166,12 +227,12 @@ class JobTrackerMetricsInst extends JobT
}
@Override
- public void setMapSlots(int slots) {
+ public synchronized void setMapSlots(int slots) {
numMapSlots = slots;
}
@Override
- public void setReduceSlots(int slots) {
+ public synchronized void setReduceSlots(int slots) {
numReduceSlots = slots;
}
@@ -194,4 +255,154 @@ class JobTrackerMetricsInst extends JobT
public synchronized void decBlackListedReduceSlots(int slots){
numBlackListedReduceSlots -= slots;
}
+
+ @Override
+ public synchronized void addReservedMapSlots(int slots)
+ {
+ numReservedMapSlots += slots;
+ }
+
+ @Override
+ public synchronized void decReservedMapSlots(int slots)
+ {
+ numReservedMapSlots -= slots;
+ }
+
+ @Override
+ public synchronized void addReservedReduceSlots(int slots)
+ {
+ numReservedReduceSlots += slots;
+ }
+
+ @Override
+ public synchronized void decReservedReduceSlots(int slots)
+ {
+ numReservedReduceSlots -= slots;
+ }
+
+ @Override
+ public synchronized void addOccupiedMapSlots(int slots)
+ {
+ numOccupiedMapSlots += slots;
+ }
+
+ @Override
+ public synchronized void decOccupiedMapSlots(int slots)
+ {
+ numOccupiedMapSlots -= slots;
+ }
+
+ @Override
+ public synchronized void addOccupiedReduceSlots(int slots)
+ {
+ numOccupiedReduceSlots += slots;
+ }
+
+ @Override
+ public synchronized void decOccupiedReduceSlots(int slots)
+ {
+ numOccupiedReduceSlots -= slots;
+ }
+
+ @Override
+ public synchronized void failedJob(JobConf conf, JobID id)
+ {
+ numJobsFailed++;
+ }
+
+ @Override
+ public synchronized void killedJob(JobConf conf, JobID id)
+ {
+ numJobsKilled++;
+ }
+
+ @Override
+ public synchronized void addPrepJob(JobConf conf, JobID id)
+ {
+ numJobsPreparing++;
+ }
+
+ @Override
+ public synchronized void decPrepJob(JobConf conf, JobID id)
+ {
+ numJobsPreparing--;
+ }
+
+ @Override
+ public synchronized void addRunningJob(JobConf conf, JobID id)
+ {
+ numJobsRunning++;
+ }
+
+ @Override
+ public synchronized void decRunningJob(JobConf conf, JobID id)
+ {
+ numJobsRunning--;
+ }
+
+ @Override
+ public synchronized void addRunningMaps(JobID id, int task)
+ {
+ numRunningMaps += task;
+ }
+
+ @Override
+ public synchronized void decRunningMaps(JobID id, int task)
+ {
+ numRunningMaps -= task;
+ }
+
+ @Override
+ public synchronized void addRunningReduces(JobID id, int task)
+ {
+ numRunningReduces += task;
+ }
+
+ @Override
+ public synchronized void decRunningReduces(JobID id, int task)
+ {
+ numRunningReduces -= task;
+ }
+
+ @Override
+ public synchronized void killedMap(TaskAttemptID taskAttemptID)
+ {
+ numMapTasksKilled++;
+ }
+
+ @Override
+ public synchronized void killedReduce(TaskAttemptID taskAttemptID)
+ {
+ numReduceTasksKilled++;
+ }
+
+ @Override
+ public synchronized void addTrackers(int trackers)
+ {
+ numTrackers += trackers;
+ }
+
+ @Override
+ public synchronized void decTrackers(int trackers)
+ {
+ numTrackers -= trackers;
+ }
+
+ @Override
+ public synchronized void addBlackListedTrackers(int trackers)
+ {
+ numTrackersBlackListed += trackers;
+ }
+
+ @Override
+ public synchronized void decBlackListedTrackers(int trackers)
+ {
+ numTrackersBlackListed -= trackers;
+ }
+
+ @Override
+ public synchronized void setDecommissionedTrackers(int trackers)
+ {
+ numTrackersDecommissioned = trackers;
+ }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java Fri Mar 4 03:34:25 2011
@@ -37,25 +37,93 @@ public class TestJobKillAndFail extends
public void testJobFailAndKill() throws IOException {
MiniMRCluster mr = null;
try {
- mr = new MiniMRCluster(2, "file:///", 3);
+ JobConf jtConf = new JobConf();
+ jtConf.set("mapred.jobtracker.instrumentation",
+ JTInstrumentation.class.getName());
+ mr = new MiniMRCluster(2, "file:///", 3, null, null, jtConf);
+ JTInstrumentation instr = (JTInstrumentation)
+ mr.getJobTrackerRunner().getJobTracker().getInstrumentation();
// run the TCs
JobConf conf = mr.createJobConf();
-
+
Path inDir = new Path(TEST_ROOT_DIR + "/failkilljob/input");
Path outDir = new Path(TEST_ROOT_DIR + "/failkilljob/output");
RunningJob job = UtilsForTests.runJobFail(conf, inDir, outDir);
// Checking that the Job got failed
assertEquals(job.getJobState(), JobStatus.FAILED);
+ assertTrue(instr.verifyJob());
+ assertEquals(1, instr.failed);
+ instr.reset();
+
job = UtilsForTests.runJobKill(conf, inDir, outDir);
// Checking that the Job got killed
assertTrue(job.isComplete());
assertEquals(job.getJobState(), JobStatus.KILLED);
+ assertTrue(instr.verifyJob());
+ assertEquals(1, instr.killed);
} finally {
if (mr != null) {
mr.shutdown();
}
}
}
+
+ static class JTInstrumentation extends JobTrackerInstrumentation {
+ volatile int failed;
+ volatile int killed;
+ volatile int addPrep;
+ volatile int decPrep;
+ volatile int addRunning;
+ volatile int decRunning;
+
+ void reset() {
+ failed = 0;
+ killed = 0;
+ addPrep = 0;
+ decPrep = 0;
+ addRunning = 0;
+ decRunning = 0;
+ }
+
+ boolean verifyJob() {
+ return addPrep==1 && decPrep==1 && addRunning==1 && decRunning==1;
+ }
+
+ public JTInstrumentation(JobTracker jt, JobConf conf) {
+ super(jt, conf);
+ }
+
+ public synchronized void addPrepJob(JobConf conf, JobID id)
+ {
+ addPrep++;
+ }
+
+ public synchronized void decPrepJob(JobConf conf, JobID id)
+ {
+ decPrep++;
+ }
+
+ public synchronized void addRunningJob(JobConf conf, JobID id)
+ {
+ addRunning++;
+ }
+
+ public synchronized void decRunningJob(JobConf conf, JobID id)
+ {
+ decRunning++;
+ }
+
+ public synchronized void failedJob(JobConf conf, JobID id)
+ {
+ failed++;
+ }
+
+ public synchronized void killedJob(JobConf conf, JobID id)
+ {
+ killed++;
+ }
+ }
+
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Fri Mar 4 03:34:25 2011
@@ -44,8 +44,9 @@ public class TestJobQueueTaskScheduler e
private FakeTaskTrackerManager taskTrackerManager;
public FakeJobInProgress(JobConf jobConf,
- FakeTaskTrackerManager taskTrackerManager) throws IOException {
- super(new JobID("test", ++jobCounter), jobConf);
+ FakeTaskTrackerManager taskTrackerManager, JobTracker jt)
+ throws IOException {
+ super(new JobID("test", ++jobCounter), jobConf, jt);
this.taskTrackerManager = taskTrackerManager;
this.startTime = System.currentTimeMillis();
this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP);
@@ -267,7 +268,8 @@ public class TestJobQueueTaskScheduler e
int numJobs, int state)
throws IOException {
for (int i = 0; i < numJobs; i++) {
- JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager);
+ JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager,
+ UtilsForTests.getJobTracker());
job.getStatus().setRunState(state);
taskTrackerManager.submitJob(job);
}
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java?rev=1077034&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java Fri Mar 4 03:34:25 2011
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.examples.SleepJob;
+
+@SuppressWarnings("deprecation")
+/**
+ * Verifies the JobTracker slot/task instrumentation hooks added by
+ * MAPREDUCE-1103: runs a real job on a MiniMRCluster with a counting
+ * instrumentation installed and checks the increment/decrement callbacks
+ * balance out once the job finishes.
+ */
+public class TestJobTrackerInstrumentation extends TestCase {
+
+ /**
+  * Runs a SleepJob (3 maps, 2 reduces) and asserts that every "incr"
+  * counter recorded by {@link MyJobTrackerMetricsInst} is matched by an
+  * equal "decr" after completion, and that the occupied-slot and
+  * running-task callbacks fired at least once.
+  */
+ public void testSlots() throws IOException {
+ MiniMRCluster mr = null;
+ try {
+ JobConf jtConf = new JobConf();
+ // Install the counting instrumentation before the cluster starts,
+ // so the JobTracker instantiates it via reflection.
+ jtConf.set("mapred.jobtracker.instrumentation",
+ MyJobTrackerMetricsInst.class.getName());
+ mr = new MiniMRCluster(2, "file:///", 3, null, null, jtConf);
+ MyJobTrackerMetricsInst instr = (MyJobTrackerMetricsInst)
+ mr.getJobTrackerRunner().getJobTracker().getInstrumentation();
+
+ JobConf conf = mr.createJobConf();
+ SleepJob job = new SleepJob();
+ job.setConf(conf);
+ int numMapTasks = 3;
+ int numReduceTasks = 2;
+ job.run(numMapTasks, numReduceTasks, 10000, 1, 10000, 1);
+
+ // Synchronize on instr: the callbacks are synchronized methods, so
+ // this guarantees we read a consistent snapshot of the counters.
+ synchronized (instr) {
+ //after the job completes, incr and decr should be equal
+ assertEquals(instr.incrOccupiedMapSlots,
+ instr.decrOccupiedMapSlots);
+ assertEquals(instr.incrOccupiedReduceSlots,
+ instr.decrOccupiedReduceSlots);
+ assertEquals(instr.incrRunningMaps,
+ instr.decrRunningMaps);
+ assertEquals(instr.incrRunningReduces,
+ instr.decrRunningReduces);
+ assertEquals(instr.incrReservedMapSlots,
+ instr.decrReservedMapSlots);
+ assertEquals(instr.incrReservedReduceSlots,
+ instr.decrReservedReduceSlots);
+
+ // validate that the callbacks happened at least once
+ assertTrue(instr.incrOccupiedMapSlots > 0);
+ assertTrue(instr.incrOccupiedReduceSlots > 0);
+ assertTrue(instr.incrRunningMaps > 0);
+ assertTrue(instr.incrRunningReduces > 0);
+ }
+ } finally {
+ if (mr != null) {
+ mr.shutdown();
+ }
+ }
+ }
+
+ /**
+  * Instrumentation stub that accumulates the slot counts passed to each
+  * add/dec callback pair, letting the test compare totals afterwards.
+  * All overrides are synchronized so reads under "synchronized (instr)"
+  * in the test see up-to-date values.
+  */
+ static class MyJobTrackerMetricsInst extends JobTrackerInstrumentation {
+ public MyJobTrackerMetricsInst(JobTracker tracker, JobConf conf) {
+ super(tracker, conf);
+ }
+
+ // incr*/decr* pairs: each add callback grows incr*, each dec callback
+ // grows decr*; a completed job should leave every pair equal.
+ private int incrReservedMapSlots = 0;
+ private int decrReservedMapSlots = 0;
+ private int incrReservedReduceSlots = 0;
+ private int decrReservedReduceSlots = 0;
+ private int incrOccupiedMapSlots = 0;
+ private int decrOccupiedMapSlots = 0;
+ private int incrOccupiedReduceSlots = 0;
+ private int decrOccupiedReduceSlots = 0;
+ private int incrRunningMaps = 0;
+ private int decrRunningMaps = 0;
+ private int incrRunningReduces = 0;
+ private int decrRunningReduces = 0;
+
+ @Override
+ public synchronized void addReservedMapSlots(int slots)
+ {
+ incrReservedMapSlots += slots;
+ }
+
+ @Override
+ public synchronized void decReservedMapSlots(int slots)
+ {
+ decrReservedMapSlots += slots;
+ }
+
+ @Override
+ public synchronized void addReservedReduceSlots(int slots)
+ {
+ incrReservedReduceSlots += slots;
+ }
+
+ @Override
+ public synchronized void decReservedReduceSlots(int slots)
+ {
+ decrReservedReduceSlots += slots;
+ }
+
+ @Override
+ public synchronized void addOccupiedMapSlots(int slots)
+ {
+ incrOccupiedMapSlots += slots;
+ }
+
+ @Override
+ public synchronized void decOccupiedMapSlots(int slots)
+ {
+ decrOccupiedMapSlots += slots;
+ }
+
+ @Override
+ public synchronized void addOccupiedReduceSlots(int slots)
+ {
+ incrOccupiedReduceSlots += slots;
+ }
+
+ @Override
+ public synchronized void decOccupiedReduceSlots(int slots)
+ {
+ decrOccupiedReduceSlots += slots;
+ }
+
+ @Override
+ public synchronized void addRunningMaps(JobID id, int task)
+ {
+ incrRunningMaps += task;
+ }
+
+ @Override
+ public synchronized void decRunningMaps(JobID id, int task)
+ {
+ decrRunningMaps += task;
+ }
+
+ @Override
+ public synchronized void addRunningReduces(JobID id, int task)
+ {
+ incrRunningReduces += task;
+ }
+
+ @Override
+ public synchronized void decRunningReduces(JobID id, int task)
+ {
+ decrRunningReduces += task;
+ }
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java Fri Mar 4 03:34:25 2011
@@ -43,8 +43,10 @@ public class TestParallelInitialization
class FakeJobInProgress extends JobInProgress {
public FakeJobInProgress(JobConf jobConf,
- FakeTaskTrackerManager taskTrackerManager) throws IOException {
- super(new JobID("test", ++jobCounter), jobConf);
+ FakeTaskTrackerManager taskTrackerManager,
+ JobTracker jt) throws IOException {
+ super(new JobID("test", ++jobCounter), jobConf,
+ jt);
this.startTime = System.currentTimeMillis();
this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP);
this.status.setJobPriority(JobPriority.NORMAL);
@@ -213,7 +215,8 @@ public class TestParallelInitialization
// will be inited first and that will hang
for (int i = 0; i < NUM_JOBS; i++) {
- jobs[i] = new FakeJobInProgress(jobConf, taskTrackerManager);
+ jobs[i] = new FakeJobInProgress(jobConf, taskTrackerManager,
+ UtilsForTests.getJobTracker());
jobs[i].getStatus().setRunState(JobStatus.PREP);
taskTrackerManager.submitJob(jobs[i]);
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java Fri Mar 4 03:34:25 2011
@@ -32,7 +32,8 @@ public class TestResourceEstimation exte
jc.setNumMapTasks(maps);
jc.setNumReduceTasks(reduces);
- JobInProgress jip = new JobInProgress(jid, jc);
+ JobInProgress jip = new JobInProgress(jid, jc,
+ UtilsForTests.getJobTracker());
//unfortunately, we can't set job input size from here.
ResourceEstimator re = new ResourceEstimator(jip);
@@ -65,7 +66,8 @@ public class TestResourceEstimation exte
jc.setNumMapTasks(maps);
jc.setNumReduceTasks(reduces);
- JobInProgress jip = new JobInProgress(jid, jc) {
+ JobInProgress jip = new JobInProgress(jid, jc,
+ UtilsForTests.getJobTracker()) {
long getInputLength() {
return singleMapInputSize*desiredMaps();
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java Fri Mar 4 03:34:25 2011
@@ -731,6 +731,18 @@ public class UtilsForTests {
}
return pid;
}
-
+
+ /**
+  * Constructs a JobTracker bound to ephemeral ports (":0" for both the RPC
+  * and HTTP addresses) for tests that need a real JobTracker instance
+  * without claiming fixed ports.
+  *
+  * @return a freshly constructed JobTracker
+  * @throws RuntimeException wrapping any exception thrown by the
+  *         JobTracker constructor
+  */
+ static JobTracker getJobTracker() {
+ JobConf conf = new JobConf();
+ conf.set("mapred.job.tracker", "localhost:0");
+ conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+ JobTracker jt;
+ try {
+ jt = new JobTracker(conf);
+ return jt;
+ } catch (Exception e) {
+ throw new RuntimeException("Could not start jt", e);
+ }
+ }
}