You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2012/03/10 00:28:10 UTC
svn commit: r1299098 - in /hadoop/common/branches/branch-1: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/QueueMetrics.java
src/mapred/org/apache/hadoop/mapred/TimeBucketMetrics.java
Author: acmurthy
Date: Fri Mar 9 23:28:10 2012
New Revision: 1299098
URL: http://svn.apache.org/viewvc?rev=1299098&view=rev
Log:
MAPREDUCE-3773. Add queue metrics with buckets for job run times. Contributed by Owen O'Malley.
Added:
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TimeBucketMetrics.java
Modified:
hadoop/common/branches/branch-1/CHANGES.txt
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1299098&r1=1299097&r2=1299098&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Fri Mar 9 23:28:10 2012
@@ -168,6 +168,9 @@ Release 1.0.2 - unreleased
IMPROVEMENTS
+ MAPREDUCE-3773. Add queue metrics with buckets for job run times. (omalley
+ via acmurthy)
+
BUG FIXES
HADOOP-8050. Deadlock in metrics. (Kihwal Lee via mattf)
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java?rev=1299098&r1=1299097&r2=1299098&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java Fri Mar 9 23:28:10 2012
@@ -18,6 +18,8 @@
package org.apache.hadoop.mapred;
+import java.util.ArrayList;
+
import org.apache.hadoop.metrics2.MetricsBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.lib.MetricMutableCounterInt;
@@ -36,9 +38,14 @@ import org.apache.hadoop.metrics2.lib.De
*/
@SuppressWarnings("deprecation")
class QueueMetrics implements MetricsSource {
+
private static final Log LOG =
LogFactory.getLog(QueueMetrics.class);
+ public static final String BUCKET_PROPERTY =
+ "mapred.queue.metrics.runtime.buckets";
+ private static final String DEFAULT_BUCKETS = "60,300,1440";
+
final MetricsRegistry registry = new MetricsRegistry("Queue");
final MetricMutableCounterInt mapsLaunched =
registry.newCounter("maps_launched", "", 0);
@@ -76,6 +83,8 @@ class QueueMetrics implements MetricsSou
registry.newCounter("maps_killed", "", 0);
final MetricMutableCounterInt redsKilled =
registry.newCounter("reduces_killed", "", 0);
+ final MetricMutableGaugeInt[] runningTime;
+ TimeBucketMetrics<JobID> runBuckets;
final String sessionId;
private String queueName;
@@ -85,13 +94,45 @@ class QueueMetrics implements MetricsSou
sessionId = conf.get("session.id", "");
registry.setContext("mapred").tag("sessionId", "", sessionId);
registry.tag("Queue", "Metrics by queue", queueName);
+ runningTime = buildBuckets(conf);
}
/**
 * Returns the queue name stored in this metrics source.
 *
 * @return the {@code queueName} field
 */
