Posted to hdfs-commits@hadoop.apache.org by at...@apache.org on 2012/07/27 02:26:22 UTC

svn commit: r1366246 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/ src/main/java/o...

Author: atm
Date: Fri Jul 27 00:26:21 2012
New Revision: 1366246

URL: http://svn.apache.org/viewvc?rev=1366246&view=rev
Log:
HDFS-3650. Use MutableQuantiles to provide latency histograms for various operations. Contributed by Andrew Wang.
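
For context, the pattern this change applies throughout is to pair each
existing MutableRate metric with an array of MutableQuantiles, one estimator
per configured rollover interval, and to feed every latency sample to both.
A minimal sketch of the idea, using an invented FooMetrics/fooNanos rather
than anything from the commit itself:

    import org.apache.hadoop.metrics2.lib.MetricsRegistry;
    import org.apache.hadoop.metrics2.lib.MutableQuantiles;
    import org.apache.hadoop.metrics2.lib.MutableRate;

    public class FooMetrics {
      final MetricsRegistry registry = new MetricsRegistry("foo");
      // Running count/average, as before this change
      final MutableRate fooNanos = registry.newRate("fooNanos", "Foo latency");
      // One quantile estimator per configured rollover interval
      final MutableQuantiles[] fooNanosQuantiles;

      FooMetrics(int[] intervals) {
        fooNanosQuantiles = new MutableQuantiles[intervals.length];
        for (int i = 0; i < intervals.length; i++) {
          int interval = intervals[i];
          // args: name, description, sample name, value name, interval (s)
          fooNanosQuantiles[i] = registry.newQuantiles(
              "fooNanos" + interval + "s",
              "Foo latency in ns", "ops", "latency", interval);
        }
      }

      // Every sample updates the rate and all quantile windows
      public void addFooNanos(long latencyNanos) {
        fooNanos.add(latencyNanos);
        for (MutableQuantiles q : fooNanosQuantiles) {
          q.add(latencyNanos);
        }
      }

      public static void main(String[] args) {
        FooMetrics metrics = new FooMetrics(new int[] {60, 300});
        metrics.addFooNanos(1000000L);
      }
    }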

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1366246&r1=1366245&r2=1366246&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Jul 27 00:26:21 2012
@@ -356,6 +356,9 @@ Branch-2 ( Unreleased changes )
 
     HDFS-3711. Manually convert remaining tests to JUnit4. (Andrew Wang via atm)
 
+    HDFS-3650. Use MutableQuantiles to provide latency histograms for various
+    operations. (Andrew Wang via atm)
+
   OPTIMIZATIONS
 
     HDFS-2982. Startup performance suffers when there are many edit log

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1366246&r1=1366245&r2=1366246&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Jul 27 00:26:21 2012
@@ -203,6 +203,7 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_CLIENT_READ_PREFETCH_SIZE_KEY = "dfs.client.read.prefetch.size"; 
   public static final String  DFS_CLIENT_RETRY_WINDOW_BASE= "dfs.client.retry.window.base";
   public static final String  DFS_METRICS_SESSION_ID_KEY = "dfs.metrics.session-id";
+  public static final String  DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals";
   public static final String  DFS_DATANODE_HOST_NAME_KEY = "dfs.datanode.hostname";
   public static final String  DFS_NAMENODE_HOSTS_KEY = "dfs.namenode.hosts";
   public static final String  DFS_NAMENODE_HOSTS_EXCLUDE_KEY = "dfs.namenode.hosts.exclude";
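
Because the new key is read with Configuration.getInts (see the
DataNodeMetrics and NameNodeMetrics hunks below), an unset or empty value
yields an empty interval array and percentile tracking stays disabled. A
small illustrative example of turning it on; the interval values here are
arbitrary:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class EnablePercentiles {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Watch 1-minute and 5-minute windows (example values)
        conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "60,300");
        for (int interval :
            conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY)) {
          System.out.println("percentile window: " + interval + "s");
        }
      }
    }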

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1366246&r1=1366245&r2=1366246&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Fri Jul 27 00:26:21 2012
@@ -76,6 +76,8 @@ import org.apache.hadoop.util.DataChecks
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
 
+import com.google.common.annotations.VisibleForTesting;
+
 
 /****************************************************************
  * DFSOutputStream creates files from a stream of bytes.
@@ -1210,7 +1212,8 @@ public class DFSOutputStream extends FSO
   //
   // returns the list of targets, if any, that is being currently used.
   //
-  synchronized DatanodeInfo[] getPipeline() {
+  @VisibleForTesting
+  public synchronized DatanodeInfo[] getPipeline() {
     if (streamer == null) {
       return null;
     }
@@ -1752,11 +1755,13 @@ public class DFSOutputStream extends FSO
     }
   }
 
-  void setArtificialSlowdown(long period) {
+  @VisibleForTesting
+  public void setArtificialSlowdown(long period) {
     artificialSlowdown = period;
   }
 
-  synchronized void setChunksPerPacket(int value) {
+  @VisibleForTesting
+  public synchronized void setChunksPerPacket(int value) {
     chunksPerPacket = Math.min(chunksPerPacket, value);
     packetSize = PacketHeader.PKT_HEADER_LEN +
                  (checksum.getBytesPerChecksum() + 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1366246&r1=1366245&r2=1366246&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Fri Jul 27 00:26:21 2012
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.d
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
 import org.apache.hadoop.io.nativeio.NativeIO;
@@ -486,11 +487,14 @@ class BlockSender implements java.io.Clo
         
         // no need to flush since we know out is not a buffered stream
         FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
+        LongWritable waitTime = new LongWritable();
+        LongWritable transferTime = new LongWritable();
         sockOut.transferToFully(fileCh, blockInPosition, dataLen, 
-            datanode.metrics.getSendDataPacketBlockedOnNetworkNanos(),
-            datanode.metrics.getSendDataPacketTransferNanos());
+            waitTime, transferTime);
+        datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
+        datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
         blockInPosition += dataLen;
-      } else { 
+      } else {
         // normal transfer
         out.write(buf, 0, dataOff + dataLen);
       }
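
This hunk replaces the old approach of handing MutableRate objects into
transferToFully with two LongWritable out-parameters, so the caller can feed
each timing to both the rate and the new quantiles via the
addSendDataPacket*Nanos methods. A standalone sketch of the out-parameter
pattern, with timedSend standing in (hypothetically) for transferToFully:

    import org.apache.hadoop.io.LongWritable;

    public class OutParams {
      // Hypothetical stand-in for transferToFully: reports how long it
      // blocked waiting on the network and how long the transfer took.
      static void timedSend(LongWritable waitNanos, LongWritable transferNanos) {
        long start = System.nanoTime();
        // ... block until the socket is writable ...
        long sendStart = System.nanoTime();
        // ... perform the actual transfer ...
        waitNanos.set(sendStart - start);
        transferNanos.set(System.nanoTime() - sendStart);
      }

      public static void main(String[] args) {
        LongWritable waitTime = new LongWritable();
        LongWritable transferTime = new LongWritable();
        timedSend(waitTime, transferTime);
        // The caller, not the callee, decides which metrics to update
        System.out.println(waitTime.get() + " / " + transferTime.get() + " ns");
      }
    }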

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java?rev=1366246&r1=1366245&r2=1366246&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java Fri Jul 27 00:26:21 2012
@@ -29,6 +29,7 @@ import org.apache.hadoop.metrics2.annota
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
 import org.apache.hadoop.metrics2.lib.MutableRate;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 
@@ -74,19 +75,54 @@ public class DataNodeMetrics {
   @Metric MutableRate heartbeats;
   @Metric MutableRate blockReports;
   @Metric MutableRate packetAckRoundTripTimeNanos;
-
+  MutableQuantiles[] packetAckRoundTripTimeNanosQuantiles;
+  
   @Metric MutableRate flushNanos;
+  MutableQuantiles[] flushNanosQuantiles;
+  
   @Metric MutableRate fsyncNanos;
+  MutableQuantiles[] fsyncNanosQuantiles;
   
   @Metric MutableRate sendDataPacketBlockedOnNetworkNanos;
+  MutableQuantiles[] sendDataPacketBlockedOnNetworkNanosQuantiles;
   @Metric MutableRate sendDataPacketTransferNanos;
+  MutableQuantiles[] sendDataPacketTransferNanosQuantiles;
+  
 
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   final String name;
 
-  public DataNodeMetrics(String name, String sessionId) {
+  public DataNodeMetrics(String name, String sessionId, int[] intervals) {
     this.name = name;
     registry.tag(SessionId, sessionId);
+    
+    final int len = intervals.length;
+    packetAckRoundTripTimeNanosQuantiles = new MutableQuantiles[len];
+    flushNanosQuantiles = new MutableQuantiles[len];
+    fsyncNanosQuantiles = new MutableQuantiles[len];
+    sendDataPacketBlockedOnNetworkNanosQuantiles = new MutableQuantiles[len];
+    sendDataPacketTransferNanosQuantiles = new MutableQuantiles[len];
+    
+    for (int i = 0; i < len; i++) {
+      int interval = intervals[i];
+      packetAckRoundTripTimeNanosQuantiles[i] = registry.newQuantiles(
+          "packetAckRoundTripTimeNanos" + interval + "s",
+          "Packet Ack RTT in ns", "ops", "latency", interval);
+      flushNanosQuantiles[i] = registry.newQuantiles(
+          "flushNanos" + interval + "s", 
+          "Disk flush latency in ns", "ops", "latency", interval);
+      fsyncNanosQuantiles[i] = registry.newQuantiles(
+          "fsyncNanos" + interval + "s", "Disk fsync latency in ns", 
+          "ops", "latency", interval);
+      sendDataPacketBlockedOnNetworkNanosQuantiles[i] = registry.newQuantiles(
+          "sendDataPacketBlockedOnNetworkNanos" + interval + "s", 
+          "Time blocked on network while sending a packet in ns",
+          "ops", "latency", interval);
+      sendDataPacketTransferNanosQuantiles[i] = registry.newQuantiles(
+          "sendDataPacketTransferNanos" + interval + "s", 
+          "Time reading from disk and writing to network while sending " +
+          "a packet in ns", "ops", "latency", interval);
+    }
   }
 
   public static DataNodeMetrics create(Configuration conf, String dnName) {
@@ -94,8 +130,15 @@ public class DataNodeMetrics {
     MetricsSystem ms = DefaultMetricsSystem.instance();
     JvmMetrics.create("DataNode", sessionId, ms);
     String name = "DataNodeActivity-"+ (dnName.isEmpty()
-        ? "UndefinedDataNodeName"+ DFSUtil.getRandom().nextInt() : dnName.replace(':', '-'));
-    return ms.register(name, null, new DataNodeMetrics(name, sessionId));
+        ? "UndefinedDataNodeName"+ DFSUtil.getRandom().nextInt() 
+            : dnName.replace(':', '-'));
+
+    // Percentile measurement is off by default: no intervals are watched unless configured
+    int[] intervals = 
+        conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);
+    
+    return ms.register(name, null, new DataNodeMetrics(name, sessionId,
+        intervals));
   }
 
   public String name() { return name; }
@@ -166,14 +209,23 @@ public class DataNodeMetrics {
 
   public void addPacketAckRoundTripTimeNanos(long latencyNanos) {
     packetAckRoundTripTimeNanos.add(latencyNanos);
+    for (MutableQuantiles q : packetAckRoundTripTimeNanosQuantiles) {
+      q.add(latencyNanos);
+    }
   }
 
   public void addFlushNanos(long latencyNanos) {
     flushNanos.add(latencyNanos);
+    for (MutableQuantiles q : flushNanosQuantiles) {
+      q.add(latencyNanos);
+    }
   }
 
   public void addFsyncNanos(long latencyNanos) {
     fsyncNanos.add(latencyNanos);
+    for (MutableQuantiles q : fsyncNanosQuantiles) {
+      q.add(latencyNanos);
+    }
   }
 
   public void shutdown() {
@@ -196,12 +248,18 @@ public class DataNodeMetrics {
   public void incrBlocksGetLocalPathInfo() {
     blocksGetLocalPathInfo.incr();
   }
-  
-  public MutableRate getSendDataPacketBlockedOnNetworkNanos() {
-    return sendDataPacketBlockedOnNetworkNanos;
+
+  public void addSendDataPacketBlockedOnNetworkNanos(long latencyNanos) {
+    sendDataPacketBlockedOnNetworkNanos.add(latencyNanos);
+    for (MutableQuantiles q : sendDataPacketBlockedOnNetworkNanosQuantiles) {
+      q.add(latencyNanos);
+    }
   }
-  
-  public MutableRate getSendDataPacketTransferNanos() {
-    return sendDataPacketTransferNanos;
+
+  public void addSendDataPacketTransferNanos(long latencyNanos) {
+    sendDataPacketTransferNanos.add(latencyNanos);
+    for (MutableQuantiles q : sendDataPacketTransferNanosQuantiles) {
+      q.add(latencyNanos);
+    }
   }
 }
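
Note how the default-off behavior falls out of the code above: with
dfs.metrics.percentiles.intervals unset, the constructor receives an empty
array, allocates zero-length quantile arrays, and the for-each loops in the
add* methods are no-ops. A tiny sketch of that default path, assuming a
plain Configuration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class PercentilesDefaultOff {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Key unset: getInts returns an empty array, not null
        int[] intervals =
            conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);
        System.out.println("intervals watched: " + intervals.length); // 0
        for (int interval : intervals) {
          // Never reached when the key is unset; no quantiles are created
          System.out.println("would create quantiles for " + interval + "s");
        }
      }
    }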

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java?rev=1366246&r1=1366245&r2=1366246&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java Fri Jul 27 00:26:21 2012
@@ -17,17 +17,20 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.metrics;
 
+import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
+import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
-import static org.apache.hadoop.metrics2.impl.MsInfo.*;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
 import org.apache.hadoop.metrics2.lib.MutableRate;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 
@@ -57,15 +60,31 @@ public class NameNodeMetrics {
 
   @Metric("Journal transactions") MutableRate transactions;
   @Metric("Journal syncs") MutableRate syncs;
+  MutableQuantiles[] syncsQuantiles;
   @Metric("Journal transactions batched in sync")
   MutableCounterLong transactionsBatchedInSync;
   @Metric("Block report") MutableRate blockReport;
+  MutableQuantiles[] blockReportQuantiles;
 
   @Metric("Duration in SafeMode at startup") MutableGaugeInt safeModeTime;
   @Metric("Time loading FS Image at startup") MutableGaugeInt fsImageLoadTime;
 
-  NameNodeMetrics(String processName, String sessionId) {
+  NameNodeMetrics(String processName, String sessionId, int[] intervals) {
     registry.tag(ProcessName, processName).tag(SessionId, sessionId);
+    
+    final int len = intervals.length;
+    syncsQuantiles = new MutableQuantiles[len];
+    blockReportQuantiles = new MutableQuantiles[len];
+    
+    for (int i = 0; i < len; i++) {
+      int interval = intervals[i];
+      syncsQuantiles[i] = registry.newQuantiles(
+          "syncs" + interval + "s",
+          "Journal syncs", "ops", "latency", interval);
+      blockReportQuantiles[i] = registry.newQuantiles(
+          "blockReport" + interval + "s", 
+          "Block report", "ops", "latency", interval);
+    }
   }
 
   public static NameNodeMetrics create(Configuration conf, NamenodeRole r) {
@@ -73,7 +92,11 @@ public class NameNodeMetrics {
     String processName = r.toString();
     MetricsSystem ms = DefaultMetricsSystem.instance();
     JvmMetrics.create(processName, sessionId, ms);
-    return ms.register(new NameNodeMetrics(processName, sessionId));
+    
+    // Percentile measurement is off by default: no intervals are watched unless configured
+    int[] intervals = 
+        conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);
+    return ms.register(new NameNodeMetrics(processName, sessionId, intervals));
   }
 
   public void shutdown() {
@@ -146,6 +169,9 @@ public class NameNodeMetrics {
 
   public void addSync(long elapsed) {
     syncs.add(elapsed);
+    for (MutableQuantiles q : syncsQuantiles) {
+      q.add(elapsed);
+    }
   }
 
   public void setFsImageLoadTime(long elapsed) {
@@ -154,6 +180,9 @@ public class NameNodeMetrics {
 
   public void addBlockReport(long latency) {
     blockReport.add(latency);
+    for (MutableQuantiles q : blockReportQuantiles) {
+      q.add(latency);
+    }
   }
 
   public void setSafeModeTime(long elapsed) {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1366246&r1=1366245&r2=1366246&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Fri Jul 27 00:26:21 2012
@@ -1004,4 +1004,14 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.metrics.percentiles.intervals</name>
+  <value></value>
+  <description>
+    Comma-delimited set of integers denoting the desired rollover intervals 
+    (in seconds) for percentile latency metrics on the Namenode and Datanode.
+    By default, percentile latency metrics are disabled.
+  </description>
+</property>
+
 </configuration>
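
As an illustration, an operator could enable five-minute and one-hour
percentile windows with an hdfs-site.xml override like the following (the
interval values are arbitrary examples):

    <property>
      <name>dfs.metrics.percentiles.intervals</name>
      <value>300,3600</value>
    </property>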

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java?rev=1366246&r1=1366245&r2=1366246&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java Fri Jul 27 00:26:21 2012
@@ -18,21 +18,26 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.List;
-import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.junit.Test;
 
@@ -59,8 +64,10 @@ public class TestDataNodeMetrics {
   }
 
   @Test
-  public void testSendDataPacket() throws Exception {
+  public void testSendDataPacketMetrics() throws Exception {
     Configuration conf = new HdfsConfiguration();
+    final int interval = 1;
+    conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     try {
       FileSystem fs = cluster.getFileSystem();
@@ -73,64 +80,110 @@ public class TestDataNodeMetrics {
       assertEquals(datanodes.size(), 1);
       DataNode datanode = datanodes.get(0);
       MetricsRecordBuilder rb = getMetrics(datanode.getMetrics().name());
-
       // Expect 2 packets, 1 for the 1 byte read, 1 for the empty packet
       // signaling the end of the block
       assertCounter("SendDataPacketTransferNanosNumOps", (long)2, rb);
       assertCounter("SendDataPacketBlockedOnNetworkNanosNumOps", (long)2, rb);
+      // Wait for at least 1 rollover
+      Thread.sleep((interval + 1) * 1000);
+      // Check that the sendPacket percentiles rolled to non-zero values
+      String sec = interval + "s";
+      assertQuantileGauges("SendDataPacketBlockedOnNetworkNanos" + sec, rb);
+      assertQuantileGauges("SendDataPacketTransferNanos" + sec, rb);
     } finally {
       if (cluster != null) {cluster.shutdown();}
     }
   }
 
   @Test
-  public void testFlushMetric() throws Exception {
+  public void testReceivePacketMetrics() throws Exception {
     Configuration conf = new HdfsConfiguration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    final int interval = 1;
+    conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     try {
       cluster.waitActive();
       DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
 
       Path testFile = new Path("/testFlushNanosMetric.txt");
-      DFSTestUtil.createFile(fs, testFile, 1, (short)1, new Random().nextLong());
-
+      FSDataOutputStream fout = fs.create(testFile);
+      fout.write(new byte[1]);
+      fout.hsync();
+      fout.close();
       List<DataNode> datanodes = cluster.getDataNodes();
       DataNode datanode = datanodes.get(0);
       MetricsRecordBuilder dnMetrics = getMetrics(datanode.getMetrics().name());
-      // Expect 2 flushes, 1 for the flush that occurs after writing, 1 that occurs
-      // on closing the data and metadata files.
+      // Expect two flushes, 1 for the flush that occurs after writing, 
+      // 1 that occurs on closing the data and metadata files.
       assertCounter("FlushNanosNumOps", 2L, dnMetrics);
+      // Expect two syncs, one from the hsync, one on close.
+      assertCounter("FsyncNanosNumOps", 2L, dnMetrics);
+      // Wait for at least 1 rollover
+      Thread.sleep((interval + 1) * 1000);
+      // Check the receivePacket percentiles that should be non-zero
+      String sec = interval + "s";
+      assertQuantileGauges("FlushNanos" + sec, dnMetrics);
+      assertQuantileGauges("FsyncNanos" + sec, dnMetrics);
     } finally {
       if (cluster != null) {cluster.shutdown();}
     }
   }
 
+  /**
+   * Tests that round-trip acks in a datanode write pipeline are correctly 
+   * measured. 
+   */
   @Test
   public void testRoundTripAckMetric() throws Exception {
-    final int DATANODE_COUNT = 2;
-
+    final int datanodeCount = 2;
+    final int interval = 1;
     Configuration conf = new HdfsConfiguration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_COUNT).build();
+    conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        datanodeCount).build();
     try {
       cluster.waitActive();
-      DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
-
+      FileSystem fs = cluster.getFileSystem();
+      // Open a file and get the head of the pipeline
       Path testFile = new Path("/testRoundTripAckMetric.txt");
-      DFSTestUtil.createFile(fs, testFile, 1, (short)DATANODE_COUNT,
-          new Random().nextLong());
-
-      boolean foundNonzeroPacketAckNumOps = false;
+      FSDataOutputStream fsout = fs.create(testFile, (short) datanodeCount);
+      DFSOutputStream dout = (DFSOutputStream) fsout.getWrappedStream();
+      // Slow down the writes to catch the write pipeline
+      dout.setChunksPerPacket(5);
+      dout.setArtificialSlowdown(3000);
+      fsout.write(new byte[10000]);
+      DatanodeInfo[] pipeline = null;
+      int count = 0;
+      while (pipeline == null && count < 5) {
+        pipeline = dout.getPipeline();
+        System.out.println("Waiting for pipeline to be created.");
+        Thread.sleep(1000);
+        count++;
+      }
+      // Get the head node that should be receiving downstream acks
+      DatanodeInfo headInfo = pipeline[0];
+      DataNode headNode = null;
       for (DataNode datanode : cluster.getDataNodes()) {
-        MetricsRecordBuilder dnMetrics = getMetrics(datanode.getMetrics().name());
-        if (getLongCounter("PacketAckRoundTripTimeNanosNumOps", dnMetrics) > 0) {
-          foundNonzeroPacketAckNumOps = true;
+        if (datanode.getDatanodeId().equals(headInfo)) {
+          headNode = datanode;
+          break;
         }
       }
-      assertTrue(
-          "Expected at least one datanode to have reported PacketAckRoundTripTimeNanos metric",
-          foundNonzeroPacketAckNumOps);
+      assertNotNull("Could not find the head of the datanode write pipeline", 
+          headNode);
+      // Close the file and wait for the metrics to rollover
+      Thread.sleep((interval + 1) * 1000);
+      // Check the ack was received
+      MetricsRecordBuilder dnMetrics = getMetrics(headNode.getMetrics()
+          .name());
+      assertTrue("Expected non-zero number of acks", 
+          getLongCounter("PacketAckRoundTripTimeNanosNumOps", dnMetrics) > 0);
+      assertQuantileGauges("PacketAckRoundTripTimeNanos" + interval
+          + "s", dnMetrics);
     } finally {
-      if (cluster != null) {cluster.shutdown();}
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
 }
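
All three tests rely on the same idiom: the quantile gauges only become
available after the estimators take a snapshot (hence the "Wait for at
least 1 rollover" comments), so each test sleeps for the interval plus a
second of slop before calling assertQuantileGauges. Schematically, with
names illustrative except where they match the tests above:

    public class RolloverWait {
      public static void main(String[] args) throws InterruptedException {
        final int interval = 1; // seconds, matching the tests above
        // ... run the operations whose latency is being measured ...
        // Sleep one full interval plus slop so the quantile estimators
        // snapshot at least once before the gauges are asserted.
        Thread.sleep((interval + 1) * 1000L);
        // ... then e.g. assertQuantileGauges("FlushNanos" + interval + "s", rb)
      }
    }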

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java?rev=1366246&r1=1366245&r2=1366246&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java Fri Jul 27 00:26:21 2012
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.na
 
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
+import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertTrue;
 
@@ -63,6 +64,9 @@ public class TestNameNodeMetrics {
   // Number of datanodes in the cluster
   private static final int DATANODE_COUNT = 3; 
   private static final int WAIT_GAUGE_VALUE_RETRIES = 20;
+  
+  // Rollover interval of percentile metrics (in seconds)
+  private static final int PERCENTILES_INTERVAL = 1;
 
   static {
     CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100);
@@ -71,6 +75,8 @@ public class TestNameNodeMetrics {
         DFS_REPLICATION_INTERVAL);
     CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 
         DFS_REPLICATION_INTERVAL);
+    CONF.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, 
+        "" + PERCENTILES_INTERVAL);
 
     ((Log4JLogger)LogFactory.getLog(MetricsAsserts.class))
       .getLogger().setLevel(Level.DEBUG);
@@ -352,4 +358,24 @@ public class TestNameNodeMetrics {
     assertGauge("TransactionsSinceLastCheckpoint", 1L, getMetrics(NS_METRICS));
     assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS));
   }
+  
+  /**
+   * Tests that the sync and block report metrics get updated on cluster
+   * startup.
+   */
+  @Test
+  public void testSyncAndBlockReportMetric() throws Exception {
+    MetricsRecordBuilder rb = getMetrics(NN_METRICS);
+    // We have one sync when the cluster starts up, just opening the journal
+    assertCounter("SyncsNumOps", 1L, rb);
+    // Each datanode reports in when the cluster comes up
+    assertCounter("BlockReportNumOps", (long)DATANODE_COUNT, rb);
+    
+    // Sleep for an interval+slop to let the percentiles rollover
+    Thread.sleep((PERCENTILES_INTERVAL+1)*1000);
+    
+    // Check that the percentiles were updated
+    assertQuantileGauges("Syncs1s", rb);
+    assertQuantileGauges("BlockReport1s", rb);
+  }
 }