You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2008/05/07 22:29:04 UTC

svn commit: r654265 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/dfs/namenode/metrics/ src/java/org/apache/hadoop/metrics/ src/java/org/apache/hadoop/metrics/spi/ src/java/org/apache/hadoop/metrics/util/

Author: rangadi
Date: Wed May  7 13:29:01 2008
New Revision: 654265

URL: http://svn.apache.org/viewvc?rev=654265&view=rev
Log:
HADOOP-3058. Add FSNamesystem status metrics. (Lohit Vjayarenu via rangadi)

Added:
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystemMetrics.java
    hadoop/core/trunk/src/java/org/apache/hadoop/metrics/util/MetricsLongValue.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ReplicationTargetChooser.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/namenode/metrics/FSNamesystemMBean.java
    hadoop/core/trunk/src/java/org/apache/hadoop/metrics/MetricsRecord.java
    hadoop/core/trunk/src/java/org/apache/hadoop/metrics/spi/AbstractMetricsContext.java
    hadoop/core/trunk/src/java/org/apache/hadoop/metrics/spi/MetricsRecordImpl.java
    hadoop/core/trunk/src/java/org/apache/hadoop/metrics/util/MetricsIntValue.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=654265&r1=654264&r2=654265&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed May  7 13:29:01 2008
@@ -62,6 +62,9 @@
     HADOOP-2019. Adds support for .tar, .tgz and .tar.gz files in 
     DistributedCache (Amareshwari Sriramadasu via ddas)
 
+    HADOOP-3058. Add FSNamesystem status metrics. 
+    (Lohit Vjayarenu via rangadi)
+
   IMPROVEMENTS
    
     HADOOP-2928. Remove deprecated FileSystem.getContentLength().

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=654265&r1=654264&r2=654265&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Wed May  7 13:29:01 2008
@@ -72,6 +72,12 @@
   private UserGroupInformation fsOwner;
   private String supergroup;
   private PermissionStatus defaultPermission;
+  // FSNamesystemMetrics counter variables
+  private FSNamesystemMetrics myFSMetrics;
+  private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;
+  private int totalLoad = 0;
+  private long pendingReplicationBlocksCount = 0L, 
+    underReplicatedBlocksCount = 0L, scheduledReplicationBlocksCount = 0L;
 
   //
   // Stores the correct file name hierarchy
@@ -128,15 +134,6 @@
     new TreeMap<String, Collection<Block>>();
 
   //
-  // Stats on overall usage
-  //
-  long totalCapacity = 0L, totalUsed=0L, totalRemaining = 0L;
-
-  // total number of connections per live datanode
-  int totalLoad = 0;
-
-
-  //
   // For the HTTP browsing interface
   //
   StatusHttpServer infoServer;
@@ -254,6 +251,7 @@
 
     this.localMachine = nn.getNameNodeAddress().getHostName();
     this.port = nn.getNameNodeAddress().getPort();
+    this.registerMBean(conf); // register the MBean for the FSNamesystemStutus
     this.dir = new FSDirectory(this, conf);
     StartupOption startOpt = NameNode.getStartupOption(conf);
     this.dir.loadFSImage(getNamespaceDirs(conf), startOpt);
@@ -274,9 +272,6 @@
     lmthread.start();
     replthread.start();
     resthread.start();
-    
-    this.registerMBean(); // register the MBean for the FSNamesystemStutus
-
 
     this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
                                            conf.get("dfs.hosts.exclude",""));
@@ -342,6 +337,7 @@
    */
   FSNamesystem(FSImage fsImage, Configuration conf) throws IOException {
     setConfigurationParameters(conf);
+    this.registerMBean(conf);
     this.dir = new FSDirectory(fsImage, this, conf);
   }
 
@@ -1890,14 +1886,14 @@
     //
     assert(Thread.holdsLock(heartbeats));
     if (isAdded) {
-      totalCapacity += node.getCapacity();
-      totalUsed += node.getDfsUsed();
-      totalRemaining += node.getRemaining();
+      capacityTotal += node.getCapacity();
+      capacityUsed += node.getDfsUsed();
+      capacityRemaining += node.getRemaining();
       totalLoad += node.getXceiverCount();
     } else {
-      totalCapacity -= node.getCapacity();
-      totalUsed -= node.getDfsUsed();
-      totalRemaining -= node.getRemaining();
+      capacityTotal -= node.getCapacity();
+      capacityUsed -= node.getDfsUsed();
+      capacityRemaining -= node.getRemaining();
       totalLoad -= node.getXceiverCount();
     }
   }