public String getQueueName() {
  return queueName;
}
+ private static ArrayList<Integer> parseInts(String value) {
+ ArrayList<Integer> result = new ArrayList<Integer>();
+ for(String word: value.split(",")) {
+ result.add(Integer.parseInt(word.trim()));
+ }
+ return result;
+ }
+
+ private MetricMutableGaugeInt[] buildBuckets(Configuration conf) {
+ ArrayList<Integer> buckets =
+ parseInts(conf.get(BUCKET_PROPERTY, DEFAULT_BUCKETS));
+ MetricMutableGaugeInt[] result =
+ new MetricMutableGaugeInt[buckets.size() + 1];
+ result[0] = registry.newGauge("running_0", "", 0);
+ long[] cuts = new long[buckets.size()];
+ for(int i=0; i < buckets.size(); ++i) {
+ result[i+1] = registry.newGauge("running_" + buckets.get(i), "", 0);
+ cuts[i] = buckets.get(i) * 1000 * 60; // covert from min to ms
+ }
+ this.runBuckets = new TimeBucketMetrics<JobID>(cuts);
+ return result;
+ }
+
+ private void updateRunningTime() {
+ int[] counts = runBuckets.getBucketCounts(System.currentTimeMillis());
+ for(int i=0; i < counts.length; ++i) {
+ runningTime[i].set(counts[i]);
+ }
+ }
+
public void getMetrics(MetricsBuilder builder, boolean all) {
+ updateRunningTime();
registry.snapshot(builder.addRecord(registry.name()), all);
}
@@ -181,10 +222,12 @@ class QueueMetrics implements MetricsSou
public void addRunningJob(JobConf conf, JobID id) {
jobsRunning.incr();
+ runBuckets.add(id, System.currentTimeMillis());
}
public void decRunningJob(JobConf conf, JobID id) {
jobsRunning.decr();
+ runBuckets.remove(id);
}
public void killedMap(TaskAttemptID taskAttemptID) {
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TimeBucketMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TimeBucketMetrics.java?rev=1299098&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TimeBucketMetrics.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TimeBucketMetrics.java Fri Mar 9 23:28:10 2012
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.HashMap;
+
/**
 * Tracks a set of keys, each with a start time, and counts how many keys
 * fall into each of a fixed set of elapsed-time buckets. When the buckets
 * are queried, each key's age ({@code now - startTime}) is assigned to the
 * first bucket whose cut point is greater than that age.
 *
 * <p>All methods that read or modify the tracked keys synchronize on this
 * instance.
 *
 * @param <K> the key type; keys are matched with {@code equals}/{@code hashCode}
 */
class TimeBucketMetrics<K> {

  /** Start time recorded for each tracked key. */
  private final HashMap<K, Long> map = new HashMap<K, Long>();

  /** Exclusive upper bounds of every bucket except the last; never mutated. */
  private final long[] cuts;

  /**
   * Create a set of buckets based on a set of time points. There is one more
   * bucket than there are points:
   * <ul>
   *   <li>bucket 0 holds ages below {@code cuts[0]}, including negative ones;</li>
   *   <li>bucket {@code i} holds ages in {@code [cuts[i-1], cuts[i])};</li>
   *   <li>the last bucket holds ages at or above the final cut point.</li>
   * </ul>
   *
   * @param cuts the cut points, which must be in ascending order for the
   *     buckets to be meaningful. The array is copied, so later changes by
   *     the caller have no effect.
   */
  TimeBucketMetrics(long[] cuts) {
    // Defensive copy: bucketing must not change if the caller later reuses
    // or mutates its array.
    this.cuts = cuts.clone();
  }

  /**
   * Start counting {@code key}, or replace its start time if it is already
   * tracked.
   *
   * @param key the object to track
   * @param time its start time, in the same units as the cut points
   */
  synchronized void add(K key, long time) {
    map.put(key, time);
  }

  /**
   * Stop counting {@code key}. Does nothing if it is not tracked.
   *
   * @param key the object to forget
   */
  synchronized void remove(K key) {
    map.remove(key);
  }

  /**
   * Find the bucket for an age by scanning the cut points in order.
   *
   * @param val the age of a key
   * @return the index of the first cut point greater than {@code val}, or
   *     {@code cuts.length} if there is none
   */
  private int findBucket(long val) {
    for (int i = 0; i < cuts.length; ++i) {
      if (val < cuts[i]) {
        return i;
      }
    }
    return cuts.length;
  }

  /**
   * Count how many tracked keys are in each bucket.
   *
   * @param now the reference time, in the same units as the start times
   * @return a newly allocated array of {@code cuts.length + 1} counts indexed
   *     by bucket; the caller may keep or modify it freely
   */
  synchronized int[] getBucketCounts(long now) {
    // Allocate per call. A single shared array would be read by the caller
    // after this lock is released, so a concurrent call could overwrite it
    // mid-read.
    int[] result = new int[cuts.length + 1];
    for (long start : map.values()) {
      result[findBucket(now - start)]++;
    }
    return result;
  }
}
\ No newline at end of file