You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2012/03/10 00:29:22 UTC

svn commit: r1299099 - in /hadoop/common/branches/branch-1.0: CHANGES.txt src/mapred/org/apache/hadoop/mapred/QueueMetrics.java src/mapred/org/apache/hadoop/mapred/TimeBucketMetrics.java

Author: acmurthy
Date: Fri Mar  9 23:29:22 2012
New Revision: 1299099

URL: http://svn.apache.org/viewvc?rev=1299099&view=rev
Log:
Merge -c 1299098 from branch-1 to branch-1.0 to fix MAPREDUCE-3773. Add queue metrics with buckets for job run times. Contributed by Owen O'Malley.

Added:
    hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/TimeBucketMetrics.java
      - copied unchanged from r1299098, hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TimeBucketMetrics.java
Modified:
    hadoop/common/branches/branch-1.0/CHANGES.txt
    hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java

Modified: hadoop/common/branches/branch-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/CHANGES.txt?rev=1299099&r1=1299098&r2=1299099&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.0/CHANGES.txt Fri Mar  9 23:29:22 2012
@@ -9,6 +9,9 @@ Release 1.0.2 - unreleased
 
   IMPROVEMENTS
 
+    MAPREDUCE-3773. Add queue metrics with buckets for job run times. (omalley
+    via acmurthy)
+
   BUG FIXES
 
     HADOOP-8050. Deadlock in metrics. (Kihwal Lee via mattf)

Modified: hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java?rev=1299099&r1=1299098&r2=1299099&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java (original)
+++ hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java Fri Mar  9 23:29:22 2012
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapred;
 
+import java.util.ArrayList;
+
 import org.apache.hadoop.metrics2.MetricsBuilder;
 import org.apache.hadoop.metrics2.MetricsSource;
 import org.apache.hadoop.metrics2.lib.MetricMutableCounterInt;
@@ -36,9 +38,14 @@ import org.apache.hadoop.metrics2.lib.De
  */
 @SuppressWarnings("deprecation")
 class QueueMetrics implements MetricsSource {
+
   private static final Log LOG =
     LogFactory.getLog(QueueMetrics.class);
 
+  public static final String BUCKET_PROPERTY = 
+    "mapred.queue.metrics.runtime.buckets";
+  private static final String DEFAULT_BUCKETS = "60,300,1440";
+
   final MetricsRegistry registry = new MetricsRegistry("Queue");
   final MetricMutableCounterInt mapsLaunched =
       registry.newCounter("maps_launched", "", 0);
@@ -76,6 +83,8 @@ class QueueMetrics implements MetricsSou
       registry.newCounter("maps_killed", "", 0);
   final MetricMutableCounterInt redsKilled =
       registry.newCounter("reduces_killed", "", 0);
+  final MetricMutableGaugeInt[] runningTime;
+  TimeBucketMetrics<JobID> runBuckets;
 
   final String sessionId;
   private String queueName;
@@ -85,13 +94,45 @@ class QueueMetrics implements MetricsSou
     sessionId = conf.get("session.id", "");
     registry.setContext("mapred").tag("sessionId", "", sessionId);
     registry.tag("Queue", "Metrics by queue", queueName);
+    runningTime = buildBuckets(conf);
   }
 
   public String getQueueName() {
     return this.queueName;
   }
 
+  private static ArrayList<Integer> parseInts(String value) {
+    ArrayList<Integer> result = new ArrayList<Integer>();
+    for(String word: value.split(",")) {
+      result.add(Integer.parseInt(word.trim()));
+    }
+    return result;
+  }
+
+  private MetricMutableGaugeInt[] buildBuckets(Configuration conf) {
+    ArrayList<Integer> buckets = 
+      parseInts(conf.get(BUCKET_PROPERTY, DEFAULT_BUCKETS));
+    MetricMutableGaugeInt[] result = 
+      new MetricMutableGaugeInt[buckets.size() + 1];
+    result[0] = registry.newGauge("running_0", "", 0);
+    long[] cuts = new long[buckets.size()];
+    for(int i=0; i < buckets.size(); ++i) {
+      result[i+1] = registry.newGauge("running_" + buckets.get(i), "", 0);
+      cuts[i] = buckets.get(i) * 1000 * 60; // covert from min to ms
+    }
+    this.runBuckets = new TimeBucketMetrics<JobID>(cuts);
+    return result;
+  }
+
+  private void updateRunningTime() {
+    int[] counts = runBuckets.getBucketCounts(System.currentTimeMillis());
+    for(int i=0; i < counts.length; ++i) {
+      runningTime[i].set(counts[i]); 
+    }
+  }
+
   public void getMetrics(MetricsBuilder builder, boolean all) {
+    updateRunningTime();
     registry.snapshot(builder.addRecord(registry.name()), all);
   }
 
@@ -181,10 +222,12 @@ class QueueMetrics implements MetricsSou
 
   public void addRunningJob(JobConf conf, JobID id) {
     jobsRunning.incr();
+    runBuckets.add(id, System.currentTimeMillis());
   }
 
   public void decRunningJob(JobConf conf, JobID id) {
     jobsRunning.decr();
+    runBuckets.remove(id);
   }
 
   public void killedMap(TaskAttemptID taskAttemptID) {