You are viewing a plain-text version of this content. The link to its canonical version ("here" on the original page) was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2012/03/10 00:28:10 UTC

svn commit: r1299098 - in /hadoop/common/branches/branch-1: CHANGES.txt src/mapred/org/apache/hadoop/mapred/QueueMetrics.java src/mapred/org/apache/hadoop/mapred/TimeBucketMetrics.java

Author: acmurthy
Date: Fri Mar  9 23:28:10 2012
New Revision: 1299098

URL: http://svn.apache.org/viewvc?rev=1299098&view=rev
Log:
MAPREDUCE-3773. Add queue metrics with buckets for job run times. Contributed by Owen O'Malley.

Added:
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TimeBucketMetrics.java
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1299098&r1=1299097&r2=1299098&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Fri Mar  9 23:28:10 2012
@@ -168,6 +168,9 @@ Release 1.0.2 - unreleased
 
   IMPROVEMENTS
 
+    MAPREDUCE-3773. Add queue metrics with buckets for job run times. (omalley
+    via acmurthy)
+
   BUG FIXES
 
     HADOOP-8050. Deadlock in metrics. (Kihwal Lee via mattf)

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java?rev=1299098&r1=1299097&r2=1299098&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java Fri Mar  9 23:28:10 2012
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapred;
 
+import java.util.ArrayList;
+
 import org.apache.hadoop.metrics2.MetricsBuilder;
 import org.apache.hadoop.metrics2.MetricsSource;
 import org.apache.hadoop.metrics2.lib.MetricMutableCounterInt;
@@ -36,9 +38,14 @@ import org.apache.hadoop.metrics2.lib.De
  */
 @SuppressWarnings("deprecation")
 class QueueMetrics implements MetricsSource {
+
   private static final Log LOG =
     LogFactory.getLog(QueueMetrics.class);
 
+  public static final String BUCKET_PROPERTY = 
+    "mapred.queue.metrics.runtime.buckets"; // comma-separated cut points, in minutes
+  private static final String DEFAULT_BUCKETS = "60,300,1440"; // 1h, 5h, 24h
+
   final MetricsRegistry registry = new MetricsRegistry("Queue");
   final MetricMutableCounterInt mapsLaunched =
       registry.newCounter("maps_launched", "", 0);
@@ -76,6 +83,8 @@ class QueueMetrics implements MetricsSou
       registry.newCounter("maps_killed", "", 0);
   final MetricMutableCounterInt redsKilled =
       registry.newCounter("reduces_killed", "", 0);
+  final MetricMutableGaugeInt[] runningTime; // one gauge per run-time bucket
+  TimeBucketMetrics<JobID> runBuckets; // time each running job was added
 
   final String sessionId;
   private String queueName;
@@ -85,13 +94,45 @@ class QueueMetrics implements MetricsSou
     sessionId = conf.get("session.id", "");
     registry.setContext("mapred").tag("sessionId", "", sessionId);
     registry.tag("Queue", "Metrics by queue", queueName);
+    runningTime = buildBuckets(conf);
   }
 
   public String getQueueName() {
     return this.queueName;
   }
 
+  private static ArrayList<Integer> parseInts(String value) { // "60, 300" -> [60, 300]
+    ArrayList<Integer> result = new ArrayList<Integer>();
+    for(String word: value.split(",")) { // blank entries ("" or "60,,300") are skipped
+      if (word.trim().length() > 0) { result.add(Integer.parseInt(word.trim())); }
+    }
+    return result;
+  }
+
+  // One gauge for jobs running less than the first cut ("running_0") plus one
+  // per configured cut ("running_<min>"); also sets runBuckets to those cuts.
+  private MetricMutableGaugeInt[] buildBuckets(Configuration conf) {
+    ArrayList<Integer> buckets = parseInts(conf.get(BUCKET_PROPERTY, DEFAULT_BUCKETS));
+    MetricMutableGaugeInt[] result = new MetricMutableGaugeInt[buckets.size() + 1];
+    result[0] = registry.newGauge("running_0", "", 0);
+    long[] cuts = new long[buckets.size()];
+    for(int i=0; i < buckets.size(); ++i) {
+      result[i+1] = registry.newGauge("running_" + buckets.get(i), "", 0);
+      cuts[i] = buckets.get(i) * 60L * 1000L; // min -> ms; long math avoids int overflow
+    }
+    this.runBuckets = new TimeBucketMetrics<JobID>(cuts);
+    return result;
+  }
+
+  private void updateRunningTime() {
+    int[] counts = runBuckets.getBucketCounts(System.currentTimeMillis());
+    for(int i=0; i < counts.length; ++i) {
+      runningTime[i].set(counts[i]); 
+    }
+  }
+
   public void getMetrics(MetricsBuilder builder, boolean all) {
+    updateRunningTime(); // refresh the run-time gauges before snapshotting them
     registry.snapshot(builder.addRecord(registry.name()), all);
   }
 
@@ -181,10 +222,12 @@ class QueueMetrics implements MetricsSou
 
   public void addRunningJob(JobConf conf, JobID id) {
     jobsRunning.incr();
+    runBuckets.add(id, System.currentTimeMillis()); // start timing this job's run
   }
 
   public void decRunningJob(JobConf conf, JobID id) {
     jobsRunning.decr();
+    runBuckets.remove(id); // stop counting this job in the run-time buckets
   }
 
   public void killedMap(TaskAttemptID taskAttemptID) {

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TimeBucketMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TimeBucketMetrics.java?rev=1299098&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TimeBucketMetrics.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TimeBucketMetrics.java Fri Mar  9 23:28:10 2012
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.HashMap;
+
+/**
+ * Create a set of buckets that hold key-time pairs. When the values of the 
+ * buckets is queried, the number of objects with time differences in the
+ * different buckets is returned.
+ */
+class TimeBucketMetrics<OBJ> {
+
+  private final HashMap<OBJ, Long> map = new HashMap<OBJ, Long>();
+  private final int[] counts;
+  private final long[] cuts;
+
+  /**
+   * Create a set of buckets based on a set of time points. The number of 
+   * buckets is one more than the number of points.
+   */
+  TimeBucketMetrics(long[] cuts) {
+    this.cuts = cuts;
+    counts = new int[cuts.length + 1];
+  }
+
+  /**
+   * Add an object to be counted
+   */
+  synchronized void add(OBJ key, long time) {
+    map.put(key, time);
+  }
+
+  /**
+   * Remove an object to be counted
+   */
+  synchronized void remove(OBJ key) {
+    map.remove(key);
+  }
+
+  /**
+   * Find the bucket based on the cut points.
+   */
+  private int findBucket(long val) {
+    for(int i=0; i < cuts.length; ++i) {
+      if (val < cuts[i]) {
+	return i;
+      }
+    }
+    return cuts.length;
+  }
+
+  /**
+   * Get the counts of how many keys are in each bucket. The same array is
+   * returned by each call to this method.
+   */
+  synchronized int[] getBucketCounts(long now) {
+    for(int i=0; i < counts.length; ++i) {
+      counts[i] = 0;
+    }
+    for(Long time: map.values()) {
+      counts[findBucket(now - time)] += 1;
+    }
+    return counts;
+  }
+}
\ No newline at end of file