@@ -1972,6 +1968,12 @@
     }
 
     workFound = computeReplicationWork(blocksToProcess); 
+    
+    // Update FSNamesystemMetrics counters
+    pendingReplicationBlocksCount = pendingReplications.size();
+    underReplicatedBlocksCount = neededReplications.size();
+    scheduledReplicationBlocksCount = workFound;
+    
     if(workFound == 0)
       workFound = computeInvalidateWork(nodesToProcess);
     return workFound;
@@ -2827,7 +2829,7 @@
    */
   public long getCapacityTotal() {
     synchronized (heartbeats) {
-      return totalCapacity;
+      return this.capacityTotal;
     }
   }
 
@@ -2836,7 +2838,7 @@
    */
   public long getCapacityUsed() {
     synchronized(heartbeats){
-      return totalUsed;
+      return this.capacityUsed;
     }
   }
   /**
@@ -2844,16 +2846,16 @@
    */
   public long getCapacityRemaining() {
     synchronized (heartbeats) {
-      return totalRemaining;
+      return this.capacityRemaining;
     }
   }
 
   /**
    * Total number of connections.
    */
-  public int totalLoad() {
+  public int getTotalLoad() {
     synchronized (heartbeats) {
-      return totalLoad;
+      return this.totalLoad;
     }
   }
 
@@ -3876,6 +3878,18 @@
     return this.dir.totalInodes();
   }
 
+  public long getPendingReplicationBlocks() {
+    return pendingReplicationBlocksCount;
+  }
+
+  public long getUnderReplicatedBlocks() {
+    return underReplicatedBlocksCount;
+  }
+
+  public long getScheduledReplicationBlocks() {
+    return scheduledReplicationBlocksCount;
+  }
+
   public String getFSState() {
     return isInSafeMode() ? "safeMode" : "Operational";
   }
@@ -3884,12 +3898,13 @@
   /**
    * Register the FSNamesystem MBean
    */
