You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/05/26 04:46:12 UTC
svn commit: r1127764 - in /hadoop/mapreduce/branches/MR-279: ./
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/
mr...
Author: mahadev
Date: Thu May 26 02:46:11 2011
New Revision: 1127764
URL: http://svn.apache.org/viewvc?rev=1127764&view=rev
Log:
MAPREDUCE-2527. Metrics for MRAppMaster (Luke Lu via mahadev)
Added:
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/metrics/
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/metrics/MRAppMetrics.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/metrics/
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/metrics/TestMRAppMetrics.java
Modified:
hadoop/mapreduce/branches/MR-279/CHANGES.txt
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptCompletedEvent.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1127764&r1=1127763&r2=1127764&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Thu May 26 02:46:11 2011
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
MAPREDUCE-279
+ MAPREDUCE-2527. Metrics for MRAppMaster (Luke lu via mahadev)
+
Add debug config for delaying delete of local files. (cdouglas)
Fixing race condition leader to hung jobs in scheduler negotiator (mahadev)
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1127764&r1=1127763&r2=1127764&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu May 26 02:46:11 2011
@@ -58,6 +58,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
@@ -67,6 +68,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanerImpl;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -109,6 +111,7 @@ public class MRAppMaster extends Composi
private Clock clock;
private final int startCount;
private final ApplicationId appID;
+ protected final MRAppMetrics metrics;
private Set<TaskId> completedTasksFromPreviousRun;
private AppContext context;
private Dispatcher dispatcher;
@@ -132,6 +135,7 @@ public class MRAppMaster extends Composi
this.clock = clock;
this.appID = applicationId;
this.startCount = startCount;
+ this.metrics = MRAppMetrics.create();
LOG.info("Created MRAppMaster for application " + applicationId);
}
@@ -269,8 +273,8 @@ public class MRAppMaster extends Composi
// create single job
Job newJob = new JobImpl(appID, conf, dispatcher.getEventHandler(),
- taskAttemptListener, jobTokenSecretManager, fsTokens,
- clock, startCount, completedTasksFromPreviousRun);
+ taskAttemptListener, jobTokenSecretManager, fsTokens, clock, startCount,
+ completedTasksFromPreviousRun, metrics);
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
dispatcher.register(JobFinishEvent.Type.class,
@@ -467,6 +471,10 @@ public class MRAppMaster extends Composi
@Override
public void start() {
+ // metrics system init is really init & start.
+ // It's more test friendly to put it here.
+ DefaultMetricsSystem.initialize("MRAppMaster");
+
startJobs();
//start all the components
super.start();
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java?rev=1127764&r1=1127763&r2=1127764&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java Thu May 26 02:46:11 2011
@@ -33,7 +33,7 @@ public enum JobEventType {
//Producer:Task
JOB_TASK_COMPLETED,
JOB_MAP_TASK_RESCHEDULED,
- JOB_TASK_ATTEMPT_COMPLETED_EVENT, // why "_EVENT" only on this one?
+ JOB_TASK_ATTEMPT_COMPLETED,
//Producer:Any component
JOB_DIAGNOSTIC_UPDATE,
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptCompletedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptCompletedEvent.java?rev=1127764&r1=1127763&r2=1127764&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptCompletedEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptCompletedEvent.java Thu May 26 02:46:11 2011
@@ -27,7 +27,7 @@ public class JobTaskAttemptCompletedEven
public JobTaskAttemptCompletedEvent(TaskAttemptCompletionEvent completionEvent) {
super(completionEvent.getAttemptId().getTaskId().getJobId(),
- JobEventType.JOB_TASK_ATTEMPT_COMPLETED_EVENT);
+ JobEventType.JOB_TASK_ATTEMPT_COMPLETED);
this.completionEvent = completionEvent;
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1127764&r1=1127763&r2=1127764&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Thu May 26 02:46:11 2011
@@ -75,6 +75,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
@@ -143,6 +144,7 @@ public class JobImpl implements org.apac
private final Set<TaskId> mapTasks = new LinkedHashSet<TaskId>();
private final Set<TaskId> reduceTasks = new LinkedHashSet<TaskId>();
private final EventHandler eventHandler;
+ private final MRAppMetrics metrics;
private boolean lazyTasksCopyNeeded = false;
private volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
@@ -222,7 +224,7 @@ public class JobImpl implements org.apac
// Transitions from RUNNING state
.addTransition(JobState.RUNNING, JobState.RUNNING,
- JobEventType.JOB_TASK_ATTEMPT_COMPLETED_EVENT,
+ JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
.addTransition
(JobState.RUNNING,
@@ -252,7 +254,7 @@ public class JobImpl implements org.apac
JobEventType.JOB_TASK_COMPLETED,
new KillWaitTaskCompletedTransition())
.addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
- JobEventType.JOB_TASK_ATTEMPT_COMPLETED_EVENT,
+ JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
.addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
@@ -313,7 +315,7 @@ public class JobImpl implements org.apac
EnumSet.of(JobEventType.JOB_INIT,
JobEventType.JOB_KILL,
JobEventType.JOB_TASK_COMPLETED,
- JobEventType.JOB_TASK_ATTEMPT_COMPLETED_EVENT,
+ JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
JobEventType.JOB_MAP_TASK_RESCHEDULED,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
@@ -349,10 +351,11 @@ public class JobImpl implements org.apac
EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
JobTokenSecretManager jobTokenSecretManager,
Credentials fsTokenCredentials, Clock clock, int startCount,
- Set<TaskId> completedTasksFromPreviousRun) {
+ Set<TaskId> completedTasksFromPreviousRun, MRAppMetrics metrics) {
this.jobId = recordFactory.newRecordInstance(JobId.class);
this.conf = conf;
+ this.metrics = metrics;
this.clock = clock;
this.completedTasksFromPreviousRun = completedTasksFromPreviousRun;
this.startCount = startCount;
@@ -654,15 +657,31 @@ public class JobImpl implements org.apac
} else if (task.getType() == TaskType.REDUCE) {
reduceTasks.add(task.getID());
}
+ metrics.waitingTask(task);
}
private void setFinishTime() {
finishTime = clock.getTime();
}
- private void finished() {
+ private JobState finished(JobState finalState) {
+ if (getState() == JobState.RUNNING) {
+ metrics.endRunningJob(this);
+ }
if (finishTime == 0) setFinishTime();
eventHandler.handle(new JobFinishEvent(jobId));
+
+ switch (finalState) {
+ case KILLED:
+ metrics.killedJob(this);
+ break;
+ case FAILED:
+ metrics.failedJob(this);
+ break;
+ case SUCCEEDED:
+ metrics.completedJob(this);
+ }
+ return finalState;
}
@Override
@@ -694,6 +713,8 @@ public class JobImpl implements org.apac
@Override
public JobState transition(JobImpl job, JobEvent event) {
job.startTime = job.clock.getTime();
+ job.metrics.submittedJob(job);
+ job.metrics.preparingJob(job);
try {
setup(job);
job.fs = FileSystem.get(job.conf);
@@ -852,12 +873,14 @@ public class JobImpl implements org.apac
createMapTasks(job, inputLength, taskSplitMetaInfo);
createReduceTasks(job);
+ job.metrics.endPreparingJob(job);
return JobState.INITED;
} catch (Exception e) {
LOG.warn("Job init failed", e);
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
- return JobState.FAILED;
+ job.metrics.endPreparingJob(job);
+ return job.finished(JobState.FAILED);
}
}
@@ -949,7 +972,8 @@ public class JobImpl implements org.apac
job.conf, splits[i],
job.taskAttemptListener,
job.committer, job.jobToken, job.fsTokens.getAllTokens(),
- job.clock, job.completedTasksFromPreviousRun, job.startCount);
+ job.clock, job.completedTasksFromPreviousRun, job.startCount,
+ job.metrics);
job.addTask(task);
}
LOG.info("Input size for job " + job.jobId + " = " + inputLength
@@ -965,7 +989,7 @@ public class JobImpl implements org.apac
job.conf, job.numMapTasks,
job.taskAttemptListener, job.committer, job.jobToken,
job.fsTokens.getAllTokens(), job.clock,
- job.completedTasksFromPreviousRun, job.startCount);
+ job.completedTasksFromPreviousRun, job.startCount, job.metrics);
job.addTask(task);
}
LOG.info("Number of reduces for job " + job.jobId + " = "
@@ -1011,6 +1035,7 @@ public class JobImpl implements org.apac
job.isUber, 0, 0, // FIXME: lose latter two args again (old-style uber junk: needs to go along with 98% of other old-style uber junk)
JobState.NEW.toString());
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
+ job.metrics.runningJob(job);
}
}
@@ -1054,7 +1079,7 @@ public class JobImpl implements org.apac
job.finishTime, 0, 0,
org.apache.hadoop.mapreduce.JobStatus.State.FAILED.toString()); //TODO correct state
job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
- job.finished();
+ job.finished(JobState.KILLED);
}
}
@@ -1063,7 +1088,7 @@ public class JobImpl implements org.apac
@Override
public void transition(JobImpl job, JobEvent event) {
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
- job.finished();
+ job.finished(JobState.KILLED);
}
}
@@ -1075,6 +1100,7 @@ public class JobImpl implements org.apac
job.eventHandler.handle(
new TaskEvent(task.getID(), TaskEventType.T_KILL));
}
+ job.metrics.endRunningJob(job);
}
}
@@ -1177,8 +1203,7 @@ public class JobImpl implements org.apac
//TODO This event not likely required - sent via abort().
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
- job.finished();
- return JobState.FAILED;
+ return job.finished(JobState.FAILED);
}
//check for Job success
@@ -1201,8 +1226,7 @@ public class JobImpl implements org.apac
LOG.info("Calling handler for JobFinishedEvent ");
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jfe));
- job.finished();
- return JobState.SUCCEEDED;
+ return job.finished(JobState.SUCCEEDED);
}
//return the current state, Job not finished yet
@@ -1225,6 +1249,7 @@ public class JobImpl implements org.apac
} else {
job.succeededReduceTaskCount++;
}
+ job.metrics.completedTask(task);
}
private void taskFailed(JobImpl job, Task task) {
@@ -1233,6 +1258,7 @@ public class JobImpl implements org.apac
} else if (task.getType() == TaskType.REDUCE) {
job.failedReduceTaskCount++;
}
+ job.metrics.failedTask(task);
}
private void taskKilled(JobImpl job, Task task) {
@@ -1241,6 +1267,7 @@ public class JobImpl implements org.apac
} else if (task.getType() == TaskType.REDUCE) {
job.killedReduceTaskCount++;
}
+ job.metrics.killedTask(task);
}
}
@@ -1261,8 +1288,7 @@ public class JobImpl implements org.apac
if (job.completedTaskCount == job.tasks.size()) {
job.setFinishTime();
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
- job.finished();
- return JobState.KILLED;
+ return job.finished(JobState.KILLED);
}
//return the current state, Job not finished yet
return job.getState();
@@ -1287,7 +1313,7 @@ public class JobImpl implements org.apac
@Override
public void transition(JobImpl job, JobEvent event) {
//TODO JH Event?
- job.finished();
+ job.finished(JobState.ERROR);
}
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java?rev=1127764&r1=1127763&r2=1127764&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java Thu May 26 02:46:11 2011
@@ -19,7 +19,6 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import java.util.Collection;
-import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
@@ -32,6 +31,7 @@ import org.apache.hadoop.mapreduce.split
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -49,10 +49,11 @@ public class MapTaskImpl extends TaskImp
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
- Set<TaskId> completedTasksFromPreviousRun, int startCount) {
+ Set<TaskId> completedTasksFromPreviousRun, int startCount,
+ MRAppMetrics metrics) {
super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile,
conf, taskAttemptListener, committer, jobToken, fsTokens, clock,
- completedTasksFromPreviousRun, startCount);
+ completedTasksFromPreviousRun, startCount, metrics);
this.taskSplitMetaInfo = taskSplitMetaInfo;
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java?rev=1127764&r1=1127763&r2=1127764&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java Thu May 26 02:46:11 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.secur
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -45,10 +46,11 @@ public class ReduceTaskImpl extends Task
int numMapTasks, TaskAttemptListener taskAttemptListener,
OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
- Set<TaskId> completedTasksFromPreviousRun, int startCount) {
+ Set<TaskId> completedTasksFromPreviousRun, int startCount,
+ MRAppMetrics metrics) {
super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf,
taskAttemptListener, committer, jobToken, fsTokens, clock,
- completedTasksFromPreviousRun, startCount);
+ completedTasksFromPreviousRun, startCount, metrics);
this.numMapTasks = numMapTasks;
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1127764&r1=1127763&r2=1127764&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Thu May 26 02:46:11 2011
@@ -96,7 +96,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1127764&r1=1127763&r2=1127764&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Thu May 26 02:46:11 2011
@@ -18,7 +18,6 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -49,6 +48,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@@ -94,6 +94,7 @@ public abstract class TaskImpl implement
protected final Clock clock;
private final Lock readLock;
private final Lock writeLock;
+ private final MRAppMetrics metrics;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
@@ -126,7 +127,7 @@ public abstract class TaskImpl implement
// Transitions from SCHEDULED state
//when the first attempt is launched, the task state is set to RUNNING
.addTransition(TaskState.SCHEDULED, TaskState.RUNNING,
- TaskEventType.T_ATTEMPT_LAUNCHED)
+ TaskEventType.T_ATTEMPT_LAUNCHED, new LaunchTransition())
.addTransition(TaskState.SCHEDULED, TaskState.KILL_WAIT,
TaskEventType.T_KILL, KILL_TRANSITION)
.addTransition(TaskState.SCHEDULED, TaskState.SCHEDULED,
@@ -221,7 +222,8 @@ public abstract class TaskImpl implement
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
- Set<TaskId> completedTasksFromPreviousRun, int startCount) {
+ Set<TaskId> completedTasksFromPreviousRun, int startCount,
+ MRAppMetrics metrics) {
this.conf = conf;
this.clock = clock;
this.jobFile = remoteJobConfFile;
@@ -243,6 +245,7 @@ public abstract class TaskImpl implement
this.committer = committer;
this.fsTokens = fsTokens;
this.jobToken = jobToken;
+ this.metrics = metrics;
if (completedTasksFromPreviousRun != null
&& completedTasksFromPreviousRun.contains(taskId)) {
@@ -400,6 +403,24 @@ public abstract class TaskImpl implement
return finishTime;
}
+ private TaskState finished(TaskState finalState) {
+ if (getState() == TaskState.RUNNING) {
+ metrics.endRunningTask(this);
+ }
+ switch (finalState) {
+ case FAILED:
+ metrics.failedTask(this);
+ break;
+ case KILLED:
+ metrics.killedTask(this);
+ break;
+ case SUCCEEDED:
+ metrics.completedTask(this);
+ break;
+ }
+ return finalState;
+ }
+
//select the nextAttemptNumber with best progress
// always called inside the Read Lock
private TaskAttempt selectBestAttempt() {
@@ -547,6 +568,7 @@ public abstract class TaskImpl implement
task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tse));
task.addAndScheduleAttempt();
+ task.metrics.launchedTask(task);
}
}
@@ -620,6 +642,7 @@ public abstract class TaskImpl implement
TaskAttemptEventType.TA_KILL));
}
}
+ task.finished(TaskState.SUCCEEDED);
}
}
@@ -688,7 +711,7 @@ public abstract class TaskImpl implement
task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tfi));
task.eventHandler.handle(
new JobTaskEvent(task.taskId, TaskState.FAILED));
- return TaskState.FAILED;
+ return task.finished(TaskState.FAILED);
}
return getDefaultState(task);
}
@@ -744,6 +767,7 @@ public abstract class TaskImpl implement
task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tfe));
task.eventHandler.handle(
new JobTaskEvent(task.taskId, TaskState.KILLED));
+ task.metrics.endWaitingTask(task);
}
}
@@ -768,4 +792,12 @@ public abstract class TaskImpl implement
task.numberUncompletedAttempts = 0;
}
}
+
+ static class LaunchTransition
+ implements SingleArcTransition<TaskImpl, TaskEvent> {
+ @Override
+ public void transition(TaskImpl task, TaskEvent event) {
+ task.metrics.runningTask(task);
+ }
+ }
}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/metrics/MRAppMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/metrics/MRAppMetrics.java?rev=1127764&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/metrics/MRAppMetrics.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/metrics/MRAppMetrics.java Thu May 26 02:46:11 2011
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app.metrics;
+
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+
+@Metrics(about="MR App Metrics", context="mapred")
+public class MRAppMetrics {
+ @Metric MutableCounterInt jobsSubmitted;
+ @Metric MutableCounterInt jobsCompleted;
+ @Metric MutableCounterInt jobsFailed;
+ @Metric MutableCounterInt jobsKilled;
+ @Metric MutableGaugeInt jobsPreparing;
+ @Metric MutableGaugeInt jobsRunning;
+
+ @Metric MutableCounterInt mapsLaunched;
+ @Metric MutableCounterInt mapsCompleted;
+ @Metric MutableCounterInt mapsFailed;
+ @Metric MutableCounterInt mapsKilled;
+ @Metric MutableGaugeInt mapsRunning;
+ @Metric MutableGaugeInt mapsWaiting;
+
+ @Metric MutableCounterInt reducesLaunched;
+ @Metric MutableCounterInt reducesCompleted;
+ @Metric MutableCounterInt reducesFailed;
+ @Metric MutableCounterInt reducesKilled;
+ @Metric MutableGaugeInt reducesRunning;
+ @Metric MutableGaugeInt reducesWaiting;
+
+ public static MRAppMetrics create() {
+ return create(DefaultMetricsSystem.instance());
+ }
+
+ public static MRAppMetrics create(MetricsSystem ms) {
+ JvmMetrics.create("MRAppMaster", null, ms);
+ return ms.register(new MRAppMetrics());
+ }
+
+ // potential instrumentation interface methods
+
+ public void submittedJob(Job job) {
+ jobsSubmitted.incr();
+ }
+
+ public void completedJob(Job job) {
+ jobsCompleted.incr();
+ }
+
+ public void failedJob(Job job) {
+ jobsFailed.incr();
+ }
+
+ public void killedJob(Job job) {
+ jobsKilled.incr();
+ }
+
+ public void preparingJob(Job job) {
+ jobsPreparing.incr();
+ }
+
+ public void endPreparingJob(Job job) {
+ jobsPreparing.decr();
+ }
+
+ public void runningJob(Job job) {
+ jobsRunning.incr();
+ }
+
+ public void endRunningJob(Job job) {
+ jobsRunning.decr();
+ }
+
+ public void launchedTask(Task task) {
+ switch (task.getType()) {
+ case MAP:
+ mapsLaunched.incr();
+ break;
+ case REDUCE:
+ reducesLaunched.incr();
+ break;
+ }
+ endWaitingTask(task);
+ }
+
+ public void completedTask(Task task) {
+ switch (task.getType()) {
+ case MAP:
+ mapsCompleted.incr();
+ break;
+ case REDUCE:
+ reducesCompleted.incr();
+ break;
+ }
+ }
+
+ public void failedTask(Task task) {
+ switch (task.getType()) {
+ case MAP:
+ mapsFailed.incr();
+ break;
+ case REDUCE:
+ reducesFailed.incr();
+ break;
+ }
+ }
+
+ public void killedTask(Task task) {
+ switch (task.getType()) {
+ case MAP:
+ mapsKilled.incr();
+ break;
+ case REDUCE:
+ reducesKilled.incr();
+ break;
+ }
+ }
+
+ public void runningTask(Task task) {
+ switch (task.getType()) {
+ case MAP:
+ mapsRunning.incr();
+ break;
+ case REDUCE:
+ reducesRunning.incr();
+ break;
+ }
+ }
+
+ public void endRunningTask(Task task) {
+ switch (task.getType()) {
+ case MAP:
+ mapsRunning.decr();
+ break;
+ case REDUCE:
+ reducesRunning.decr();
+ break;
+ }
+ }
+
+ public void waitingTask(Task task) {
+ switch (task.getType()) {
+ case MAP:
+ mapsWaiting.incr();
+ break;
+ case REDUCE:
+ reducesWaiting.incr();
+ }
+ }
+
+ public void endWaitingTask(Task task) {
+ switch (task.getType()) {
+ case MAP:
+ mapsWaiting.decr();
+ break;
+ case REDUCE:
+ reducesWaiting.decr();
+ break;
+ }
+ }
+}
\ No newline at end of file
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1127764&r1=1127763&r2=1127764&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Thu May 26 02:46:11 2011
@@ -60,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
@@ -79,9 +80,8 @@ import org.apache.hadoop.yarn.state.Stat
* No threads are started except of the event Dispatcher thread.
*/
public class MRApp extends MRAppMaster {
-
private static final Log LOG = LogFactory.getLog(MRApp.class);
-
+
int maps;
int reduces;
@@ -90,7 +90,7 @@ public class MRApp extends MRAppMaster {
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
-
+
//if true, tasks complete automatically as soon as they are launched
protected boolean autoComplete = false;
@@ -134,6 +134,7 @@ public class MRApp extends MRAppMaster {
init(conf);
start();
+ DefaultMetricsSystem.shutdown();
Job job = getContext().getAllJobs().values().iterator().next();
return job;
}
@@ -359,7 +360,7 @@ public class MRApp extends MRAppMaster {
TaskAttemptListener taskAttemptListener, Clock clock) {
super(appID, new Configuration(), eventHandler, taskAttemptListener,
new JobTokenSecretManager(), new Credentials(), clock, getStartCount(),
- getCompletedTaskFromPreviousRun());
+ getCompletedTaskFromPreviousRun(), metrics);
// This "this leak" is okay because the retained pointer is in an
// instance variable.
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1127764&r1=1127763&r2=1127764&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Thu May 26 02:46:11 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
@@ -74,12 +75,19 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestRMContainerAllocator {
private static final Log LOG = LogFactory.getLog(TestRMContainerAllocator.class);
private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ @BeforeClass
+ public static void preTests() { // One-time class-level setup (JUnit @BeforeClass): shuts down the default metrics system.
+ DefaultMetricsSystem.shutdown(); // NOTE(review): presumably clears metrics state left behind by earlier tests in the same JVM — confirm
+ }
+
@Test
public void testSimple() throws Exception {
FifoScheduler scheduler = createScheduler();
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/metrics/TestMRAppMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/metrics/TestMRAppMetrics.java?rev=1127764&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/metrics/TestMRAppMetrics.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/metrics/TestMRAppMetrics.java Thu May 26 02:46:11 2011
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app.metrics;
+
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+
+import static org.apache.hadoop.test.MetricsAsserts.*;
+import static org.apache.hadoop.test.MockitoMaker.*;
+
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+public class TestMRAppMetrics { // Drives MRAppMetrics through job and task lifecycle calls and checks the metric values it publishes.
+
+ @Test public void testNames() { // Replays three job lifecycles (one failed, one killed, one completed) and verifies the resulting metrics.
+ Job job = mock(Job.class);
+ Task mapTask = make(stub(Task.class).returning(TaskType.MAP).
+ from.getType()); // stubbed Task whose getType() is set up to return MAP
+ Task reduceTask = make(stub(Task.class).returning(TaskType.REDUCE).
+ from.getType()); // stubbed Task whose getType() is set up to return REDUCE
+ MRAppMetrics metrics = MRAppMetrics.create();
+
+ metrics.submittedJob(job); // three submissions follow, each with one waiting map, one waiting reduce and a preparingJob call
+ metrics.waitingTask(mapTask);
+ metrics.waitingTask(reduceTask);
+ metrics.preparingJob(job);
+ metrics.submittedJob(job);
+ metrics.waitingTask(mapTask);
+ metrics.waitingTask(reduceTask);
+ metrics.preparingJob(job);
+ metrics.submittedJob(job);
+ metrics.waitingTask(mapTask);
+ metrics.waitingTask(reduceTask);
+ metrics.preparingJob(job);
+ metrics.endPreparingJob(job); // one endPreparingJob per preparingJob above
+ metrics.endPreparingJob(job);
+ metrics.endPreparingJob(job);
+
+ metrics.runningJob(job); // first run: the map task fails and the job is reported failed
+ metrics.launchedTask(mapTask);
+ metrics.runningTask(mapTask);
+ metrics.failedTask(mapTask);
+ metrics.endWaitingTask(reduceTask); // the reduce is never launched in this run; its waiting gauge is decremented directly
+ metrics.endRunningTask(mapTask);
+ metrics.endRunningJob(job);
+ metrics.failedJob(job);
+
+ metrics.runningJob(job); // second run: the map task is killed and the job is reported killed
+ metrics.launchedTask(mapTask);
+ metrics.runningTask(mapTask);
+ metrics.killedTask(mapTask);
+ metrics.endWaitingTask(reduceTask); // again no reduce launch; its waiting gauge is decremented directly
+ metrics.endRunningTask(mapTask);
+ metrics.endRunningJob(job);
+ metrics.killedJob(job);
+
+ metrics.runningJob(job); // third run: map and reduce both complete and the job is reported completed
+ metrics.launchedTask(mapTask);
+ metrics.runningTask(mapTask);
+ metrics.completedTask(mapTask);
+ metrics.endRunningTask(mapTask);
+ metrics.launchedTask(reduceTask);
+ metrics.runningTask(reduceTask);
+ metrics.completedTask(reduceTask);
+ metrics.endRunningTask(reduceTask);
+ metrics.endRunningJob(job);
+ metrics.completedJob(job);
+
+ checkMetrics(/*job*/3, 1, 1, 1, 0, 0,
+ /*map*/3, 1, 1, 1, 0, 0,
+ /*reduce*/1, 1, 0, 0, 0, 0); // each row: submitted-or-launched, completed, failed, killed, then the two gauge values
+ }
+
+ private void checkMetrics(int jobsSubmitted, int jobsCompleted,
+ int jobsFailed, int jobsKilled, int jobsPreparing, int jobsRunning,
+ int mapsLaunched, int mapsCompleted, int mapsFailed, int mapsKilled,
+ int mapsRunning, int mapsWaiting, int reducesLaunched,
+ int reducesCompleted, int reducesFailed, int reducesKilled,
+ int reducesRunning, int reducesWaiting) { // Asserts each named counter and gauge against the expected value passed in.
+ MetricsRecordBuilder rb = getMetrics("MRAppMetrics"); // metrics are looked up under the name "MRAppMetrics"
+ assertCounter("JobsSubmitted", jobsSubmitted, rb);
+ assertCounter("JobsCompleted", jobsCompleted, rb);
+ assertCounter("JobsFailed", jobsFailed, rb);
+ assertCounter("JobsKilled", jobsKilled, rb);
+ assertGauge("JobsPreparing", jobsPreparing, rb); // preparing and running are checked as gauges, the rest as counters
+ assertGauge("JobsRunning", jobsRunning, rb);
+
+ assertCounter("MapsLaunched", mapsLaunched, rb);
+ assertCounter("MapsCompleted", mapsCompleted, rb);
+ assertCounter("MapsFailed", mapsFailed, rb);
+ assertCounter("MapsKilled", mapsKilled, rb);
+ assertGauge("MapsRunning", mapsRunning, rb);
+ assertGauge("MapsWaiting", mapsWaiting, rb);
+
+ assertCounter("ReducesLaunched", reducesLaunched, rb);
+ assertCounter("ReducesCompleted", reducesCompleted, rb);
+ assertCounter("ReducesFailed", reducesFailed, rb);
+ assertCounter("ReducesKilled", reducesKilled, rb);
+ assertGauge("ReducesRunning", reducesRunning, rb);
+ assertGauge("ReducesWaiting", reducesWaiting, rb);
+ }
+}