You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dh...@apache.org on 2010/07/16 01:30:53 UTC
svn commit: r964640 - in /hadoop/mapreduce/trunk: ./
src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/
Author: dhruba
Date: Thu Jul 15 23:30:53 2010
New Revision: 964640
URL: http://svn.apache.org/viewvc?rev=964640&view=rev
Log:
MAPREDUCE-1848. Put number of speculative, data local, rack local
tasks in JobTracker metrics. (Scott Chen via dhruba)
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=964640&r1=964639&r2=964640&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jul 15 23:30:53 2010
@@ -80,6 +80,9 @@ Trunk (unreleased changes)
Also includes compatibility with security enhancements, and scalability
improvements. (Amar Kamat, Rahul Singh, Hong Tang, and cdouglas)
+ MAPREDUCE-1848. Put number of speculative, data local, rack local
+ tasks in JobTracker metrics. (Scott Chen via dhruba)
+
OPTIMIZATIONS
MAPREDUCE-1354. Enhancements to JobTracker for better performance and
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=964640&r1=964639&r2=964640&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu Jul 15 23:30:53 2010
@@ -1640,6 +1640,7 @@ public class JobInProgress {
splits = tip.getSplitNodes();
if (tip.isSpeculating()) {
speculativeMapTasks++;
+ metrics.speculateMap(id);
if (LOG.isDebugEnabled()) {
LOG.debug("Chosen speculative task, current speculativeMap task count: "
+ speculativeMapTasks);
@@ -1652,6 +1653,7 @@ public class JobInProgress {
counter = JobCounter.TOTAL_LAUNCHED_REDUCES;
if (tip.isSpeculating()) {
speculativeReduceTasks++;
+ metrics.speculateReduce(id);
if (LOG.isDebugEnabled()) {
LOG.debug("Chosen speculative task, current speculativeReduce task count: "
+ speculativeReduceTasks);
@@ -1691,10 +1693,12 @@ public class JobInProgress {
case 0 :
LOG.info("Choosing data-local task " + tip.getTIPId());
jobCounters.incrCounter(JobCounter.DATA_LOCAL_MAPS, 1);
+ metrics.launchDataLocalMap(id);
break;
case 1:
LOG.info("Choosing rack-local task " + tip.getTIPId());
jobCounters.incrCounter(JobCounter.RACK_LOCAL_MAPS, 1);
+ metrics.launchRackLocalMap(id);
break;
default :
// check if there is any locality
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java?rev=964640&r1=964639&r2=964640&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java Thu Jul 15 23:30:53 2010
@@ -162,4 +162,16 @@ class JobTrackerInstrumentation {
public void heartbeat() {
}
+
+ public void speculateMap(TaskAttemptID taskAttemptID)
+ { }
+
+ public void speculateReduce(TaskAttemptID taskAttemptID)
+ { }
+
+ public void launchDataLocalMap(TaskAttemptID taskAttemptID)
+ { }
+
+ public void launchRackLocalMap(TaskAttemptID taskAttemptID)
+ { }
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java?rev=964640&r1=964639&r2=964640&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java Thu Jul 15 23:30:53 2010
@@ -36,6 +36,10 @@ class JobTrackerMetricsInst extends JobT
private int numJobsCompleted = 0;
private int numWaitingMaps = 0;
private int numWaitingReduces = 0;
+ private int numSpeculativeMaps = 0;
+ private int numSpeculativeReduces = 0;
+ private int numDataLocalMaps = 0;
+ private int numRackLocalMaps = 0;
//Cluster status fields.
private volatile int numMapSlots = 0;
@@ -101,6 +105,10 @@ class JobTrackerMetricsInst extends JobT
metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
metricsRecord.incrMetric("waiting_maps", numWaitingMaps);
metricsRecord.incrMetric("waiting_reduces", numWaitingReduces);
+ metricsRecord.incrMetric("speculative_maps", numSpeculativeMaps);
+ metricsRecord.incrMetric("speculative_reduces", numSpeculativeReduces);
+ metricsRecord.incrMetric("datalocal_maps", numDataLocalMaps);
+ metricsRecord.incrMetric("racklocal_maps", numRackLocalMaps);
metricsRecord.incrMetric("reserved_map_slots", numReservedMapSlots);
metricsRecord.incrMetric("reserved_reduce_slots", numReservedReduceSlots);
@@ -138,6 +146,10 @@ class JobTrackerMetricsInst extends JobT
numWaitingReduces = 0;
numBlackListedMapSlots = 0;
numBlackListedReduceSlots = 0;
+ numSpeculativeMaps = 0;
+ numSpeculativeReduces = 0;
+ numDataLocalMaps = 0;
+ numRackLocalMaps = 0;
numReservedMapSlots = 0;
numReservedReduceSlots = 0;
@@ -171,6 +183,16 @@ class JobTrackerMetricsInst extends JobT
}
@Override
+ public synchronized void launchDataLocalMap(TaskAttemptID taskAttemptID) {
+ ++numDataLocalMaps;
+ }
+
+ @Override
+ public synchronized void launchRackLocalMap(TaskAttemptID taskAttemptID) {
+ ++numRackLocalMaps;
+ }
+
+ @Override
public synchronized void completeMap(TaskAttemptID taskAttemptID) {
++numMapTasksCompleted;
}
@@ -182,6 +204,11 @@ class JobTrackerMetricsInst extends JobT
}
@Override
+ public synchronized void speculateMap(TaskAttemptID taskAttemptID) {
+ ++numSpeculativeMaps;
+ }
+
+ @Override
public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
++numReduceTasksLaunched;
decWaitingReduces(taskAttemptID.getJobID(), 1);
@@ -199,6 +226,11 @@ class JobTrackerMetricsInst extends JobT
}
@Override
+ public synchronized void speculateReduce(TaskAttemptID taskAttemptID) {
+ ++numSpeculativeReduces;
+ }
+
+ @Override
public synchronized void submitJob(JobConf conf, JobID id) {
++numJobsSubmitted;
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=964640&r1=964639&r2=964640&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java Thu Jul 15 23:30:53 2010
@@ -298,4 +298,324 @@ public class FakeObjectUtilities {
@Override
public void closeWriter(org.apache.hadoop.mapreduce.JobID id) { }
}
+
+ static class FakeJobTrackerMetricsInst extends JobTrackerInstrumentation {
+ public FakeJobTrackerMetricsInst(JobTracker tracker, JobConf conf) {
+ super(tracker, conf);
+ }
+
+ int numMapTasksLaunched = 0;
+ int numMapTasksCompleted = 0;
+ int numMapTasksFailed = 0;
+ int numReduceTasksLaunched = 0;
+ int numReduceTasksCompleted = 0;
+ int numReduceTasksFailed = 0;
+ int numJobsSubmitted = 0;
+ int numJobsCompleted = 0;
+ int numWaitingMaps = 0;
+ int numWaitingReduces = 0;
+ int numSpeculativeMaps = 0;
+ int numSpeculativeReduces = 0;
+ int numDataLocalMaps = 0;
+ int numRackLocalMaps = 0;
+
+ //Cluster status fields.
+ volatile int numMapSlots = 0;
+ volatile int numReduceSlots = 0;
+ int numBlackListedMapSlots = 0;
+ int numBlackListedReduceSlots = 0;
+
+ int numReservedMapSlots = 0;
+ int numReservedReduceSlots = 0;
+ int numOccupiedMapSlots = 0;
+ int numOccupiedReduceSlots = 0;
+
+ int numJobsFailed = 0;
+ int numJobsKilled = 0;
+
+ int numJobsPreparing = 0;
+ int numJobsRunning = 0;
+
+ int numRunningMaps = 0;
+ int numRunningReduces = 0;
+
+ int numMapTasksKilled = 0;
+ int numReduceTasksKilled = 0;
+
+ int numTrackers = 0;
+ int numTrackersBlackListed = 0;
+
+ int numTrackersDecommissioned = 0;
+
+ long numHeartbeats = 0;
+
+ @Override
+ public synchronized void launchMap(TaskAttemptID taskAttemptID) {
+ ++numMapTasksLaunched;
+ decWaitingMaps(taskAttemptID.getJobID(), 1);
+ }
+
+ @Override
+ public synchronized void completeMap(TaskAttemptID taskAttemptID) {
+ ++numMapTasksCompleted;
+ }
+
+ @Override
+ public synchronized void failedMap(TaskAttemptID taskAttemptID) {
+ ++numMapTasksFailed;
+ addWaitingMaps(taskAttemptID.getJobID(), 1);
+ }
+
+ @Override
+ public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
+ ++numReduceTasksLaunched;
+ decWaitingReduces(taskAttemptID.getJobID(), 1);
+ }
+
+ @Override
+ public synchronized void completeReduce(TaskAttemptID taskAttemptID) {
+ ++numReduceTasksCompleted;
+ }
+
+ @Override
+ public synchronized void failedReduce(TaskAttemptID taskAttemptID) {
+ ++numReduceTasksFailed;
+ addWaitingReduces(taskAttemptID.getJobID(), 1);
+ }
+
+ @Override
+ public synchronized void submitJob(JobConf conf, JobID id) {
+ ++numJobsSubmitted;
+ }
+
+ @Override
+ public synchronized void completeJob(JobConf conf, JobID id) {
+ ++numJobsCompleted;
+ }
+
+ @Override
+ public synchronized void addWaitingMaps(JobID id, int task) {
+ numWaitingMaps += task;
+ }
+
+ @Override
+ public synchronized void decWaitingMaps(JobID id, int task) {
+ numWaitingMaps -= task;
+ }
+
+ @Override
+ public synchronized void addWaitingReduces(JobID id, int task) {
+ numWaitingReduces += task;
+ }
+
+ @Override
+ public synchronized void decWaitingReduces(JobID id, int task){
+ numWaitingReduces -= task;
+ }
+
+ @Override
+ public void setMapSlots(int slots) {
+ numMapSlots = slots;
+ }
+
+ @Override
+ public void setReduceSlots(int slots) {
+ numReduceSlots = slots;
+ }
+
+ @Override
+ public synchronized void addBlackListedMapSlots(int slots){
+ numBlackListedMapSlots += slots;
+ }
+
+ @Override
+ public synchronized void decBlackListedMapSlots(int slots){
+ numBlackListedMapSlots -= slots;
+ }
+
+ @Override
+ public synchronized void addBlackListedReduceSlots(int slots){
+ numBlackListedReduceSlots += slots;
+ }
+
+ @Override
+ public synchronized void decBlackListedReduceSlots(int slots){
+ numBlackListedReduceSlots -= slots;
+ }
+
+ @Override
+ public synchronized void addReservedMapSlots(int slots)
+ {
+ numReservedMapSlots += slots;
+ }
+
+ @Override
+ public synchronized void decReservedMapSlots(int slots)
+ {
+ numReservedMapSlots -= slots;
+ }
+
+ @Override
+ public synchronized void addReservedReduceSlots(int slots)
+ {
+ numReservedReduceSlots += slots;
+ }
+
+ @Override
+ public synchronized void decReservedReduceSlots(int slots)
+ {
+ numReservedReduceSlots -= slots;
+ }
+
+ @Override
+ public synchronized void addOccupiedMapSlots(int slots)
+ {
+ numOccupiedMapSlots += slots;
+ }
+
+ @Override
+ public synchronized void decOccupiedMapSlots(int slots)
+ {
+ numOccupiedMapSlots -= slots;
+ }
+
+ @Override
+ public synchronized void addOccupiedReduceSlots(int slots)
+ {
+ numOccupiedReduceSlots += slots;
+ }
+
+ @Override
+ public synchronized void decOccupiedReduceSlots(int slots)
+ {
+ numOccupiedReduceSlots -= slots;
+ }
+
+ @Override
+ public synchronized void failedJob(JobConf conf, JobID id)
+ {
+ numJobsFailed++;
+ }
+
+ @Override
+ public synchronized void killedJob(JobConf conf, JobID id)
+ {
+ numJobsKilled++;
+ }
+
+ @Override
+ public synchronized void addPrepJob(JobConf conf, JobID id)
+ {
+ numJobsPreparing++;
+ }
+
+ @Override
+ public synchronized void decPrepJob(JobConf conf, JobID id)
+ {
+ numJobsPreparing--;
+ }
+
+ @Override
+ public synchronized void addRunningJob(JobConf conf, JobID id)
+ {
+ numJobsRunning++;
+ }
+
+ @Override
+ public synchronized void decRunningJob(JobConf conf, JobID id)
+ {
+ numJobsRunning--;
+ }
+
+ @Override
+ public synchronized void addRunningMaps(int task)
+ {
+ numRunningMaps += task;
+ }
+
+ @Override
+ public synchronized void decRunningMaps(int task)
+ {
+ numRunningMaps -= task;
+ }
+
+ @Override
+ public synchronized void addRunningReduces(int task)
+ {
+ numRunningReduces += task;
+ }
+
+ @Override
+ public synchronized void decRunningReduces(int task)
+ {
+ numRunningReduces -= task;
+ }
+
+ @Override
+ public synchronized void killedMap(TaskAttemptID taskAttemptID)
+ {
+ numMapTasksKilled++;
+ }
+
+ @Override
+ public synchronized void killedReduce(TaskAttemptID taskAttemptID)
+ {
+ numReduceTasksKilled++;
+ }
+
+ @Override
+ public synchronized void addTrackers(int trackers)
+ {
+ numTrackers += trackers;
+ }
+
+ @Override
+ public synchronized void decTrackers(int trackers)
+ {
+ numTrackers -= trackers;
+ }
+
+ @Override
+ public synchronized void addBlackListedTrackers(int trackers)
+ {
+ numTrackersBlackListed += trackers;
+ }
+
+ @Override
+ public synchronized void decBlackListedTrackers(int trackers)
+ {
+ numTrackersBlackListed -= trackers;
+ }
+
+ @Override
+ public synchronized void setDecommissionedTrackers(int trackers)
+ {
+ numTrackersDecommissioned = trackers;
+ }
+
+ @Override
+ public synchronized void heartbeat() {
+ ++numHeartbeats;
+ }
+
+ @Override
+ public synchronized void speculateReduce(TaskAttemptID taskAttemptID) {
+ ++numSpeculativeReduces;
+ }
+
+ @Override
+ public synchronized void speculateMap(TaskAttemptID taskAttemptID) {
+ ++numSpeculativeMaps;
+ }
+
+ @Override
+ public synchronized void launchDataLocalMap(TaskAttemptID taskAttemptID) {
+ ++numDataLocalMaps;
+ }
+
+ @Override
+ public synchronized void launchRackLocalMap(TaskAttemptID taskAttemptID) {
+ ++numRackLocalMaps;
+ }
+ }
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java?rev=964640&r1=964639&r2=964640&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java Thu Jul 15 23:30:53 2010
@@ -28,6 +28,7 @@ import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTrackerMetricsInst;
import org.apache.hadoop.mapred.TestTaskTrackerBlacklisting.FakeJobTracker;
import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
import org.apache.hadoop.mapreduce.TaskType;
@@ -55,7 +56,7 @@ public class TestJobTrackerInstrumentati
private static int numMapSlotsToReserve = 2;
private static int numReduceSlotsToReserve = 2;
- private static MyJobTrackerMetricsInst mi;
+ private static FakeJobTrackerMetricsInst mi;
@@ -71,9 +72,9 @@ public class TestJobTrackerInstrumentati
FakeTaskScheduler.class, TaskScheduler.class);
conf.set(JTConfig.JT_INSTRUMENTATION,
- MyJobTrackerMetricsInst.class.getName());
+ FakeJobTrackerMetricsInst.class.getName());
jobTracker = new FakeJobTracker(conf, new FakeClock(), trackers);
- mi = (MyJobTrackerMetricsInst) jobTracker.getInstrumentation();
+ mi = (FakeJobTrackerMetricsInst) jobTracker.getInstrumentation();
for (String tracker : trackers) {
FakeObjectUtilities.establishFirstContact(jobTracker, tracker);
}
@@ -390,300 +391,4 @@ public class TestJobTrackerInstrumentati
numReduceTasks);
}
}
-
- static class MyJobTrackerMetricsInst extends JobTrackerInstrumentation {
- public MyJobTrackerMetricsInst(JobTracker tracker, JobConf conf) {
- super(tracker, conf);
- }
-
- private int numMapTasksLaunched = 0;
- private int numMapTasksCompleted = 0;
- private int numMapTasksFailed = 0;
- private int numReduceTasksLaunched = 0;
- private int numReduceTasksCompleted = 0;
- private int numReduceTasksFailed = 0;
- private int numJobsSubmitted = 0;
- private int numJobsCompleted = 0;
- private int numWaitingMaps = 0;
- private int numWaitingReduces = 0;
-
- //Cluster status fields.
- private volatile int numMapSlots = 0;
- private volatile int numReduceSlots = 0;
- private int numBlackListedMapSlots = 0;
- private int numBlackListedReduceSlots = 0;
-
- private int numReservedMapSlots = 0;
- private int numReservedReduceSlots = 0;
- private int numOccupiedMapSlots = 0;
- private int numOccupiedReduceSlots = 0;
-
- private int numJobsFailed = 0;
- private int numJobsKilled = 0;
-
- private int numJobsPreparing = 0;
- private int numJobsRunning = 0;
-
- private int numRunningMaps = 0;
- private int numRunningReduces = 0;
-
- private int numMapTasksKilled = 0;
- private int numReduceTasksKilled = 0;
-
- private int numTrackers = 0;
- private int numTrackersBlackListed = 0;
-
- private int numTrackersDecommissioned = 0;
-
- private long numHeartbeats = 0;
-
- @Override
- public synchronized void launchMap(TaskAttemptID taskAttemptID) {
- ++numMapTasksLaunched;
- decWaitingMaps(taskAttemptID.getJobID(), 1);
- }
-
- @Override
- public synchronized void completeMap(TaskAttemptID taskAttemptID) {
- ++numMapTasksCompleted;
- }
-
- @Override
- public synchronized void failedMap(TaskAttemptID taskAttemptID) {
- ++numMapTasksFailed;
- addWaitingMaps(taskAttemptID.getJobID(), 1);
- }
-
- @Override
- public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
- ++numReduceTasksLaunched;
- decWaitingReduces(taskAttemptID.getJobID(), 1);
- }
-
- @Override
- public synchronized void completeReduce(TaskAttemptID taskAttemptID) {
- ++numReduceTasksCompleted;
- }
-
- @Override
- public synchronized void failedReduce(TaskAttemptID taskAttemptID) {
- ++numReduceTasksFailed;
- addWaitingReduces(taskAttemptID.getJobID(), 1);
- }
-
- @Override
- public synchronized void submitJob(JobConf conf, JobID id) {
- ++numJobsSubmitted;
- }
-
- @Override
- public synchronized void completeJob(JobConf conf, JobID id) {
- ++numJobsCompleted;
- }
-
- @Override
- public synchronized void addWaitingMaps(JobID id, int task) {
- numWaitingMaps += task;
- }
-
- @Override
- public synchronized void decWaitingMaps(JobID id, int task) {
- numWaitingMaps -= task;
- }
-
- @Override
- public synchronized void addWaitingReduces(JobID id, int task) {
- numWaitingReduces += task;
- }
-
- @Override
- public synchronized void decWaitingReduces(JobID id, int task){
- numWaitingReduces -= task;
- }
-
- @Override
- public void setMapSlots(int slots) {
- numMapSlots = slots;
- }
-
- @Override
- public void setReduceSlots(int slots) {
- numReduceSlots = slots;
- }
-
- @Override
- public synchronized void addBlackListedMapSlots(int slots){
- numBlackListedMapSlots += slots;
- }
-
- @Override
- public synchronized void decBlackListedMapSlots(int slots){
- numBlackListedMapSlots -= slots;
- }
-
- @Override
- public synchronized void addBlackListedReduceSlots(int slots){
- numBlackListedReduceSlots += slots;
- }
-
- @Override
- public synchronized void decBlackListedReduceSlots(int slots){
- numBlackListedReduceSlots -= slots;
- }
-
- @Override
- public synchronized void addReservedMapSlots(int slots)
- {
- numReservedMapSlots += slots;
- }
-
- @Override
- public synchronized void decReservedMapSlots(int slots)
- {
- numReservedMapSlots -= slots;
- }
-
- @Override
- public synchronized void addReservedReduceSlots(int slots)
- {
- numReservedReduceSlots += slots;
- }
-
- @Override
- public synchronized void decReservedReduceSlots(int slots)
- {
- numReservedReduceSlots -= slots;
- }
-
- @Override
- public synchronized void addOccupiedMapSlots(int slots)
- {
- numOccupiedMapSlots += slots;
- }
-
- @Override
- public synchronized void decOccupiedMapSlots(int slots)
- {
- numOccupiedMapSlots -= slots;
- }
-
- @Override
- public synchronized void addOccupiedReduceSlots(int slots)
- {
- numOccupiedReduceSlots += slots;
- }
-
- @Override
- public synchronized void decOccupiedReduceSlots(int slots)
- {
- numOccupiedReduceSlots -= slots;
- }
-
- @Override
- public synchronized void failedJob(JobConf conf, JobID id)
- {
- numJobsFailed++;
- }
-
- @Override
- public synchronized void killedJob(JobConf conf, JobID id)
- {
- numJobsKilled++;
- }
-
- @Override
- public synchronized void addPrepJob(JobConf conf, JobID id)
- {
- numJobsPreparing++;
- }
-
- @Override
- public synchronized void decPrepJob(JobConf conf, JobID id)
- {
- numJobsPreparing--;
- }
-
- @Override
- public synchronized void addRunningJob(JobConf conf, JobID id)
- {
- numJobsRunning++;
- }
-
- @Override
- public synchronized void decRunningJob(JobConf conf, JobID id)
- {
- numJobsRunning--;
- }
-
- @Override
- public synchronized void addRunningMaps(int task)
- {
- numRunningMaps += task;
- }
-
- @Override
- public synchronized void decRunningMaps(int task)
- {
- numRunningMaps -= task;
- }
-
- @Override
- public synchronized void addRunningReduces(int task)
- {
- numRunningReduces += task;
- }
-
- @Override
- public synchronized void decRunningReduces(int task)
- {
- numRunningReduces -= task;
- }
-
- @Override
- public synchronized void killedMap(TaskAttemptID taskAttemptID)
- {
- numMapTasksKilled++;
- }
-
- @Override
- public synchronized void killedReduce(TaskAttemptID taskAttemptID)
- {
- numReduceTasksKilled++;
- }
-
- @Override
- public synchronized void addTrackers(int trackers)
- {
- numTrackers += trackers;
- }
-
- @Override
- public synchronized void decTrackers(int trackers)
- {
- numTrackers -= trackers;
- }
-
- @Override
- public synchronized void addBlackListedTrackers(int trackers)
- {
- numTrackersBlackListed += trackers;
- }
-
- @Override
- public synchronized void decBlackListedTrackers(int trackers)
- {
- numTrackersBlackListed -= trackers;
- }
-
- @Override
- public synchronized void setDecommissionedTrackers(int trackers)
- {
- numTrackersDecommissioned = trackers;
- }
-
- @Override
- public synchronized void heartbeat() {
- ++numHeartbeats;
- }
- }
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java?rev=964640&r1=964639&r2=964640&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java Thu Jul 15 23:30:53 2010
@@ -28,6 +28,7 @@ import junit.framework.TestSuite;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory;
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTrackerMetricsInst;
import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobCounter;
@@ -37,6 +38,7 @@ import org.apache.hadoop.mapreduce.split
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.StaticMapping;
+import org.mortbay.log.Log;
/**
* A JUnit test to test configured task limits.
@@ -57,6 +59,7 @@ public class TestRackAwareTaskPlacement
static FakeJobTracker jobTracker;
static String jtIdentifier = "test";
private static int jobCounter;
+ private static FakeJobTrackerMetricsInst fakeInst;
public static Test suite() {
TestSetup setup =
@@ -67,7 +70,10 @@ public class TestRackAwareTaskPlacement
conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
conf.setClass("topology.node.switch.mapping.impl",
StaticMapping.class, DNSToSwitchMapping.class);
+ conf.set(JTConfig.JT_INSTRUMENTATION,
+ FakeJobTrackerMetricsInst.class.getName());
jobTracker = new FakeJobTracker(conf, new FakeClock(), trackers);
+ fakeInst = (FakeJobTrackerMetricsInst) jobTracker.getInstrumentation();
// Set up the Topology Information
for (int i = 0; i < allHosts.length; i++) {
StaticMapping.addNodeToRack(allHosts[i], allRacks[i]);
@@ -169,6 +175,9 @@ public class TestRackAwareTaskPlacement
assertEquals("Number of Other-local maps", 0,
counters.getCounter(JobCounter.OTHER_LOCAL_MAPS));
+ // Also verify jobtracker instrumentation
+ assertEquals("Number of data local maps", 3, fakeInst.numDataLocalMaps);
+ assertEquals("Number of rack local maps", 1, fakeInst.numRackLocalMaps);
}
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java?rev=964640&r1=964639&r2=964640&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java Thu Jul 15 23:30:53 2010
@@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTrackerMetricsInst;
import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
@@ -43,6 +44,7 @@ public class TestSpeculativeExecution ex
};
static SpecFakeClock clock;
static final Log LOG = LogFactory.getLog(TestSpeculativeExecution.class);
+ private static FakeJobTrackerMetricsInst fakeInst;
static String trackers[] = new String[] {"tracker_tracker1:1000",
@@ -56,8 +58,11 @@ public class TestSpeculativeExecution ex
JobConf conf = new JobConf();
conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
+ conf.set(JTConfig.JT_INSTRUMENTATION,
+ FakeJobTrackerMetricsInst.class.getName());
jobTracker = new FakeJobTracker(conf, (clock = new SpecFakeClock()),
trackers);
+ fakeInst = (FakeJobTrackerMetricsInst) jobTracker.getInstrumentation();
for (String tracker : trackers) {
FakeObjectUtilities.establishFirstContact(jobTracker, tracker);
}
@@ -127,6 +132,12 @@ public class TestSpeculativeExecution ex
"Running reduces count should be updated from " + oldRunningReduces +
" to " + (oldRunningReduces - 1), job.runningReduces(),
oldRunningReduces - 1);
+ // Verify total speculative tasks by jobtracker instrumentation
+ assertEquals("Total speculative maps", 1, fakeInst.numSpeculativeMaps);
+ assertEquals("Total speculative reduces", 1,
+ fakeInst.numSpeculativeReduces);
+ LOG.info("Total speculative maps = " + fakeInst.numSpeculativeMaps);
+ LOG.info("Total speculative reduces = " + fakeInst.numSpeculativeReduces);
job.finishTask(taskAttemptID[7]);
}
@@ -168,6 +179,12 @@ public class TestSpeculativeExecution ex
job.finishTask(taskAttemptID[8]);
assertEquals("Tracker "+ trackers[2] + " expected to be slow ",
job.isSlowTracker(trackers[2]), true);
+ // Verify total speculative tasks by jobtracker instrumentation
+ assertEquals("Total speculative maps", 1, fakeInst.numSpeculativeMaps);
+ assertEquals("Total speculative reduces", 1,
+ fakeInst.numSpeculativeReduces);
+ LOG.info("Total speculative maps = " + fakeInst.numSpeculativeMaps);
+ LOG.info("Total speculative reduces = " + fakeInst.numSpeculativeReduces);
}
public void testTaskToSpeculate() throws IOException {
@@ -200,6 +217,12 @@ public class TestSpeculativeExecution ex
taskAttemptID[5] = job.findReduceTask(trackers[4]);
assertEquals(taskAttemptID[5].getTaskID().getId(),3);
+ // Verify total speculative tasks by jobtracker instrumentation
+ assertEquals("Total speculative maps", 1, fakeInst.numSpeculativeMaps);
+ assertEquals("Total speculative reduces", 3,
+ fakeInst.numSpeculativeReduces);
+ LOG.info("Total speculative maps = " + fakeInst.numSpeculativeMaps);
+ LOG.info("Total speculative reduces = " + fakeInst.numSpeculativeReduces);
}
/*
@@ -236,6 +259,12 @@ public class TestSpeculativeExecution ex
job.progressMade(taskAttemptID[4], 0.20f);
taskAttemptID[5] = job.findMapTask(trackers[4]);
assertEquals(taskAttemptID[5].getTaskID().getId(),4);
+ // Verify total speculative tasks by jobtracker instrumentation
+ assertEquals("Total speculative maps", 2, fakeInst.numSpeculativeMaps);
+ assertEquals("Total speculative reduces", 3,
+ fakeInst.numSpeculativeReduces);
+ LOG.info("Total speculative maps = " + fakeInst.numSpeculativeMaps);
+ LOG.info("Total speculative reduces = " + fakeInst.numSpeculativeReduces);
}
/*
@@ -254,6 +283,12 @@ public class TestSpeculativeExecution ex
assertEquals(speculativeCap(1200,1150,20), 10);
//Tests the fact that the max tasks launched is 0.01 * #slots
assertEquals(speculativeCap(1200,1150,4000), 20);
+ // Verify total speculative tasks by jobtracker instrumentation
+ assertEquals("Total speculative maps", 72, fakeInst.numSpeculativeMaps);
+ assertEquals("Total speculative reduces", 3,
+ fakeInst.numSpeculativeReduces);
+ LOG.info("Total speculative maps = " + fakeInst.numSpeculativeMaps);
+ LOG.info("Total speculative reduces = " + fakeInst.numSpeculativeReduces);
}
private int speculativeCap(int totalTasks, int numEarlyComplete, int slots)