-  void registerMBean() {
+  void registerMBean(Configuration conf) {
     // We wrap to bypass standard mbean naming convetion.
     // This wraping can be removed in java 6 as it is more flexible in 
     // package naming for mbeans and their impl.
     StandardMBean bean;
     try {
+      myFSMetrics = new FSNamesystemMetrics(conf, this);
       bean = new StandardMBean(this,FSNamesystemMBean.class);
       mbeanName = MBeanUtil.registerMBean("NameNode", "FSNamesystemStatus", bean);
     } catch (NotCompliantMBeanException e) {
@@ -3898,7 +3913,14 @@
 
     LOG.info("Registered FSNamesystemStatusMBean");
   }
-  
+
+  /**
+   * get FSNamesystemMetrics
+   */
+  public FSNamesystemMetrics getFSNamesystemMetrics() {
+    return myFSMetrics;
+  }
+
   /**
    * shutdown FSNamesystem
    */

Added: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystemMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystemMetrics.java?rev=654265&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystemMetrics.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystemMetrics.java Wed May  7 13:29:01 2008
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics.*;
+import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.hadoop.metrics.util.MetricsLongValue;
+import org.apache.hadoop.metrics.util.MetricsIntValue;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
+
+/**
+ * 
+ * This class is for maintaining  the various FSNamesystem status metrics
+ * and publishing them through the metrics interfaces.
+ * This also registers the JMX MBean for RPC.
+ * <p>
+ * This class has a number of metrics variables that are publicly accessible;
+ * these variables (objects) have methods to update their values;
+ *  for example:
+ *  <p> {@link #filesTotal}.set()
+ *
+ */
+public class FSNamesystemMetrics implements Updater {
+  private static Log log = LogFactory.getLog(FSNamesystemMetrics.class);
+  private final MetricsRecord metricsRecord;
+  private FSNamesystem fsNameSystem;
+   
+  public MetricsLongValue filesTotal = new MetricsLongValue("FilesTotal");
+  public MetricsLongValue blocksTotal = new MetricsLongValue("BlocksTotal");
+  public MetricsLongValue capacityTotal = new MetricsLongValue("CapacityTotal");
+  public MetricsLongValue capacityUsed = new MetricsLongValue("CapacityUsed");
+  public MetricsLongValue capacityRemaining = new MetricsLongValue("CapacityRemaining");
+  public MetricsIntValue totalLoad = new MetricsIntValue("TotalLoad");
+  public MetricsLongValue pendingReplicationBlocks = new MetricsLongValue("PendingReplicationBlocks");
+  public MetricsLongValue underReplicatedBlocks = new MetricsLongValue("UnderReplicatedBlocks");
+  public MetricsLongValue scheduledReplicationBlocks = new MetricsLongValue("ScheduledReplicationBlocks");
+  FSNamesystemMetrics(Configuration conf, FSNamesystem fsNameSystem) {
+    String sessionId = conf.get("session.id");
+    this.fsNameSystem = fsNameSystem;
+     
+    // Create a record for FSNamesystem metrics
+    MetricsContext metricsContext = MetricsUtil.getContext("dfs");
+    metricsRecord = MetricsUtil.createRecord(metricsContext, "FSNamesystem");
+    metricsRecord.setTag("sessionId", sessionId);
+    metricsContext.registerUpdater(this);
+    log.info("Initializing FSNamesystemMeterics using context object:" +
+              metricsContext.getClass().getName());
+  }
+  public void shutdown() {
+    if (fsNameSystem != null) 
+      fsNameSystem.shutdown();
+  }
+      
+  /**
+   * Since this object is a registered updater, this method will be called
+   * periodically, e.g. every 5 seconds.
+   * We set the metrics value within  this function before pushing it out. 
+   * FSNamesystem updates its own local variables which are
+   * light weight compared to Metrics counters. 
+   */
+  public void doUpdates(MetricsContext unused) {
+    synchronized (this) {
+      filesTotal.set(fsNameSystem.getFilesTotal());
+      filesTotal.pushMetric(metricsRecord);
+
+      blocksTotal.set(fsNameSystem.getBlocksTotal());
+      blocksTotal.pushMetric(metricsRecord);
+      
+      capacityTotal.set(fsNameSystem.getCapacityTotal());
+      capacityTotal.pushMetric(metricsRecord);
+      
+      capacityUsed.set(fsNameSystem.getCapacityUsed());
+      capacityUsed.pushMetric(metricsRecord);
+      
+      capacityRemaining.set(fsNameSystem.getCapacityRemaining());
+      capacityRemaining.pushMetric(metricsRecord);
+      
+      totalLoad.set(fsNameSystem.getTotalLoad());
+      totalLoad.pushMetric(metricsRecord);
+      
+      pendingReplicationBlocks.set(fsNameSystem.getPendingReplicationBlocks());
+      pendingReplicationBlocks.pushMetric(metricsRecord);
+
+      underReplicatedBlocks.set(fsNameSystem.getUnderReplicatedBlocks());
+      underReplicatedBlocks.pushMetric(metricsRecord);
+
+      scheduledReplicationBlocks.set(fsNameSystem.
+                                      getScheduledReplicationBlocks());
+      scheduledReplicationBlocks.pushMetric(metricsRecord);
+    }
+    metricsRecord.update();
+  }
+}

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ReplicationTargetChooser.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ReplicationTargetChooser.java?rev=654265&r1=654264&r2=654265&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ReplicationTargetChooser.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ReplicationTargetChooser.java Wed May  7 13:29:01 2008
@@ -406,7 +406,7 @@
       double avgLoad = 0;
       int size = clusterMap.getNumOfLeaves();
       if (size != 0) {
-        avgLoad = (double)fs.totalLoad()/size;
+        avgLoad = (double)fs.getTotalLoad()/size;
       }
       if (node.getXceiverCount() > (2.0 * avgLoad)) {
         logr.debug("Node "+NodeBase.getPath(node)+

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/namenode/metrics/FSNamesystemMBean.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/namenode/metrics/FSNamesystemMBean.java?rev=654265&r1=654264&r2=654265&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/namenode/metrics/FSNamesystemMBean.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/namenode/metrics/FSNamesystemMBean.java Wed May  7 13:29:01 2008
@@ -71,6 +71,30 @@
   public long getFilesTotal();
  
   /**
+   * Blocks pending to be replicated
+   * @return -  num of blocks to be replicated
+   */
+  public long getPendingReplicationBlocks();
+ 
+  /**
+   * Blocks under replicated 
+   * @return -  num of blocks under replicated
+   */
+  public long getUnderReplicatedBlocks();
+ 
+  /**
+   * Blocks scheduled for replication
+   * @return -  num of blocks scheduled for replication
+   */
+  public long getScheduledReplicationBlocks();
+
+  /**
+   * Total Load on the FSNamesystem
+   * @return -  total load of FSNamesystem
+   */
+  public int getTotalLoad();
+
+  /**
    * Number of Live data nodes
    * @return number of live data nodes
    */

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/metrics/MetricsRecord.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/metrics/MetricsRecord.java?rev=654265&r1=654264&r2=654265&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/metrics/MetricsRecord.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/metrics/MetricsRecord.java Wed May  7 13:29:01 2008
@@ -100,6 +100,15 @@
    * @param tagValue new value of the tag
    * @throws MetricsException if the tagName conflicts with the configuration
    */
+  public abstract void setTag(String tagName, long tagValue);
+    
+  /**
+   * Sets the named tag to the specified value.
+   *
+   * @param tagName name of the tag
+   * @param tagValue new value of the tag
+   * @throws MetricsException if the tagName conflicts with the configuration
+   */
   public abstract void setTag(String tagName, short tagValue);
     
   /**
@@ -136,6 +145,16 @@
    * @throws MetricsException if the metricName or the type of the metricValue 
    * conflicts with the configuration
    */
+  public abstract void setMetric(String metricName, long metricValue);
+    
+  /**
+   * Sets the named metric to the specified value.
+   *
+   * @param metricName name of the metric
+   * @param metricValue new value of the metric
+   * @throws MetricsException if the metricName or the type of the metricValue 
+   * conflicts with the configuration
+   */
   public abstract void setMetric(String metricName, short metricValue);
     
   /**
@@ -176,6 +195,16 @@
    * @throws MetricsException if the metricName or the type of the metricValue 
    * conflicts with the configuration
    */
+  public abstract void incrMetric(String metricName, long metricValue);
+    
+  /**
+   * Increments the named metric by the specified value.
+   *
+   * @param metricName name of the metric
+   * @param metricValue incremental value
+   * @throws MetricsException if the metricName or the type of the metricValue 
+   * conflicts with the configuration
+   */
   public abstract void incrMetric(String metricName, short metricValue);
     
   /**

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/metrics/spi/AbstractMetricsContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/metrics/spi/AbstractMetricsContext.java?rev=654265&r1=654264&r2=654265&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/metrics/spi/AbstractMetricsContext.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/metrics/spi/AbstractMetricsContext.java Wed May  7 13:29:01 2008
@@ -374,6 +374,9 @@
     else if (a instanceof Byte) {
       return new Byte((byte)(a.byteValue() + b.byteValue()));
     }
+    else if (a instanceof Long) {
+      return Long.valueOf((a.longValue() + b.longValue()));
+    }
     else {
       // should never happen
       throw new MetricsException("Invalid number type");

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/metrics/spi/MetricsRecordImpl.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/metrics/spi/MetricsRecordImpl.java?rev=654265&r1=654264&r2=654265&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/metrics/spi/MetricsRecordImpl.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/metrics/spi/MetricsRecordImpl.java Wed May  7 13:29:01 2008
@@ -87,6 +87,17 @@
    * @param tagValue new value of the tag
    * @throws MetricsException if the tagName conflicts with the configuration
    */
+  public void setTag(String tagName, long tagValue) {
+    tagTable.put(tagName, Long.valueOf(tagValue));
+  }
+    
+  /**
+   * Sets the named tag to the specified value.
+   *
+   * @param tagName name of the tag
+   * @param tagValue new value of the tag
+   * @throws MetricsException if the tagName conflicts with the configuration
+   */
   public void setTag(String tagName, short tagValue) {
     tagTable.put(tagName, new Short(tagValue));
   }
@@ -129,6 +140,18 @@
    * @throws MetricsException if the metricName or the type of the metricValue 
    * conflicts with the configuration
    */
+  public void setMetric(String metricName, long metricValue) {
+    setAbsolute(metricName, Long.valueOf(metricValue));
+  }
+    
+  /**
+   * Sets the named metric to the specified value.
+   *
+   * @param metricName name of the metric
+   * @param metricValue new value of the metric
+   * @throws MetricsException if the metricName or the type of the metricValue 
+   * conflicts with the configuration
+   */
   public void setMetric(String metricName, short metricValue) {
     setAbsolute(metricName, new Short(metricValue));
   }
@@ -177,6 +200,18 @@
    * @throws MetricsException if the metricName or the type of the metricValue 
    * conflicts with the configuration
    */
+  public void incrMetric(String metricName, long metricValue) {
+    setIncrement(metricName, Long.valueOf(metricValue));
+  }
+    
+  /**
+   * Increments the named metric by the specified value.
+   *
+   * @param metricName name of the metric
+   * @param metricValue incremental value
+   * @throws MetricsException if the metricName or the type of the metricValue 
+   * conflicts with the configuration
+   */
   public void incrMetric(String metricName, short metricValue) {
     setIncrement(metricName, new Short(metricValue));
   }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/metrics/util/MetricsIntValue.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/metrics/util/MetricsIntValue.java?rev=654265&r1=654264&r2=654265&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/metrics/util/MetricsIntValue.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/metrics/util/MetricsIntValue.java Wed May  7 13:29:01 2008
@@ -65,8 +65,46 @@
   public synchronized int get() { 
     return value;
   } 
+
+  /**
+   * Inc metrics for incr vlaue
+   * @param incr - value to be added
+   */
+  public synchronized void inc(final int incr) {
+    value += incr;
+    changed = true;
+  }
   
   /**
+   * Inc metrics by one
+   */
+  public synchronized void inc() {
+    value++;
+    changed = true;
+  }
+
+  /**
+   * Inc metrics for incr vlaue
+   * @param decr - value to subtract
+   */
+  public synchronized void dec(final int decr) {
+    value -= decr;
+    if (value < 0)
+      value = 0;
+    changed = true;
+  }
+  
+  /**
+   * Dec metrics by one
+   */
+  public synchronized void dec() {
+    value--;
+    if (value < 0)
+      value = 0;
+    changed = true;
+  }
+
+  /**
    * Push the metric to the mr.
    * The metric is pushed only if it was updated since last push
    * 
@@ -78,7 +116,7 @@
   public synchronized void pushMetric(final MetricsRecord mr) {
     if (changed) {
       try {
-        mr.incrMetric(name, value);
+        mr.setMetric(name, value);
       } catch (Exception e) {
         LOG.info("pushMetric failed for " + name + "\n" +
             StringUtils.stringifyException(e));

Added: hadoop/core/trunk/src/java/org/apache/hadoop/metrics/util/MetricsLongValue.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/metrics/util/MetricsLongValue.java?rev=654265&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/metrics/util/MetricsLongValue.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/metrics/util/MetricsLongValue.java Wed May  7 13:29:01 2008
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics.util;
+
+import org.apache.hadoop.metrics.MetricsRecord;
+
+
+/**
+ * The MetricsLongValue class is for a metric that is not time varied
+ * but changes only when it is set. 
+ * Each time its value is set, it is published only *once* at the next update
+ * call.
+ *
+ */
+public class MetricsLongValue {  
+  private String name;
+  private long value;
+  private boolean changed;
+  
+  /**
+   * Constructor - create a new metric
+   * @param nam the name of the metrics to be used to publish the metric
+   */
+  public MetricsLongValue(final String nam) {
+    name = nam;
+    value = 0;
+    changed = false;
+  }
+  
+  /**
+   * Set the value
+   * @param newValue
+   */
+  public synchronized void set(final long newValue) {
+    value = newValue;
+    changed = true;
+  }
+  
+  /**
+   * Get value
+   * @return the value last set
+   */
+  public synchronized long get() { 
+    return value;
+  } 
+  
+  /**
+   * Inc metrics for incr vlaue
+   * @param incr - value to be added
+   */
+  public synchronized void inc(final long incr) {
+    value += incr;
+    changed = true;
+  }
+  
+  /**
+   * Inc metrics by one
+   */
+  public synchronized void inc() {
+    value++;
+    changed = true;
+  }
+
+  /**
+   * Inc metrics for incr vlaue
+   * @param decr - value to subtract
+   */
+  public synchronized void dec(final long decr) {
+    value -= decr;
+    if (value < 0)
+      value = 0;
+    changed = true;
+  }
+  
+  /**
+   * Dec metrics by one
+   */
+  public synchronized void dec() {
+    value--;
+    if (value < 0)
+      value = 0;
+    changed = true;
+  }
+
+  /**
+   * Push the metric to the mr.
+   * The metric is pushed only if it was updated since last push
+   * 
+   * Note this does NOT push to JMX
+   * (JMX gets the info via {@link #get()}
+   *
+   * @param mr
+   */
+  public synchronized void pushMetric(final MetricsRecord mr) {
+    if (changed) 
+      mr.setMetric(name, value);
+    changed = false;
+  }
+}