Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/05/26 04:46:12 UTC

svn commit: r1127764 - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/ mr...

Author: mahadev
Date: Thu May 26 02:46:11 2011
New Revision: 1127764

URL: http://svn.apache.org/viewvc?rev=1127764&view=rev
Log:
MAPREDUCE-2527. Metrics for MRAppMaster (Luke lu via mahadev)

Added:
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/metrics/
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/metrics/MRAppMetrics.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/metrics/
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/metrics/TestMRAppMetrics.java
Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptCompletedEvent.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1127764&r1=1127763&r2=1127764&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Thu May 26 02:46:11 2011
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
   MAPREDUCE-279
 
+    MAPREDUCE-2527. Metrics for MRAppMaster (Luke lu via mahadev)
+
     Add debug config for delaying delete of local files. (cdouglas)
 
     Fixing race condition leading to hung jobs in scheduler negotiator (mahadev)

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1127764&r1=1127763&r2=1127764&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu May 26 02:46:11 2011
@@ -58,6 +58,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
 import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
 import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
@@ -67,6 +68,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanerImpl;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -109,6 +111,7 @@ public class MRAppMaster extends Composi
   private Clock clock;
   private final int startCount;
   private final ApplicationId appID;
+  protected final MRAppMetrics metrics;
   private Set<TaskId> completedTasksFromPreviousRun;
   private AppContext context;
   private Dispatcher dispatcher;
@@ -132,6 +135,7 @@ public class MRAppMaster extends Composi
     this.clock = clock;
     this.appID = applicationId;
     this.startCount = startCount;
+    this.metrics = MRAppMetrics.create();
     LOG.info("Created MRAppMaster for application " + applicationId);
   }
 
@@ -269,8 +273,8 @@ public class MRAppMaster extends Composi
 
     // create single job
     Job newJob = new JobImpl(appID, conf, dispatcher.getEventHandler(),
-                      taskAttemptListener, jobTokenSecretManager, fsTokens, 
-                      clock, startCount, completedTasksFromPreviousRun);
+        taskAttemptListener, jobTokenSecretManager, fsTokens, clock, startCount,
+        completedTasksFromPreviousRun, metrics);
     ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
 
     dispatcher.register(JobFinishEvent.Type.class,
@@ -467,6 +471,10 @@ public class MRAppMaster extends Composi
 
   @Override
   public void start() {
+    // metrics system init is really init & start.
+    // It's more test friendly to put it here.
+    DefaultMetricsSystem.initialize("MRAppMaster");
+
     startJobs();
     //start all the components
     super.start();

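The MRAppMaster changes wire metrics up in two places: the constructor creates and
registers the MRAppMetrics source via MRAppMetrics.create(), while start() calls
DefaultMetricsSystem.initialize("MRAppMaster") before the sub-services come up,
since (per the in-code comment) initialize() both initializes and starts the
metrics system and deferring it to start() keeps it test friendly. A minimal
sketch of that ordering, with a hypothetical AppSketch class standing in for
MRAppMaster:

    import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
    import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;

    // Hypothetical stand-in for MRAppMaster, not part of the commit.
    public class AppSketch {
      // As in the commit, the source is created and registered up front,
      // before the default metrics system itself is initialized.
      private final MRAppMetrics metrics = MRAppMetrics.create();

      public void start() {
        // initialize() both inits and starts the default metrics system,
        // which is why the commit defers it to start().
        DefaultMetricsSystem.initialize("MRAppMaster");
        // ... start the remaining services ...
      }

      public void stop() {
        DefaultMetricsSystem.shutdown();
      }
    }
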
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java?rev=1127764&r1=1127763&r2=1127764&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java Thu May 26 02:46:11 2011
@@ -33,7 +33,7 @@ public enum JobEventType {
   //Producer:Task
   JOB_TASK_COMPLETED,
   JOB_MAP_TASK_RESCHEDULED,
-  JOB_TASK_ATTEMPT_COMPLETED_EVENT,  // why "_EVENT" only on this one?
+  JOB_TASK_ATTEMPT_COMPLETED,
 
   //Producer:Any component
   JOB_DIAGNOSTIC_UPDATE,

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptCompletedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptCompletedEvent.java?rev=1127764&r1=1127763&r2=1127764&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptCompletedEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptCompletedEvent.java Thu May 26 02:46:11 2011
@@ -27,7 +27,7 @@ public class JobTaskAttemptCompletedEven
 
   public JobTaskAttemptCompletedEvent(TaskAttemptCompletionEvent completionEvent) {
     super(completionEvent.getAttemptId().getTaskId().getJobId(), 
-        JobEventType.JOB_TASK_ATTEMPT_COMPLETED_EVENT);
+        JobEventType.JOB_TASK_ATTEMPT_COMPLETED);
     this.completionEvent = completionEvent;
   }
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1127764&r1=1127763&r2=1127764&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Thu May 26 02:46:11 2011
@@ -75,6 +75,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
@@ -143,6 +144,7 @@ public class JobImpl implements org.apac
   private final Set<TaskId> mapTasks = new LinkedHashSet<TaskId>();
   private final Set<TaskId> reduceTasks = new LinkedHashSet<TaskId>();
   private final EventHandler eventHandler;
+  private final MRAppMetrics metrics;
 
   private boolean lazyTasksCopyNeeded = false;
   private volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
@@ -222,7 +224,7 @@ public class JobImpl implements org.apac
 
           // Transitions from RUNNING state
           .addTransition(JobState.RUNNING, JobState.RUNNING,
-              JobEventType.JOB_TASK_ATTEMPT_COMPLETED_EVENT,
+              JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
               TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
           .addTransition
               (JobState.RUNNING,
@@ -252,7 +254,7 @@ public class JobImpl implements org.apac
               JobEventType.JOB_TASK_COMPLETED,
               new KillWaitTaskCompletedTransition())
           .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
-              JobEventType.JOB_TASK_ATTEMPT_COMPLETED_EVENT,
+              JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
               TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
           .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
@@ -313,7 +315,7 @@ public class JobImpl implements org.apac
               EnumSet.of(JobEventType.JOB_INIT,
                   JobEventType.JOB_KILL,
                   JobEventType.JOB_TASK_COMPLETED,
-                  JobEventType.JOB_TASK_ATTEMPT_COMPLETED_EVENT,
+                  JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
                   JobEventType.JOB_MAP_TASK_RESCHEDULED,
                   JobEventType.JOB_DIAGNOSTIC_UPDATE,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
@@ -349,10 +351,11 @@ public class JobImpl implements org.apac
       EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
       JobTokenSecretManager jobTokenSecretManager,
       Credentials fsTokenCredentials, Clock clock, int startCount, 
-      Set<TaskId> completedTasksFromPreviousRun) {
+      Set<TaskId> completedTasksFromPreviousRun, MRAppMetrics metrics) {
 
     this.jobId = recordFactory.newRecordInstance(JobId.class);
     this.conf = conf;
+    this.metrics = metrics;
     this.clock = clock;
     this.completedTasksFromPreviousRun = completedTasksFromPreviousRun;
     this.startCount = startCount;
@@ -654,15 +657,31 @@ public class JobImpl implements org.apac
     } else if (task.getType() == TaskType.REDUCE) {
       reduceTasks.add(task.getID());
     }
+    metrics.waitingTask(task);
   }
 
   private void setFinishTime() {
     finishTime = clock.getTime();
   }
   
-  private void finished() {
+  private JobState finished(JobState finalState) {
+    if (getState() == JobState.RUNNING) {
+      metrics.endRunningJob(this);
+    }
     if (finishTime == 0) setFinishTime();
     eventHandler.handle(new JobFinishEvent(jobId));
+
+    switch (finalState) {
+      case KILLED:
+        metrics.killedJob(this);
+        break;
+      case FAILED:
+        metrics.failedJob(this);
+        break;
+      case SUCCEEDED:
+        metrics.completedJob(this);
+    }
+    return finalState;
   }
 
   @Override
@@ -694,6 +713,8 @@ public class JobImpl implements org.apac
     @Override
     public JobState transition(JobImpl job, JobEvent event) {
       job.startTime = job.clock.getTime();
+      job.metrics.submittedJob(job);
+      job.metrics.preparingJob(job);
       try {
         setup(job);
         job.fs = FileSystem.get(job.conf);
@@ -852,12 +873,14 @@ public class JobImpl implements org.apac
         createMapTasks(job, inputLength, taskSplitMetaInfo);
         createReduceTasks(job);
 
+        job.metrics.endPreparingJob(job);
         return JobState.INITED;
 
       } catch (Exception e) {
         LOG.warn("Job init failed", e);
         job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
-        return JobState.FAILED;
+        job.metrics.endPreparingJob(job);
+        return job.finished(JobState.FAILED);
       }
     }
 
@@ -949,7 +972,8 @@ public class JobImpl implements org.apac
                 job.conf, splits[i], 
                 job.taskAttemptListener, 
                 job.committer, job.jobToken, job.fsTokens.getAllTokens(), 
-                job.clock, job.completedTasksFromPreviousRun, job.startCount);
+                job.clock, job.completedTasksFromPreviousRun, job.startCount,
+                job.metrics);
         job.addTask(task);
       }
       LOG.info("Input size for job " + job.jobId + " = " + inputLength
@@ -965,7 +989,7 @@ public class JobImpl implements org.apac
                 job.conf, job.numMapTasks, 
                 job.taskAttemptListener, job.committer, job.jobToken,
                 job.fsTokens.getAllTokens(), job.clock, 
-                job.completedTasksFromPreviousRun, job.startCount);
+                job.completedTasksFromPreviousRun, job.startCount, job.metrics);
         job.addTask(task);
       }
       LOG.info("Number of reduces for job " + job.jobId + " = "
@@ -1011,6 +1035,7 @@ public class JobImpl implements org.apac
              job.isUber, 0, 0,  // FIXME: lose latter two args again (old-style uber junk:  needs to go along with 98% of other old-style uber junk)
              JobState.NEW.toString());
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
+      job.metrics.runningJob(job);
     }
   }
 
@@ -1054,7 +1079,7 @@ public class JobImpl implements org.apac
               job.finishTime, 0, 0,
               org.apache.hadoop.mapreduce.JobStatus.State.FAILED.toString()); //TODO correct state
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
-      job.finished();
+      job.finished(JobState.KILLED);
     }
   }
 
@@ -1063,7 +1088,7 @@ public class JobImpl implements org.apac
     @Override
     public void transition(JobImpl job, JobEvent event) {
       job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
-      job.finished();
+      job.finished(JobState.KILLED);
     }
   }
 
@@ -1075,6 +1100,7 @@ public class JobImpl implements org.apac
         job.eventHandler.handle(
             new TaskEvent(task.getID(), TaskEventType.T_KILL));
       }
+      job.metrics.endRunningJob(job);
     }
   }
 
@@ -1177,8 +1203,7 @@ public class JobImpl implements org.apac
         //TODO This event not likely required - sent via abort(). 
 
         job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
-        job.finished();
-        return JobState.FAILED;
+        return job.finished(JobState.FAILED);
       }
       
       //check for Job success
@@ -1201,8 +1226,7 @@ public class JobImpl implements org.apac
         LOG.info("Calling handler for JobFinishedEvent ");
         job.eventHandler.handle(new JobHistoryEvent(job.jobId, jfe));
 
-        job.finished();
-        return JobState.SUCCEEDED;
+        return job.finished(JobState.SUCCEEDED);
       }
       
       //return the current state, Job not finished yet
@@ -1225,6 +1249,7 @@ public class JobImpl implements org.apac
       } else {
         job.succeededReduceTaskCount++;
       }
+      job.metrics.completedTask(task);
     }
   
     private void taskFailed(JobImpl job, Task task) {
@@ -1233,6 +1258,7 @@ public class JobImpl implements org.apac
       } else if (task.getType() == TaskType.REDUCE) {
         job.failedReduceTaskCount++;
       }
+      job.metrics.failedTask(task);
     }
 
     private void taskKilled(JobImpl job, Task task) {
@@ -1241,6 +1267,7 @@ public class JobImpl implements org.apac
       } else if (task.getType() == TaskType.REDUCE) {
         job.killedReduceTaskCount++;
       }
+      job.metrics.killedTask(task);
     }
   }
 
@@ -1261,8 +1288,7 @@ public class JobImpl implements org.apac
       if (job.completedTaskCount == job.tasks.size()) {
         job.setFinishTime();
         job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
-        job.finished();
-        return JobState.KILLED;
+        return job.finished(JobState.KILLED);
       }
       //return the current state, Job not finished yet
       return job.getState();
@@ -1287,7 +1313,7 @@ public class JobImpl implements org.apac
     @Override
     public void transition(JobImpl job, JobEvent event) {
       //TODO JH Event?
-      job.finished();
+      job.finished(JobState.ERROR);
     }
   }
 

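In JobImpl the void finished() becomes finished(JobState finalState): it
decrements the running-jobs gauge if the job was still RUNNING, fires the
JobFinishEvent as before, bumps the matching terminal counter (killed, failed,
or completed), and returns the state so transitions can simply write
return job.finished(JobState.FAILED). Note that JobState.ERROR matches none of
the cases, so the internal-error transition only emits the finish event without
touching a terminal counter. A hedged, self-contained sketch of the same shape
(all types below are stand-ins, not the real JobImpl):

    // Stand-in types; this only mirrors the shape of JobImpl.finished(JobState).
    class JobFinishSketch {
      enum State { RUNNING, SUCCEEDED, FAILED, KILLED, ERROR }

      interface JobGauges {
        void endRunningJob();
        void killedJob();
        void failedJob();
        void completedJob();
      }

      private State state = State.RUNNING;
      private final JobGauges metrics;

      JobFinishSketch(JobGauges metrics) { this.metrics = metrics; }

      State finished(State finalState) {
        if (state == State.RUNNING) {
          metrics.endRunningJob();          // leave the running gauge exactly once
        }
        switch (finalState) {
          case KILLED:    metrics.killedJob();    break;
          case FAILED:    metrics.failedJob();    break;
          case SUCCEEDED: metrics.completedJob(); break;
          default:        break;                  // ERROR: no terminal counter
        }
        return finalState;                  // enables: return finished(State.FAILED);
      }
    }
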
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java?rev=1127764&r1=1127763&r2=1127764&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java Thu May 26 02:46:11 2011
@@ -19,7 +19,6 @@
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
@@ -32,6 +31,7 @@ import org.apache.hadoop.mapreduce.split
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -49,10 +49,11 @@ public class MapTaskImpl extends TaskImp
       TaskAttemptListener taskAttemptListener, OutputCommitter committer,
       Token<JobTokenIdentifier> jobToken,
       Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock, 
-      Set<TaskId> completedTasksFromPreviousRun, int startCount) {
+      Set<TaskId> completedTasksFromPreviousRun, int startCount,
+      MRAppMetrics metrics) {
     super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile,
         conf, taskAttemptListener, committer, jobToken, fsTokens, clock, 
-        completedTasksFromPreviousRun, startCount);
+        completedTasksFromPreviousRun, startCount, metrics);
     this.taskSplitMetaInfo = taskSplitMetaInfo;
   }
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java?rev=1127764&r1=1127763&r2=1127764&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java Thu May 26 02:46:11 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.secur
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -45,10 +46,11 @@ public class ReduceTaskImpl extends Task
       int numMapTasks, TaskAttemptListener taskAttemptListener,
       OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
       Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock, 
-      Set<TaskId> completedTasksFromPreviousRun, int startCount) {
+      Set<TaskId> completedTasksFromPreviousRun, int startCount,
+      MRAppMetrics metrics) {
     super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf,
         taskAttemptListener, committer, jobToken, fsTokens, clock,
-        completedTasksFromPreviousRun, startCount);
+        completedTasksFromPreviousRun, startCount, metrics);
     this.numMapTasks = numMapTasks;
   }
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1127764&r1=1127763&r2=1127764&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Thu May 26 02:46:11 2011
@@ -96,7 +96,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1127764&r1=1127763&r2=1127764&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Thu May 26 02:46:11 2011
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -49,6 +48,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@@ -94,6 +94,7 @@ public abstract class TaskImpl implement
   protected final Clock clock;
   private final Lock readLock;
   private final Lock writeLock;
+  private final MRAppMetrics metrics;
   
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   
@@ -126,7 +127,7 @@ public abstract class TaskImpl implement
     // Transitions from SCHEDULED state
       //when the first attempt is launched, the task state is set to RUNNING
      .addTransition(TaskState.SCHEDULED, TaskState.RUNNING, 
-         TaskEventType.T_ATTEMPT_LAUNCHED)
+         TaskEventType.T_ATTEMPT_LAUNCHED, new LaunchTransition())
      .addTransition(TaskState.SCHEDULED, TaskState.KILL_WAIT, 
          TaskEventType.T_KILL, KILL_TRANSITION)
      .addTransition(TaskState.SCHEDULED, TaskState.SCHEDULED, 
@@ -221,7 +222,8 @@ public abstract class TaskImpl implement
       TaskAttemptListener taskAttemptListener, OutputCommitter committer,
       Token<JobTokenIdentifier> jobToken,
       Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
-      Set<TaskId> completedTasksFromPreviousRun, int startCount) {
+      Set<TaskId> completedTasksFromPreviousRun, int startCount,
+      MRAppMetrics metrics) {
     this.conf = conf;
     this.clock = clock;
     this.jobFile = remoteJobConfFile;
@@ -243,6 +245,7 @@ public abstract class TaskImpl implement
     this.committer = committer;
     this.fsTokens = fsTokens;
     this.jobToken = jobToken;
+    this.metrics = metrics;
 
     if (completedTasksFromPreviousRun != null 
         && completedTasksFromPreviousRun.contains(taskId)) {
@@ -400,6 +403,24 @@ public abstract class TaskImpl implement
     return finishTime;
   }
 
+  private TaskState finished(TaskState finalState) {
+    if (getState() == TaskState.RUNNING) {
+      metrics.endRunningTask(this);
+    }
+    switch (finalState) {
+      case FAILED:
+        metrics.failedTask(this);
+        break;
+      case KILLED:
+        metrics.killedTask(this);
+        break;
+      case SUCCEEDED:
+        metrics.completedTask(this);
+        break;
+    }
+    return finalState;
+  }
+
   //select the nextAttemptNumber with best progress
   // always called inside the Read Lock
   private TaskAttempt selectBestAttempt() {
@@ -547,6 +568,7 @@ public abstract class TaskImpl implement
       task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tse));
 
       task.addAndScheduleAttempt();
+      task.metrics.launchedTask(task);
     }
   }
 
@@ -620,6 +642,7 @@ public abstract class TaskImpl implement
                   TaskAttemptEventType.TA_KILL));
         }
       }
+      task.finished(TaskState.SUCCEEDED);
     }
   }
 
@@ -688,7 +711,7 @@ public abstract class TaskImpl implement
         task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tfi));
         task.eventHandler.handle(
             new JobTaskEvent(task.taskId, TaskState.FAILED));
-        return TaskState.FAILED;
+        return task.finished(TaskState.FAILED);
       }
       return getDefaultState(task);
     }
@@ -744,6 +767,7 @@ public abstract class TaskImpl implement
       task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tfe));
       task.eventHandler.handle(
           new JobTaskEvent(task.taskId, TaskState.KILLED));
+      task.metrics.endWaitingTask(task);
     }
   }
 
@@ -768,4 +792,12 @@ public abstract class TaskImpl implement
       task.numberUncompletedAttempts = 0;
     }
   }
+
+  static class LaunchTransition
+      implements SingleArcTransition<TaskImpl, TaskEvent> {
+    @Override
+    public void transition(TaskImpl task, TaskEvent event) {
+      task.metrics.runningTask(task);
+    }
+  }
 }

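TaskImpl gets the task-side mirror of the same idea: a finished(TaskState) helper
for the terminal counters, plus a LaunchTransition hooked onto the SCHEDULED ->
RUNNING arc (the addTransition call grows from three arguments to four), so the
running gauge moves exactly when the first attempt launches. A small hedged
sketch of attaching such a hook with the YARN SingleArcTransition interface;
TaskSketch, TaskLaunchEvent and the metric call are hypothetical stand-ins:

    import org.apache.hadoop.yarn.state.SingleArcTransition;

    // Hypothetical operand and event types; only the hook shape matters here.
    class TaskSketch {
      void markRunning() { /* e.g. metrics.runningTask(this) in the real class */ }
    }

    class TaskLaunchEvent {}

    class LaunchHook implements SingleArcTransition<TaskSketch, TaskLaunchEvent> {
      @Override
      public void transition(TaskSketch task, TaskLaunchEvent event) {
        // Runs as a side effect of the SCHEDULED -> RUNNING arc, so the
        // running gauge is bumped when the first attempt actually launches.
        task.markRunning();
      }
    }
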
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/metrics/MRAppMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/metrics/MRAppMetrics.java?rev=1127764&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/metrics/MRAppMetrics.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/metrics/MRAppMetrics.java Thu May 26 02:46:11 2011
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app.metrics;
+
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+
+@Metrics(about="MR App Metrics", context="mapred")
+public class MRAppMetrics {
+  @Metric MutableCounterInt jobsSubmitted;
+  @Metric MutableCounterInt jobsCompleted;
+  @Metric MutableCounterInt jobsFailed;
+  @Metric MutableCounterInt jobsKilled;
+  @Metric MutableGaugeInt jobsPreparing;
+  @Metric MutableGaugeInt jobsRunning;
+
+  @Metric MutableCounterInt mapsLaunched;
+  @Metric MutableCounterInt mapsCompleted;
+  @Metric MutableCounterInt mapsFailed;
+  @Metric MutableCounterInt mapsKilled;
+  @Metric MutableGaugeInt mapsRunning;
+  @Metric MutableGaugeInt mapsWaiting;
+
+  @Metric MutableCounterInt reducesLaunched;
+  @Metric MutableCounterInt reducesCompleted;
+  @Metric MutableCounterInt reducesFailed;
+  @Metric MutableCounterInt reducesKilled;
+  @Metric MutableGaugeInt reducesRunning;
+  @Metric MutableGaugeInt reducesWaiting;
+  
+  public static MRAppMetrics create() {
+    return create(DefaultMetricsSystem.instance());
+  }
+
+  public static MRAppMetrics create(MetricsSystem ms) {
+    JvmMetrics.create("MRAppMaster", null, ms);
+    return ms.register(new MRAppMetrics());
+  }
+
+  // potential instrumentation interface methods
+
+  public void submittedJob(Job job) {
+    jobsSubmitted.incr();
+  }
+
+  public void completedJob(Job job) {
+    jobsCompleted.incr();
+  }
+
+  public void failedJob(Job job) {
+    jobsFailed.incr();
+  }
+
+  public void killedJob(Job job) {
+    jobsKilled.incr();
+  }
+
+  public void preparingJob(Job job) {
+    jobsPreparing.incr();
+  }
+
+  public void endPreparingJob(Job job) {
+    jobsPreparing.decr();
+  }
+
+  public void runningJob(Job job) {
+    jobsRunning.incr();
+  }
+
+  public void endRunningJob(Job job) {
+    jobsRunning.decr();
+  }
+
+  public void launchedTask(Task task) {
+    switch (task.getType()) {
+      case MAP:
+        mapsLaunched.incr();
+        break;
+      case REDUCE:
+        reducesLaunched.incr();
+        break;
+    }
+    endWaitingTask(task);
+  }
+
+  public void completedTask(Task task) {
+    switch (task.getType()) {
+      case MAP:
+        mapsCompleted.incr();
+        break;
+      case REDUCE:
+        reducesCompleted.incr();
+        break;
+    }
+  }
+
+  public void failedTask(Task task) {
+    switch (task.getType()) {
+      case MAP:
+        mapsFailed.incr();
+        break;
+      case REDUCE:
+        reducesFailed.incr();
+        break;
+    }
+  }
+
+  public void killedTask(Task task) {
+    switch (task.getType()) {
+      case MAP:
+        mapsKilled.incr();
+        break;
+      case REDUCE:
+        reducesKilled.incr();
+        break;
+    }
+  }
+
+  public void runningTask(Task task) {
+    switch (task.getType()) {
+      case MAP:
+        mapsRunning.incr();
+        break;
+      case REDUCE:
+        reducesRunning.incr();
+        break;
+    }
+  }
+
+  public void endRunningTask(Task task) {
+    switch (task.getType()) {
+      case MAP:
+        mapsRunning.decr();
+        break;
+      case REDUCE:
+        reducesRunning.decr();
+        break;
+    }
+  }
+
+  public void waitingTask(Task task) {
+    switch (task.getType()) {
+      case MAP:
+        mapsWaiting.incr();
+        break;
+      case REDUCE:
+        reducesWaiting.incr();
+    }
+  }
+
+  public void endWaitingTask(Task task) {
+    switch (task.getType()) {
+      case MAP:
+        mapsWaiting.decr();
+        break;
+      case REDUCE:
+        reducesWaiting.decr();
+        break;
+    }
+  }
+}
\ No newline at end of file

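MRAppMetrics is an annotation-driven metrics2 source: @Metrics marks the class
(registered without an explicit name, the source shows up under its simple class
name, which is why the test below can look it up as "MRAppMetrics"), and each
@Metric field is published under the capitalized field name (jobsSubmitted ->
"JobsSubmitted", matching the names the test asserts). create() additionally
registers JVM metrics under "MRAppMaster" and then registers the source with the
default or supplied MetricsSystem. A minimal sketch of the same pattern for a
hypothetical source called ExampleMetrics:

    import org.apache.hadoop.metrics2.MetricsSystem;
    import org.apache.hadoop.metrics2.annotation.Metric;
    import org.apache.hadoop.metrics2.annotation.Metrics;
    import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
    import org.apache.hadoop.metrics2.lib.MutableCounterInt;
    import org.apache.hadoop.metrics2.lib.MutableGaugeInt;

    // Hypothetical example source, not part of the commit.
    @Metrics(about="Example source", context="mapred")
    class ExampleMetrics {
      @Metric MutableCounterInt requestsDone;    // published as "RequestsDone"
      @Metric MutableGaugeInt requestsInFlight;  // published as "RequestsInFlight"

      static ExampleMetrics create() {
        return create(DefaultMetricsSystem.instance());
      }

      static ExampleMetrics create(MetricsSystem ms) {
        return ms.register(new ExampleMetrics());  // source name: "ExampleMetrics"
      }

      void begin() { requestsInFlight.incr(); }
      void done()  { requestsInFlight.decr(); requestsDone.incr(); }
    }
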
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1127764&r1=1127763&r2=1127764&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Thu May 26 02:46:11 2011
@@ -60,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
@@ -79,9 +80,8 @@ import org.apache.hadoop.yarn.state.Stat
  * No threads are started except of the event Dispatcher thread.
  */
 public class MRApp extends MRAppMaster {
-
   private static final Log LOG = LogFactory.getLog(MRApp.class);
-  
+
   int maps;
   int reduces;
 
@@ -90,7 +90,7 @@ public class MRApp extends MRAppMaster {
 
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
-  
+
   //if true, tasks complete automatically as soon as they are launched
   protected boolean autoComplete = false;
 
@@ -134,6 +134,7 @@ public class MRApp extends MRAppMaster {
 
     init(conf);
     start();
+    DefaultMetricsSystem.shutdown();
     Job job = getContext().getAllJobs().values().iterator().next();
     return job;
   }
@@ -359,7 +360,7 @@ public class MRApp extends MRAppMaster {
         TaskAttemptListener taskAttemptListener, Clock clock) {
       super(appID, new Configuration(), eventHandler, taskAttemptListener,
           new JobTokenSecretManager(), new Credentials(), clock, getStartCount(), 
-          getCompletedTaskFromPreviousRun());
+          getCompletedTaskFromPreviousRun(), metrics);
 
       // This "this leak" is okay because the retained pointer is in an
       //  instance variable.

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1127764&r1=1127763&r2=1127764&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Thu May 26 02:46:11 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
@@ -74,12 +75,19 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestRMContainerAllocator {
   private static final Log LOG = LogFactory.getLog(TestRMContainerAllocator.class);
   private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
 
+  @BeforeClass
+  public static void preTests() {
+    DefaultMetricsSystem.shutdown();
+  }
+
   @Test
   public void testSimple() throws Exception {
     FifoScheduler scheduler = createScheduler();

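Both test changes deal with the default metrics system being a JVM-wide
singleton: MRApp.submit() shuts it down right after start(), and
TestRMContainerAllocator shuts it down in @BeforeClass, presumably so that
sources registered by earlier in-process app masters or other tests cannot
collide with a fresh initialize(). A hedged sketch of the same isolation idiom
in a JUnit 4 test; the test class itself is hypothetical:

    import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
    import org.junit.Before;
    import org.junit.Test;

    // Hypothetical test showing only the metrics-system isolation idiom.
    public class MetricsIsolationTestSketch {

      @Before
      public void resetMetricsSystem() {
        // Start each test from a clean slate; the commit itself calls
        // shutdown() before anything has necessarily been initialized.
        DefaultMetricsSystem.shutdown();
      }

      @Test
      public void runsWithAFreshMetricsSystem() {
        DefaultMetricsSystem.initialize("MRAppMaster");
        // ... exercise code that registers metrics sources ...
        DefaultMetricsSystem.shutdown();
      }
    }
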
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/metrics/TestMRAppMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/metrics/TestMRAppMetrics.java?rev=1127764&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/metrics/TestMRAppMetrics.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/metrics/TestMRAppMetrics.java Thu May 26 02:46:11 2011
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app.metrics;
+
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+
+import static org.apache.hadoop.test.MetricsAsserts.*;
+import static org.apache.hadoop.test.MockitoMaker.*;
+
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+public class TestMRAppMetrics {
+
+  @Test public void testNames() {
+    Job job = mock(Job.class);
+    Task mapTask = make(stub(Task.class).returning(TaskType.MAP).
+                        from.getType());
+    Task reduceTask = make(stub(Task.class).returning(TaskType.REDUCE).
+                           from.getType());
+    MRAppMetrics metrics = MRAppMetrics.create();
+
+    metrics.submittedJob(job);
+    metrics.waitingTask(mapTask);
+    metrics.waitingTask(reduceTask);
+    metrics.preparingJob(job);
+    metrics.submittedJob(job);
+    metrics.waitingTask(mapTask);
+    metrics.waitingTask(reduceTask);
+    metrics.preparingJob(job);
+    metrics.submittedJob(job);
+    metrics.waitingTask(mapTask);
+    metrics.waitingTask(reduceTask);
+    metrics.preparingJob(job);
+    metrics.endPreparingJob(job);
+    metrics.endPreparingJob(job);
+    metrics.endPreparingJob(job);
+
+    metrics.runningJob(job);
+    metrics.launchedTask(mapTask);
+    metrics.runningTask(mapTask);
+    metrics.failedTask(mapTask);
+    metrics.endWaitingTask(reduceTask);
+    metrics.endRunningTask(mapTask);
+    metrics.endRunningJob(job);
+    metrics.failedJob(job);
+
+    metrics.runningJob(job);
+    metrics.launchedTask(mapTask);
+    metrics.runningTask(mapTask);
+    metrics.killedTask(mapTask);
+    metrics.endWaitingTask(reduceTask);
+    metrics.endRunningTask(mapTask);
+    metrics.endRunningJob(job);
+    metrics.killedJob(job);
+
+    metrics.runningJob(job);
+    metrics.launchedTask(mapTask);
+    metrics.runningTask(mapTask);
+    metrics.completedTask(mapTask);
+    metrics.endRunningTask(mapTask);
+    metrics.launchedTask(reduceTask);
+    metrics.runningTask(reduceTask);
+    metrics.completedTask(reduceTask);
+    metrics.endRunningTask(reduceTask);
+    metrics.endRunningJob(job);
+    metrics.completedJob(job);
+
+    checkMetrics(/*job*/3, 1, 1, 1, 0, 0,
+                 /*map*/3, 1, 1, 1, 0, 0,
+                 /*reduce*/1, 1, 0, 0, 0, 0);
+  }
+
+  private void checkMetrics(int jobsSubmitted, int jobsCompleted,
+      int jobsFailed, int jobsKilled, int jobsPreparing, int jobsRunning,
+      int mapsLaunched, int mapsCompleted, int mapsFailed, int mapsKilled,
+      int mapsRunning, int mapsWaiting, int reducesLaunched,
+      int reducesCompleted, int reducesFailed, int reducesKilled,
+      int reducesRunning, int reducesWaiting) {
+    MetricsRecordBuilder rb = getMetrics("MRAppMetrics");
+    assertCounter("JobsSubmitted", jobsSubmitted, rb);
+    assertCounter("JobsCompleted", jobsCompleted, rb);
+    assertCounter("JobsFailed", jobsFailed, rb);
+    assertCounter("JobsKilled", jobsKilled, rb);
+    assertGauge("JobsPreparing", jobsPreparing, rb);
+    assertGauge("JobsRunning", jobsRunning, rb);
+
+    assertCounter("MapsLaunched", mapsLaunched, rb);
+    assertCounter("MapsCompleted", mapsCompleted, rb);
+    assertCounter("MapsFailed", mapsFailed, rb);
+    assertCounter("MapsKilled", mapsKilled, rb);
+    assertGauge("MapsRunning", mapsRunning, rb);
+    assertGauge("MapsWaiting", mapsWaiting, rb);
+
+    assertCounter("ReducesLaunched", reducesLaunched, rb);
+    assertCounter("ReducesCompleted", reducesCompleted, rb);
+    assertCounter("ReducesFailed", reducesFailed, rb);
+    assertCounter("ReducesKilled", reducesKilled, rb);
+    assertGauge("ReducesRunning", reducesRunning, rb);
+    assertGauge("ReducesWaiting", reducesWaiting, rb);
+  }
+}