Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/07/06 00:18:17 UTC

svn commit: r1357969 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/java/org/apache/hadoop/hdfs/server...

Author: todd
Date: Thu Jul  5 22:18:16 2012
New Revision: 1357969

URL: http://svn.apache.org/viewvc?rev=1357969&view=rev
Log:
HDFS-3170. Add more useful metrics for write latency. Contributed by Matthew Jacobs.
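
A minimal sketch (not taken from this commit; the DataNode/test wiring is assumed) of how the new nanosecond metrics are recorded and read back, using only APIs that appear in the patch below:

    // Recording side, mirroring the pattern this patch adds to BlockReceiver
    // ('out' and 'datanode' stand for the usual BlockReceiver fields):
    long flushStartNanos = System.nanoTime();
    out.flush();
    datanode.metrics.addFlushNanos(System.nanoTime() - flushStartNanos);

    // Reading side, mirroring TestDataNodeMetrics below: a MutableRate named
    // "flushNanos" is published as "FlushNanosNumOps" plus an average-time value.
    MetricsRecordBuilder dnMetrics = getMetrics(datanode.getMetrics().name());
    assertCounter("FlushNanosNumOps", 2L, dnMetrics);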

Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1357969&r1=1357968&r2=1357969&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Jul  5 22:18:16 2012
@@ -96,6 +96,9 @@ Release 2.0.1-alpha - UNRELEASED
 
     HDFS-3343. Improve metrics for DN read latency (Andrew Wang via todd)
 
+    HDFS-3170. Add more useful metrics for write latency (Matthew Jacobs via
+    todd)
+
   OPTIMIZATIONS
 
     HDFS-2982. Startup performance suffers when there are many edit log

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java?rev=1357969&r1=1357968&r2=1357969&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java Thu Jul  5 22:18:16 2012
@@ -42,14 +42,25 @@ public class PipelineAck {
   }
   
   /**
-   * Constructor
+   * Constructor assuming no next DN in pipeline
    * @param seqno sequence number
    * @param replies an array of replies
    */
   public PipelineAck(long seqno, Status[] replies) {
+    this(seqno, replies, 0L);
+  }
+
+  /**
+   * Constructor
+   * @param seqno sequence number
+   * @param replies an array of replies
+   * @param downstreamAckTimeNanos ack RTT in nanoseconds, 0 if no next DN in pipeline
+   */
+  public PipelineAck(long seqno, Status[] replies, long downstreamAckTimeNanos) {
     proto = PipelineAckProto.newBuilder()
       .setSeqno(seqno)
       .addAllStatus(Arrays.asList(replies))
+      .setDownstreamAckTimeNanos(downstreamAckTimeNanos)
       .build();
   }
   
@@ -76,7 +87,15 @@ public class PipelineAck {
   public Status getReply(int i) {
     return proto.getStatus(i);
   }
-  
+
+  /**
+   * Get the time elapsed for downstream ack RTT in nanoseconds
+   * @return time elapsed for downstream ack in nanoseconds, 0 if no next DN in pipeline
+   */
+  public long getDownstreamAckTimeNanos() {
+    return proto.getDownstreamAckTimeNanos();
+  }
+
   /**
    * Check if this ack contains error status
    * @return true if all statuses are SUCCESS
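
A short usage sketch (hypothetical values, not part of this commit): the original two-argument constructor now delegates to the new one with 0, so last-in-pipeline responders and existing callers keep reporting "no downstream time", while intermediate nodes pass the measured value through:

    Status[] replies = new Status[] { Status.SUCCESS };
    PipelineAck lastInPipeline = new PipelineAck(42L, replies);           // downstream time = 0
    PipelineAck midPipeline    = new PipelineAck(42L, replies, 123456L);  // 123,456 ns downstream
    assert lastInPipeline.getDownstreamAckTimeNanos() == 0L;
    assert midPipeline.getDownstreamAckTimeNanos() == 123456L;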

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1357969&r1=1357968&r2=1357969&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu Jul  5 22:18:16 2012
@@ -42,7 +42,6 @@ import org.apache.hadoop.hdfs.protocol.d
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -252,15 +251,21 @@ class BlockReceiver implements Closeable
     if (syncOnClose && (out != null || checksumOut != null)) {
       datanode.metrics.incrFsyncCount();      
     }
+    long flushTotalNanos = 0;
+    boolean measuredFlushTime = false;
     // close checksum file
     try {
       if (checksumOut != null) {
+        long flushStartNanos = System.nanoTime();
         checksumOut.flush();
+        long flushEndNanos = System.nanoTime();
         if (syncOnClose && (cout instanceof FileOutputStream)) {
-          long start = Util.now();
+          long fsyncStartNanos = flushEndNanos;
           ((FileOutputStream)cout).getChannel().force(true);
-          datanode.metrics.addFsync(Util.now() - start);
+          datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
         }
+        flushTotalNanos += flushEndNanos - flushStartNanos;
+        measuredFlushTime = true;
         checksumOut.close();
         checksumOut = null;
       }
@@ -273,12 +278,16 @@ class BlockReceiver implements Closeable
     // close block file
     try {
       if (out != null) {
+        long flushStartNanos = System.nanoTime();
         out.flush();
+        long flushEndNanos = System.nanoTime();
         if (syncOnClose && (out instanceof FileOutputStream)) {
-          long start = Util.now();
+          long fsyncStartNanos = flushEndNanos;
           ((FileOutputStream)out).getChannel().force(true);
-          datanode.metrics.addFsync(Util.now() - start);
+          datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
         }
+        flushTotalNanos += flushEndNanos - flushStartNanos;
+        measuredFlushTime = true;
         out.close();
         out = null;
       }
@@ -288,6 +297,9 @@ class BlockReceiver implements Closeable
     finally{
       IOUtils.closeStream(out);
     }
+    if (measuredFlushTime) {
+      datanode.metrics.addFlushNanos(flushTotalNanos);
+    }
     // disk check
     if(ioe != null) {
       datanode.checkDiskError(ioe);
@@ -303,21 +315,31 @@ class BlockReceiver implements Closeable
     if (isSync && (out != null || checksumOut != null)) {
       datanode.metrics.incrFsyncCount();      
     }
+    long flushTotalNanos = 0;
     if (checksumOut != null) {
+      long flushStartNanos = System.nanoTime();
       checksumOut.flush();
+      long flushEndNanos = System.nanoTime();
       if (isSync && (cout instanceof FileOutputStream)) {
-        long start = Util.now();
+        long fsyncStartNanos = flushEndNanos;
         ((FileOutputStream)cout).getChannel().force(true);
-        datanode.metrics.addFsync(Util.now() - start);
+        datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
       }
+      flushTotalNanos += flushEndNanos - flushStartNanos;
     }
     if (out != null) {
+      long flushStartNanos = System.nanoTime();
       out.flush();
+      long flushEndNanos = System.nanoTime();
       if (isSync && (out instanceof FileOutputStream)) {
-        long start = Util.now();
+        long fsyncStartNanos = flushEndNanos;
         ((FileOutputStream)out).getChannel().force(true);
-        datanode.metrics.addFsync(Util.now() - start);
+        datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
       }
+      flushTotalNanos += flushEndNanos - flushStartNanos;
+    }
+    if (checksumOut != null || out != null) {
+      datanode.metrics.addFlushNanos(flushTotalNanos);
     }
   }
 
@@ -446,7 +468,7 @@ class BlockReceiver implements Closeable
    */
   private void readNextPacket() throws IOException {
     /* This dances around buf a little bit, mainly to read 
-     * full packet with single read and to accept arbitarary size  
+     * full packet with single read and to accept arbitrary size  
      * for next packet at the same time.
      */
     if (buf == null) {
@@ -715,7 +737,7 @@ class BlockReceiver implements Closeable
           replicaInfo.setLastChecksumAndDataLen(
             offsetInBlock, lastChunkChecksum
           );
-          
+
           datanode.metrics.incrBytesWritten(len);
 
           dropOsCacheBehindWriter(offsetInBlock);
@@ -976,7 +998,8 @@ class BlockReceiver implements Closeable
     synchronized void enqueue(final long seqno,
         final boolean lastPacketInBlock, final long offsetInBlock) {
       if (running) {
-        final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock);
+        final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
+            System.nanoTime());
         if(LOG.isDebugEnabled()) {
           LOG.debug(myString + ": enqueue " + p);
         }
@@ -1013,17 +1036,20 @@ class BlockReceiver implements Closeable
       final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
       while (running && datanode.shouldRun && !lastPacketInBlock) {
 
+        long totalAckTimeNanos = 0;
         boolean isInterrupted = false;
         try {
             Packet pkt = null;
             long expected = -2;
             PipelineAck ack = new PipelineAck();
             long seqno = PipelineAck.UNKOWN_SEQNO;
+            long ackRecvNanoTime = 0;
             try {
               if (type != PacketResponderType.LAST_IN_PIPELINE
                   && !mirrorError) {
                 // read an ack from downstream datanode
                 ack.readFields(downstreamIn);
+                ackRecvNanoTime = System.nanoTime();
                 if (LOG.isDebugEnabled()) {
                   LOG.debug(myString + " got " + ack);
                 }
@@ -1049,6 +1075,22 @@ class BlockReceiver implements Closeable
                     throw new IOException(myString + "seqno: expected="
                         + expected + ", received=" + seqno);
                   }
+                  if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
+                    // The total ack time includes the ack times of downstream nodes.
+                    // The value is 0 if this responder doesn't have a downstream
+                    // DN in the pipeline.
+                    totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime;
+                    // Report the elapsed time from ack send to ack receive minus
+                    // the downstream ack time.
+                    long ackTimeNanos = totalAckTimeNanos - ack.getDownstreamAckTimeNanos();
+                    if (ackTimeNanos < 0) {
+                      if (LOG.isDebugEnabled()) {
+                        LOG.debug("Calculated invalid ack time: " + ackTimeNanos + "ns.");
+                      }
+                    } else {
+                      datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos);
+                    }
+                  }
                   lastPacketInBlock = pkt.lastPacketInBlock;
                 }
               }
@@ -1116,7 +1158,7 @@ class BlockReceiver implements Closeable
                 replies[i+1] = ack.getReply(i);
               }
             }
-            PipelineAck replyAck = new PipelineAck(expected, replies);
+            PipelineAck replyAck = new PipelineAck(expected, replies, totalAckTimeNanos);
             
             if (replyAck.isSuccess() && 
                  pkt.offsetInBlock > replicaInfo.getBytesAcked())
@@ -1176,11 +1218,14 @@ class BlockReceiver implements Closeable
     final long seqno;
     final boolean lastPacketInBlock;
     final long offsetInBlock;
+    final long ackEnqueueNanoTime;
 
-    Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock) {
+    Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock,
+        long ackEnqueueNanoTime) {
       this.seqno = seqno;
       this.lastPacketInBlock = lastPacketInBlock;
       this.offsetInBlock = offsetInBlock;
+      this.ackEnqueueNanoTime = ackEnqueueNanoTime;
     }
 
     @Override
@@ -1188,6 +1233,7 @@ class BlockReceiver implements Closeable
       return getClass().getSimpleName() + "(seqno=" + seqno
         + ", lastPacketInBlock=" + lastPacketInBlock
         + ", offsetInBlock=" + offsetInBlock
+        + ", ackEnqueueNanoTime=" + ackEnqueueNanoTime
         + ")";
     }
   }
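
Worked through with hypothetical numbers for a client -> DN1 -> DN2 -> DN3 pipeline (the arithmetic mirrors the responder code above):

    // DN3 is LAST_IN_PIPELINE: it records no ack metric and replies with downstreamAckTimeNanos = 0.
    long dn2Total = 100000L;              // DN2: nanos from packet enqueue to receiving DN3's ack
    long dn2Rtt   = dn2Total - 0L;        // 100 us added to packetAckRoundTripTimeNanos on DN2
    long dn1Total = 350000L;              // DN1: nanos from packet enqueue to receiving DN2's ack
    long dn1Rtt   = dn1Total - dn2Total;  // 250 us recorded on DN1
    // Each responder forwards its own total (100000 from DN2, 350000 from DN1) upstream
    // as downstreamAckTimeNanos, so the next hop can make the same subtraction.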

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java?rev=1357969&r1=1357968&r2=1357969&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java Thu Jul  5 22:18:16 2012
@@ -73,8 +73,10 @@ public class DataNodeMetrics {
   @Metric MutableRate replaceBlockOp;
   @Metric MutableRate heartbeats;
   @Metric MutableRate blockReports;
+  @Metric MutableRate packetAckRoundTripTimeNanos;
 
-  @Metric MutableRate fsync;
+  @Metric MutableRate flushNanos;
+  @Metric MutableRate fsyncNanos;
   
   @Metric MutableRate sendDataPacketBlockedOnNetworkNanos;
   @Metric MutableRate sendDataPacketTransferNanos;
@@ -162,8 +164,16 @@ public class DataNodeMetrics {
     fsyncCount.incr();
   }
 
-  public void addFsync(long latency) {
-    fsync.add(latency);
+  public void addPacketAckRoundTripTimeNanos(long latencyNanos) {
+    packetAckRoundTripTimeNanos.add(latencyNanos);
+  }
+
+  public void addFlushNanos(long latencyNanos) {
+    flushNanos.add(latencyNanos);
+  }
+
+  public void addFsyncNanos(long latencyNanos) {
+    fsyncNanos.add(latencyNanos);
   }
 
   public void shutdown() {

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto?rev=1357969&r1=1357968&r2=1357969&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto Thu Jul  5 22:18:16 2012
@@ -129,6 +129,7 @@ enum Status {
 message PipelineAckProto {
   required sint64 seqno = 1;
   repeated Status status = 2;
+  optional uint64 downstreamAckTimeNanos = 3 [default = 0];
 }
 
 /**
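
Because the new field is optional with an explicit default, the change stays wire compatible (a sketch with assumed usage, not from the commit): an older datanode simply ignores field 3, and an ack that omits it parses back with the default:

    PipelineAckProto ackWithoutField3 = PipelineAckProto.newBuilder()
        .setSeqno(1L)
        .addAllStatus(Arrays.asList(Status.SUCCESS))
        .build();
    assert ackWithoutField3.getDownstreamAckTimeNanos() == 0L;   // [default = 0] applies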

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java?rev=1357969&r1=1357968&r2=1357969&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java Thu Jul  5 22:18:16 2012
@@ -18,26 +18,25 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
-import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.*;
 
 import java.util.List;
+import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.junit.Test;
 
 public class TestDataNodeMetrics {
-  
-  MiniDFSCluster cluster = null;
-  FileSystem fs = null;
-  
+
   @Test
   public void testDataNodeMetrics() throws Exception {
     Configuration conf = new HdfsConfiguration();
@@ -82,4 +81,55 @@ public class TestDataNodeMetrics {
       if (cluster != null) {cluster.shutdown();}
     }
   }
+
+  @Test
+  public void testFlushMetric() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    try {
+      cluster.waitActive();
+      DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
+
+      Path testFile = new Path("/testFlushNanosMetric.txt");
+      DFSTestUtil.createFile(fs, testFile, 1, (short)1, new Random().nextLong());
+
+      List<DataNode> datanodes = cluster.getDataNodes();
+      DataNode datanode = datanodes.get(0);
+      MetricsRecordBuilder dnMetrics = getMetrics(datanode.getMetrics().name());
+      // Expect 2 flushes, 1 for the flush that occurs after writing, 1 that occurs
+      // on closing the data and metadata files.
+      assertCounter("FlushNanosNumOps", 2L, dnMetrics);
+    } finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
+
+  @Test
+  public void testRoundTripAckMetric() throws Exception {
+    final int DATANODE_COUNT = 2;
+
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_COUNT).build();
+    try {
+      cluster.waitActive();
+      DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
+
+      Path testFile = new Path("/testRoundTripAckMetric.txt");
+      DFSTestUtil.createFile(fs, testFile, 1, (short)DATANODE_COUNT,
+          new Random().nextLong());
+
+      boolean foundNonzeroPacketAckNumOps = false;
+      for (DataNode datanode : cluster.getDataNodes()) {
+        MetricsRecordBuilder dnMetrics = getMetrics(datanode.getMetrics().name());
+        if (getLongCounter("PacketAckRoundTripTimeNanosNumOps", dnMetrics) > 0) {
+          foundNonzeroPacketAckNumOps = true;
+        }
+      }
+      assertTrue(
+          "Expected at least one datanode to have reported PacketAckRoundTripTimeNanos metric",
+          foundNonzeroPacketAckNumOps);
+    } finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
 }