You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 06:54:34 UTC
svn commit: r1079196 - in /hadoop/mapreduce/branches/yahoo-merge: ./ ivy/
src/java/org/apache/hadoop/mapred/
src/java/org/apache/hadoop/mapreduce/task/reduce/
src/test/mapred/org/apache/hadoop/mapred/
Author: omalley
Date: Tue Mar 8 05:54:33 2011
New Revision: 1079196
URL: http://svn.apache.org/viewvc?rev=1079196&view=rev
Log:
commit 2450b8f6c6400a1d9fd158f9f380517cad2e7a1a
Author: Luke Lu <ll...@yahoo-inc.com>
Date: Fri Nov 12 09:30:02 2010 -0800
MAPREDUCE:1738 Metrics 2.0 MapReduce instrumentation
See for patches/reviews.
Modified:
hadoop/mapreduce/branches/yahoo-merge/ivy.xml
hadoop/mapreduce/branches/yahoo-merge/ivy/libraries.properties
hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Child.java
hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java
hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java
hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java
Modified: hadoop/mapreduce/branches/yahoo-merge/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/ivy.xml?rev=1079196&r1=1079195&r2=1079196&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/ivy.xml (original)
+++ hadoop/mapreduce/branches/yahoo-merge/ivy.xml Tue Mar 8 05:54:33 2011
@@ -103,8 +103,6 @@
<exclude module="jetty"/>
<exclude module="slf4j-simple"/>
</dependency>
- <dependency org="org.mockito" name="mockito-all" rev="${mockito-all.version}"
- conf="test->default"/>
<!-- dependency addition for the fault injection -->
<dependency org="org.aspectj" name="aspectjrt" rev="${aspectj.version}"
conf="common->default"/>
Modified: hadoop/mapreduce/branches/yahoo-merge/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/ivy/libraries.properties?rev=1079196&r1=1079195&r2=1079196&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/ivy/libraries.properties (original)
+++ hadoop/mapreduce/branches/yahoo-merge/ivy/libraries.properties Tue Mar 8 05:54:33 2011
@@ -64,8 +64,6 @@ lucene-core.version=2.3.1
mina-core.version=2.0.0-M5
-mockito-all.version=1.8.2
-
oro.version=2.0.8
rats-lib.version=0.6
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Child.java?rev=1079196&r1=1079195&r2=1079196&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Child.java Tue Mar 8 05:54:33 2011
@@ -31,16 +31,14 @@ import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
@@ -74,7 +72,10 @@ class Child {
int jvmIdInt = Integer.parseInt(args[4]);
JVMId jvmId = new JVMId(firstTaskid.getJobID(),
firstTaskid.getTaskType() == TaskType.MAP,jvmIdInt);
-
+
+ DefaultMetricsSystem.initialize(
+ StringUtils.camelize(firstTaskid.getTaskType().name()) +"Task");
+
//load token cache storage
String jobTokenFile =
System.getenv().get(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
@@ -204,7 +205,7 @@ class Child {
task.setConf(job);
// Initiate Java VM metrics
- JvmMetrics.init(task.getPhase().toString(), job.getSessionId());
+ JvmMetrics.initSingleton(jvmId.toString(), job.getSessionId());
LOG.debug("Creating remote user to execute task: " + job.get("user.name"));
childUGI = UserGroupInformation.createRemoteUser(job.get("user.name"));
// Add tokens to new user so that it may execute its task correctly.
@@ -276,8 +277,7 @@ class Child {
}
} finally {
RPC.stopProxy(umbilical);
- MetricsContext metricsContext = MetricsUtil.getContext("mapred");
- metricsContext.close();
+ DefaultMetricsSystem.shutdown();
// Shutting down log4j of the child-vm...
// This assumes that on return from Task.run()
// there is no more logging done.
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java?rev=1079196&r1=1079195&r2=1079196&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java Tue Mar 8 05:54:33 2011
@@ -17,431 +17,315 @@
*/
package org.apache.hadoop.mapred;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.Updater;
-import org.apache.hadoop.metrics.jvm.JvmMetrics;
-
-class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater {
- private final MetricsRecord metricsRecord;
-
- private int numMapTasksLaunched = 0;
- private int numMapTasksCompleted = 0;
- private int numMapTasksFailed = 0;
- private int numReduceTasksLaunched = 0;
- private int numReduceTasksCompleted = 0;
- private int numReduceTasksFailed = 0;
- private int numJobsSubmitted = 0;
- private int numJobsCompleted = 0;
- private int numWaitingMaps = 0;
- private int numWaitingReduces = 0;
- private int numSpeculativeMaps = 0;
- private int numSpeculativeReduces = 0;
- private int numDataLocalMaps = 0;
- private int numRackLocalMaps = 0;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import static org.apache.hadoop.metrics2.impl.MsInfo.*;
+
+@Metrics(name="JobTrackerMetrics", context="mapred")
+class JobTrackerMetricsInst extends JobTrackerInstrumentation {
+
+ // need the registry for record name (different from metrics name) and tags
+ final MetricsRegistry registry = new MetricsRegistry("jobtracker");
+
+ @Metric MutableCounterInt mapsLaunched;
+ @Metric MutableCounterInt mapsCompleted;
+ @Metric MutableCounterInt mapsFailed;
+ @Metric MutableCounterInt mapsKilled;
+ @Metric MutableCounterInt reducesLaunched;
+ @Metric MutableCounterInt reducesCompleted;
+ @Metric MutableCounterInt reducesFailed;
+ @Metric MutableCounterInt reducesKilled;
+
+ @Metric MutableCounterInt jobsSubmitted;
+ @Metric MutableCounterInt jobsCompleted;
+ @Metric MutableCounterInt jobsFailed;
+ @Metric MutableCounterInt jobsKilled;
+ @Metric MutableGaugeInt jobsPreparing;
+ @Metric MutableGaugeInt jobsRunning;
+
+ @Metric MutableGaugeInt waitingMaps;
+ @Metric MutableGaugeInt waitingReduces;
+ @Metric MutableGaugeInt runningMaps;
+ @Metric MutableGaugeInt runningReduces;
+ @Metric MutableCounterInt speculativeMaps;
+ @Metric MutableCounterInt speculativeReduces;
+ @Metric MutableCounterInt dataLocalMaps;
+ @Metric MutableCounterInt rackLocalMaps;
//Cluster status fields.
- private volatile int numMapSlots = 0;
- private volatile int numReduceSlots = 0;
- private int numBlackListedMapSlots = 0;
- private int numBlackListedReduceSlots = 0;
-
- private int numReservedMapSlots = 0;
- private int numReservedReduceSlots = 0;
- private int numOccupiedMapSlots = 0;
- private int numOccupiedReduceSlots = 0;
-
- private int numJobsFailed = 0;
- private int numJobsKilled = 0;
-
- private int numJobsPreparing = 0;
- private int numJobsRunning = 0;
-
- private int numRunningMaps = 0;
- private int numRunningReduces = 0;
-
- private int numMapTasksKilled = 0;
- private int numReduceTasksKilled = 0;
+ @Metric MutableGaugeInt mapSlots;
+ @Metric MutableGaugeInt reduceSlots;
+ @Metric MutableGaugeInt blackListedMapSlots;
+ @Metric MutableGaugeInt blackListedReduceSlots;
+ @Metric MutableGaugeInt reservedMapSlots;
+ @Metric MutableGaugeInt reservedReduceSlots;
+ @Metric MutableGaugeInt occupiedMapSlots;
+ @Metric MutableGaugeInt occupiedReduceSlots;
+
+ @Metric MutableGaugeInt trackers;
+ @Metric MutableGaugeInt trackersBlackListed;
+ @Metric MutableGaugeInt trackersDecommissioned;
- private int numTrackers = 0;
- private int numTrackersBlackListed = 0;
- private int numTrackersDecommissioned = 0;
-
- // long, because 2^31 could well be only about a month's worth of
- // heartbeats, with reasonable assumptions and JobTracker improvements.
- private long numHeartbeats = 0L;
+ @Metric MutableCounterLong heartbeats;
public JobTrackerMetricsInst(JobTracker tracker, JobConf conf) {
super(tracker, conf);
String sessionId = conf.getSessionId();
- // Initiate JVM Metrics
- JvmMetrics.init("JobTracker", sessionId);
- // Create a record for map-reduce metrics
- MetricsContext context = MetricsUtil.getContext("mapred");
- metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
- metricsRecord.setTag("sessionId", sessionId);
- context.registerUpdater(this);
- }
-
- /**
- * Since this object is a registered updater, this method will be called
- * periodically, e.g. every 5 seconds.
- */
- public void doUpdates(MetricsContext unused) {
- synchronized (this) {
- metricsRecord.setMetric("map_slots", numMapSlots);
- metricsRecord.setMetric("reduce_slots", numReduceSlots);
- metricsRecord.incrMetric("blacklisted_maps", numBlackListedMapSlots);
- metricsRecord.incrMetric("blacklisted_reduces",
- numBlackListedReduceSlots);
- metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
- metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
- metricsRecord.incrMetric("maps_failed", numMapTasksFailed);
- metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched);
- metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted);
- metricsRecord.incrMetric("reduces_failed", numReduceTasksFailed);
- metricsRecord.incrMetric("jobs_submitted", numJobsSubmitted);
- metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
- metricsRecord.incrMetric("waiting_maps", numWaitingMaps);
- metricsRecord.incrMetric("waiting_reduces", numWaitingReduces);
- metricsRecord.incrMetric("speculative_maps", numSpeculativeMaps);
- metricsRecord.incrMetric("speculative_reduces", numSpeculativeReduces);
- metricsRecord.incrMetric("datalocal_maps", numDataLocalMaps);
- metricsRecord.incrMetric("racklocal_maps", numRackLocalMaps);
-
- metricsRecord.incrMetric("reserved_map_slots", numReservedMapSlots);
- metricsRecord.incrMetric("reserved_reduce_slots", numReservedReduceSlots);
- metricsRecord.incrMetric("occupied_map_slots", numOccupiedMapSlots);
- metricsRecord.incrMetric("occupied_reduce_slots", numOccupiedReduceSlots);
-
- metricsRecord.incrMetric("jobs_failed", numJobsFailed);
- metricsRecord.incrMetric("jobs_killed", numJobsKilled);
-
- metricsRecord.incrMetric("jobs_preparing", numJobsPreparing);
- metricsRecord.incrMetric("jobs_running", numJobsRunning);
-
- metricsRecord.incrMetric("running_maps", numRunningMaps);
- metricsRecord.incrMetric("running_reduces", numRunningReduces);
-
- metricsRecord.incrMetric("maps_killed", numMapTasksKilled);
- metricsRecord.incrMetric("reduces_killed", numReduceTasksKilled);
-
- metricsRecord.incrMetric("trackers", numTrackers);
- metricsRecord.incrMetric("trackers_blacklisted", numTrackersBlackListed);
- metricsRecord.setMetric("trackers_decommissioned",
- numTrackersDecommissioned);
-
- metricsRecord.incrMetric("heartbeats", numHeartbeats);
-
- numMapTasksLaunched = 0;
- numMapTasksCompleted = 0;
- numMapTasksFailed = 0;
- numReduceTasksLaunched = 0;
- numReduceTasksCompleted = 0;
- numReduceTasksFailed = 0;
- numJobsSubmitted = 0;
- numJobsCompleted = 0;
- numWaitingMaps = 0;
- numWaitingReduces = 0;
- numBlackListedMapSlots = 0;
- numBlackListedReduceSlots = 0;
- numSpeculativeMaps = 0;
- numSpeculativeReduces = 0;
- numDataLocalMaps = 0;
- numRackLocalMaps = 0;
-
- numReservedMapSlots = 0;
- numReservedReduceSlots = 0;
- numOccupiedMapSlots = 0;
- numOccupiedReduceSlots = 0;
-
- numJobsFailed = 0;
- numJobsKilled = 0;
-
- numJobsPreparing = 0;
- numJobsRunning = 0;
-
- numRunningMaps = 0;
- numRunningReduces = 0;
-
- numMapTasksKilled = 0;
- numReduceTasksKilled = 0;
-
- numTrackers = 0;
- numTrackersBlackListed = 0;
-
- numHeartbeats = 0L;
- }
- metricsRecord.update();
+ // Ideally we should the registering in an init method.
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ JvmMetrics.create("JobTracker", sessionId, ms);
+ registry.tag(SessionId, sessionId);
+ ms.register(this);
}
@Override
public synchronized void launchMap(TaskAttemptID taskAttemptID) {
- ++numMapTasksLaunched;
+ mapsLaunched.incr();
decWaitingMaps(taskAttemptID.getJobID(), 1);
}
@Override
public synchronized void launchDataLocalMap(TaskAttemptID taskAttemptID) {
- ++numDataLocalMaps;
+ dataLocalMaps.incr();
}
@Override
public synchronized void launchRackLocalMap(TaskAttemptID taskAttemptID) {
- ++numRackLocalMaps;
+ rackLocalMaps.incr();
}
@Override
public synchronized void completeMap(TaskAttemptID taskAttemptID) {
- ++numMapTasksCompleted;
+ mapsCompleted.incr();
}
@Override
public synchronized void failedMap(TaskAttemptID taskAttemptID) {
- ++numMapTasksFailed;
+ mapsFailed.incr();
addWaitingMaps(taskAttemptID.getJobID(), 1);
}
@Override
public synchronized void speculateMap(TaskAttemptID taskAttemptID) {
- ++numSpeculativeMaps;
+ speculativeMaps.incr();
}
@Override
public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
- ++numReduceTasksLaunched;
+ reducesLaunched.incr();
decWaitingReduces(taskAttemptID.getJobID(), 1);
}
@Override
public synchronized void completeReduce(TaskAttemptID taskAttemptID) {
- ++numReduceTasksCompleted;
+ reducesCompleted.incr();
}
@Override
public synchronized void failedReduce(TaskAttemptID taskAttemptID) {
- ++numReduceTasksFailed;
+ reducesFailed.incr();
addWaitingReduces(taskAttemptID.getJobID(), 1);
}
@Override
public synchronized void speculateReduce(TaskAttemptID taskAttemptID) {
- ++numSpeculativeReduces;
+ speculativeReduces.incr();
}
@Override
public synchronized void submitJob(JobConf conf, JobID id) {
- ++numJobsSubmitted;
+ jobsSubmitted.incr();
}
@Override
public synchronized void completeJob(JobConf conf, JobID id) {
- ++numJobsCompleted;
+ jobsCompleted.incr();
}
@Override
public synchronized void addWaitingMaps(JobID id, int task) {
- numWaitingMaps += task;
+ waitingMaps.incr(task);
}
@Override
public synchronized void decWaitingMaps(JobID id, int task) {
- numWaitingMaps -= task;
+ waitingMaps.decr(task);
}
@Override
public synchronized void addWaitingReduces(JobID id, int task) {
- numWaitingReduces += task;
+ waitingReduces.incr(task);
}
@Override
public synchronized void decWaitingReduces(JobID id, int task){
- numWaitingReduces -= task;
+ waitingReduces.decr(task);
}
@Override
public synchronized void setMapSlots(int slots) {
- numMapSlots = slots;
+ mapSlots.set(slots);
}
@Override
public synchronized void setReduceSlots(int slots) {
- numReduceSlots = slots;
+ reduceSlots.set(slots);
}
@Override
- public synchronized void addBlackListedMapSlots(int slots){
- numBlackListedMapSlots += slots;
+ public synchronized void addBlackListedMapSlots(int slots) {
+ blackListedMapSlots.incr(slots);
}
@Override
- public synchronized void decBlackListedMapSlots(int slots){
- numBlackListedMapSlots -= slots;
+ public synchronized void decBlackListedMapSlots(int slots) {
+ blackListedMapSlots.decr(slots);
}
@Override
- public synchronized void addBlackListedReduceSlots(int slots){
- numBlackListedReduceSlots += slots;
+ public synchronized void addBlackListedReduceSlots(int slots) {
+ blackListedReduceSlots.incr(slots);
}
@Override
- public synchronized void decBlackListedReduceSlots(int slots){
- numBlackListedReduceSlots -= slots;
+ public synchronized void decBlackListedReduceSlots(int slots) {
+ blackListedReduceSlots.decr(slots);
}
@Override
- public synchronized void addReservedMapSlots(int slots)
- {
- numReservedMapSlots += slots;
+ public synchronized void addReservedMapSlots(int slots) {
+ reservedMapSlots.incr(slots);
}
@Override
- public synchronized void decReservedMapSlots(int slots)
- {
- numReservedMapSlots -= slots;
+ public synchronized void decReservedMapSlots(int slots) {
+ reservedMapSlots.decr(slots);
}
@Override
- public synchronized void addReservedReduceSlots(int slots)
- {
- numReservedReduceSlots += slots;
+ public synchronized void addReservedReduceSlots(int slots) {
+ reservedReduceSlots.incr(slots);
}
@Override
- public synchronized void decReservedReduceSlots(int slots)
- {
- numReservedReduceSlots -= slots;
+ public synchronized void decReservedReduceSlots(int slots) {
+ reservedReduceSlots.decr(slots);
}
@Override
- public synchronized void addOccupiedMapSlots(int slots)
- {
- numOccupiedMapSlots += slots;
+ public synchronized void addOccupiedMapSlots(int slots) {
+ occupiedMapSlots.incr(slots);
}
@Override
- public synchronized void decOccupiedMapSlots(int slots)
- {
- numOccupiedMapSlots -= slots;
+ public synchronized void decOccupiedMapSlots(int slots) {
+ occupiedMapSlots.decr(slots);
}
@Override
- public synchronized void addOccupiedReduceSlots(int slots)
- {
- numOccupiedReduceSlots += slots;
+ public synchronized void addOccupiedReduceSlots(int slots) {
+ occupiedReduceSlots.incr(slots);
}
@Override
- public synchronized void decOccupiedReduceSlots(int slots)
- {
- numOccupiedReduceSlots -= slots;
+ public synchronized void decOccupiedReduceSlots(int slots) {
+ occupiedReduceSlots.decr(slots);
}
@Override
- public synchronized void failedJob(JobConf conf, JobID id)
- {
- numJobsFailed++;
+ public synchronized void failedJob(JobConf conf, JobID id) {
+ jobsFailed.incr();
}
@Override
- public synchronized void killedJob(JobConf conf, JobID id)
- {
- numJobsKilled++;
+ public synchronized void killedJob(JobConf conf, JobID id) {
+ jobsKilled.incr();
}
@Override
- public synchronized void addPrepJob(JobConf conf, JobID id)
- {
- numJobsPreparing++;
+ public synchronized void addPrepJob(JobConf conf, JobID id) {
+ jobsPreparing.incr();
}
@Override
- public synchronized void decPrepJob(JobConf conf, JobID id)
- {
- numJobsPreparing--;
+ public synchronized void decPrepJob(JobConf conf, JobID id) {
+ jobsPreparing.decr();
}
@Override
- public synchronized void addRunningJob(JobConf conf, JobID id)
- {
- numJobsRunning++;
+ public synchronized void addRunningJob(JobConf conf, JobID id) {
+ jobsRunning.incr();
}
@Override
- public synchronized void decRunningJob(JobConf conf, JobID id)
- {
- numJobsRunning--;
+ public synchronized void decRunningJob(JobConf conf, JobID id) {
+ jobsRunning.decr();
}
@Override
- public synchronized void addRunningMaps(int task)
- {
- numRunningMaps += task;
+ public synchronized void addRunningMaps(int task) {
+ runningMaps.incr(task);
}
@Override
- public synchronized void decRunningMaps(int task)
- {
- numRunningMaps -= task;
+ public synchronized void decRunningMaps(int task) {
+ runningMaps.decr(task);
}
@Override
- public synchronized void addRunningReduces(int task)
- {
- numRunningReduces += task;
+ public synchronized void addRunningReduces(int task) {
+ runningReduces.incr(task);
}
@Override
- public synchronized void decRunningReduces(int task)
- {
- numRunningReduces -= task;
+ public synchronized void decRunningReduces(int task) {
+ runningReduces.decr(task);
}
@Override
- public synchronized void killedMap(TaskAttemptID taskAttemptID)
- {
- numMapTasksKilled++;
+ public synchronized void killedMap(TaskAttemptID taskAttemptID) {
+ mapsKilled.incr();
}
@Override
- public synchronized void killedReduce(TaskAttemptID taskAttemptID)
- {
- numReduceTasksKilled++;
+ public synchronized void killedReduce(TaskAttemptID taskAttemptID) {
+ reducesKilled.incr();
}
@Override
- public synchronized void addTrackers(int trackers)
- {
- numTrackers += trackers;
+ public synchronized void addTrackers(int trackers) {
+ this.trackers.incr(trackers);
}
@Override
- public synchronized void decTrackers(int trackers)
- {
- numTrackers -= trackers;
+ public synchronized void decTrackers(int trackers) {
+ this.trackers.decr(trackers);
}
@Override
- public synchronized void addBlackListedTrackers(int trackers)
- {
- numTrackersBlackListed += trackers;
+ public synchronized void addBlackListedTrackers(int trackers) {
+ trackersBlackListed.incr(trackers);
}
@Override
- public synchronized void decBlackListedTrackers(int trackers)
- {
- numTrackersBlackListed -= trackers;
+ public synchronized void decBlackListedTrackers(int trackers) {
+ trackersBlackListed.decr(trackers);
}
@Override
- public synchronized void setDecommissionedTrackers(int trackers)
- {
- numTrackersDecommissioned = trackers;
+ public synchronized void setDecommissionedTrackers(int trackers) {
+ trackersDecommissioned.set(trackers);
}
@Override
public synchronized void heartbeat() {
- ++numHeartbeats;
+ heartbeats.incr();
}
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1079196&r1=1079195&r2=1079196&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Mar 8 05:54:33 2011
@@ -57,6 +57,7 @@ import org.apache.hadoop.mapreduce.serve
import org.apache.hadoop.mapreduce.server.jobtracker.State;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
@@ -85,6 +86,8 @@ public class LocalJobRunner implements C
private static final Counters EMPTY_COUNTERS = new Counters();
+ { DefaultMetricsSystem.setMiniClusterMode(true); }
+
public long getProtocolVersion(String protocol, long clientVersion) {
return ClientProtocol.versionID;
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=1079196&r1=1079195&r2=1079196&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Mar 8 05:54:33 2011
@@ -91,11 +91,13 @@ import org.apache.hadoop.mapreduce.secur
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsException;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import static org.apache.hadoop.metrics2.impl.MsInfo.*;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
@@ -322,18 +324,17 @@ public class TaskTracker
* the specific metrics for shuffle. The TaskTracker is actually a server for
* the shuffle and hence the name ShuffleServerMetrics.
*/
- private class ShuffleServerMetrics implements Updater {
- private MetricsRecord shuffleMetricsRecord = null;
- private int serverHandlerBusy = 0;
- private long outputBytes = 0;
- private int failedOutputs = 0;
- private int successOutputs = 0;
+ @Metrics(context="mapred")
+ private class ShuffleServerMetrics {
+ final MetricsRegistry registry = new MetricsRegistry("shuffleOutput");
+ private int serverHandlerBusy;
+ @Metric MutableCounterLong shuffleOutputBytes;
+ @Metric MutableCounterInt shuffleFailedOutputs;
+ @Metric MutableCounterInt shuffleSuccessOutputs;
+
ShuffleServerMetrics(JobConf conf) {
- MetricsContext context = MetricsUtil.getContext("mapred");
- shuffleMetricsRecord =
- MetricsUtil.createRecord(context, "shuffleOutput");
- this.shuffleMetricsRecord.setTag("sessionId", conf.getSessionId());
- context.registerUpdater(this);
+ registry.tag(SessionId, conf.getSessionId());
+ DefaultMetricsSystem.instance().register(this);
}
synchronized void serverHandlerBusy() {
++serverHandlerBusy;
@@ -341,41 +342,20 @@ public class TaskTracker
synchronized void serverHandlerFree() {
--serverHandlerBusy;
}
- synchronized void outputBytes(long bytes) {
- outputBytes += bytes;
+ void outputBytes(long bytes) {
+ shuffleOutputBytes.incr(bytes);
}
- synchronized void failedOutput() {
- ++failedOutputs;
+ void failedOutput() {
+ shuffleFailedOutputs.incr();
}
- synchronized void successOutput() {
- ++successOutputs;
+ void successOutput() {
+ shuffleSuccessOutputs.incr();
}
- public void doUpdates(MetricsContext unused) {
- synchronized (this) {
- if (workerThreads != 0) {
- shuffleMetricsRecord.setMetric("shuffle_handler_busy_percent",
- 100*((float)serverHandlerBusy/workerThreads));
- } else {
- shuffleMetricsRecord.setMetric("shuffle_handler_busy_percent", 0);
- }
- shuffleMetricsRecord.incrMetric("shuffle_output_bytes",
- outputBytes);
- shuffleMetricsRecord.incrMetric("shuffle_failed_outputs",
- failedOutputs);
- shuffleMetricsRecord.incrMetric("shuffle_success_outputs",
- successOutputs);
- outputBytes = 0;
- failedOutputs = 0;
- successOutputs = 0;
- }
- shuffleMetricsRecord.update();
+ @Metric synchronized float getShuffleHandlerBusyPercent() {
+ return workerThreads == 0 ? 0f : 100f * serverHandlerBusy / workerThreads;
}
}
-
-
-
-
private TaskTrackerInstrumentation myInstrumentation = null;
public TaskTrackerInstrumentation getTaskTrackerInstrumentation() {
@@ -1705,8 +1685,8 @@ public class TaskTracker
}
try {
myInstrumentation.completeTask(taskStatus.getTaskID());
- } catch (MetricsException me) {
- LOG.warn("Caught: " + StringUtils.stringifyException(me));
+ } catch (Exception e) {
+ LOG.warn("Caught: " + StringUtils.stringifyException(e));
}
runningTasks.remove(taskStatus.getTaskID());
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java?rev=1079196&r1=1079195&r2=1079196&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java Tue Mar 8 05:54:33 2011
@@ -18,69 +18,49 @@
package org.apache.hadoop.mapred;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.Updater;
-import org.apache.hadoop.metrics.jvm.JvmMetrics;
-
-class TaskTrackerMetricsInst extends TaskTrackerInstrumentation
- implements Updater {
- private final MetricsRecord metricsRecord;
- private int numCompletedTasks = 0;
- private int timedoutTasks = 0;
- private int tasksFailedPing = 0;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import static org.apache.hadoop.metrics2.impl.MsInfo.*;
+
+@Metrics(name="TaskTrackerMetrics", context="mapred")
+class TaskTrackerMetricsInst extends TaskTrackerInstrumentation {
+
+ final MetricsRegistry registry = new MetricsRegistry("tasktracker");
+ @Metric MutableCounterInt tasksCompleted;
+ @Metric MutableCounterInt tasksFailedTimedout;
+ @Metric MutableCounterInt tasksFailedPing;
public TaskTrackerMetricsInst(TaskTracker t) {
super(t);
- JobConf conf = tt.getJobConf();
- String sessionId = conf.getSessionId();
- // Initiate Java VM Metrics
- JvmMetrics.init("TaskTracker", sessionId);
- // Create a record for Task Tracker metrics
- MetricsContext context = MetricsUtil.getContext("mapred");
- metricsRecord = MetricsUtil.createRecord(context, "tasktracker"); //guaranteed never null
- metricsRecord.setTag("sessionId", sessionId);
- context.registerUpdater(this);
+ String sessionId = tt.getJobConf().getSessionId();
+ registry.tag(SessionId, sessionId);
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ JvmMetrics.create("TaskTracker", sessionId, ms);
+ ms.register(this);
}
@Override
- public synchronized void completeTask(TaskAttemptID t) {
- ++numCompletedTasks;
+ public void completeTask(TaskAttemptID t) {
+ tasksCompleted.incr();
}
@Override
- public synchronized void timedoutTask(TaskAttemptID t) {
- ++timedoutTasks;
+ public void timedoutTask(TaskAttemptID t) {
+ tasksFailedTimedout.incr();
}
@Override
- public synchronized void taskFailedPing(TaskAttemptID t) {
- ++tasksFailedPing;
+ public void taskFailedPing(TaskAttemptID t) {
+ tasksFailedPing.incr();
}
- /**
- * Since this object is a registered updater, this method will be called
- * periodically, e.g. every 5 seconds.
- */
- @Override
- public void doUpdates(MetricsContext unused) {
- synchronized (this) {
- metricsRecord.setMetric("maps_running", tt.mapTotal);
- metricsRecord.setMetric("reduces_running", tt.reduceTotal);
- metricsRecord.setMetric("mapTaskSlots", (short)tt.getMaxCurrentMapTasks());
- metricsRecord.setMetric("reduceTaskSlots",
- (short)tt.getMaxCurrentReduceTasks());
- metricsRecord.incrMetric("tasks_completed", numCompletedTasks);
- metricsRecord.incrMetric("tasks_failed_timeout", timedoutTasks);
- metricsRecord.incrMetric("tasks_failed_ping", tasksFailedPing);
-
- numCompletedTasks = 0;
- timedoutTasks = 0;
- tasksFailedPing = 0;
- }
- metricsRecord.update();
- }
-
-
+ @Metric int getMapsRunning() { return tt.mapTotal; }
+ @Metric int getReducesRunning() { return tt.reduceTotal; }
+ @Metric int getMapTaskSlots() { return tt.getMaxCurrentMapTasks(); }
+ @Metric int getReduceTaskSlots() { return tt.getMaxCurrentReduceTasks(); }
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java?rev=1079196&r1=1079195&r2=1079196&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java Tue Mar 8 05:54:33 2011
@@ -20,41 +20,42 @@ package org.apache.hadoop.mapreduce.task
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-class ShuffleClientMetrics implements Updater {
+@Metrics(context="mapred")
+class ShuffleClientMetrics {
+
+ final MetricsRegistry registry = new MetricsRegistry("shuffleInput");
+
+ @Metric MutableCounterInt failedFetches;
+ @Metric MutableCounterInt successFetches;
+ @Metric MutableCounterLong inputBytes;
- private MetricsRecord shuffleMetrics = null;
- private int numFailedFetches = 0;
- private int numSuccessFetches = 0;
- private long numBytes = 0;
private int numThreadsBusy = 0;
private final int numCopiers;
-
+
ShuffleClientMetrics(TaskAttemptID reduceId, JobConf jobConf) {
this.numCopiers = jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
-
- MetricsContext metricsContext = MetricsUtil.getContext("mapred");
- this.shuffleMetrics =
- MetricsUtil.createRecord(metricsContext, "shuffleInput");
- this.shuffleMetrics.setTag("user", jobConf.getUser());
- this.shuffleMetrics.setTag("jobName", jobConf.getJobName());
- this.shuffleMetrics.setTag("jobId", reduceId.getJobID().toString());
- this.shuffleMetrics.setTag("taskId", reduceId.toString());
- this.shuffleMetrics.setTag("sessionId", jobConf.getSessionId());
- metricsContext.registerUpdater(this);
+ registry.tag("User", "User name", jobConf.getUser())
+ .tag("JobName", "Job name", jobConf.getJobName())
+ .tag("JobId", "Job ID", reduceId.getJobID().toString())
+ .tag("TaskId", "Task ID", reduceId.toString())
+ .tag("SessionId", "Session ID", jobConf.getSessionId());
+ DefaultMetricsSystem.instance().register(this);
}
- public synchronized void inputBytes(long numBytes) {
- this.numBytes += numBytes;
+ public void inputBytes(long numBytes) {
+ inputBytes.incr(numBytes);
}
public synchronized void failedFetch() {
- ++numFailedFetches;
+ failedFetches.incr();
}
public synchronized void successFetch() {
- ++numSuccessFetches;
+ successFetches.incr();
}
public synchronized void threadBusy() {
++numThreadsBusy;
@@ -62,23 +63,7 @@ class ShuffleClientMetrics implements Up
public synchronized void threadFree() {
--numThreadsBusy;
}
- public void doUpdates(MetricsContext unused) {
- synchronized (this) {
- shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes);
- shuffleMetrics.incrMetric("shuffle_failed_fetches",
- numFailedFetches);
- shuffleMetrics.incrMetric("shuffle_success_fetches",
- numSuccessFetches);
- if (numCopiers != 0) {
- shuffleMetrics.setMetric("shuffle_fetchers_busy_percent",
- 100*((float)numThreadsBusy/numCopiers));
- } else {
- shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0);
- }
- numBytes = 0;
- numSuccessFetches = 0;
- numFailedFetches = 0;
- }
- shuffleMetrics.update();
+ @Metric float getFetchersBusyPercent() {
+ return numCopiers == 0 ? 0f : 100f * numThreadsBusy / numCopiers;
}
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java?rev=1079196&r1=1079195&r2=1079196&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java Tue Mar 8 05:54:33 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.MRCon
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
@@ -65,6 +66,8 @@ public class MiniMRCluster {
private JobConf job;
private Clock clock;
+
+ { DefaultMetricsSystem.setMiniClusterMode(true); }
/**
* An inner class that runs a job tracker.
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=1079196&r1=1079195&r2=1079196&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Tue Mar 8 05:54:33 2011
@@ -26,16 +26,17 @@ import java.util.Map;
import junit.framework.TestCase;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
public class TestJobQueueTaskScheduler extends TestCase {
private static int jobCounter;
private static int taskCounter;
+
+ { DefaultMetricsSystem.setMiniClusterMode(true); }
static void resetCounters() {
jobCounter = 0;
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java?rev=1079196&r1=1079195&r2=1079196&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java Tue Mar 8 05:54:33 2011
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapred.FakeObje
import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
/**
* A test to verify JobTracker's resilience to lost task trackers.
@@ -34,6 +35,8 @@ import org.apache.hadoop.mapreduce.serve
@SuppressWarnings("deprecation")
public class TestLostTracker extends TestCase {
+ { DefaultMetricsSystem.setMiniClusterMode(true); }
+
FakeJobInProgress job;
static FakeJobTracker jobTracker;