You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain text copy.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/07/06 00:18:17 UTC
svn commit: r1357969 - in
/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/
src/main/java/org/apache/hadoop/hdfs/server/datanode/
src/main/java/org/apache/hadoop/hdfs/server...
Author: todd
Date: Thu Jul 5 22:18:16 2012
New Revision: 1357969
URL: http://svn.apache.org/viewvc?rev=1357969&view=rev
Log:
HDFS-3170. Add more useful metrics for write latency. Contributed by Matthew Jacobs.
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1357969&r1=1357968&r2=1357969&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Jul 5 22:18:16 2012
@@ -96,6 +96,9 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3343. Improve metrics for DN read latency (Andrew Wang via todd)
+ HDFS-3170. Add more useful metrics for write latency (Matthew Jacobs via
+ todd)
+
OPTIMIZATIONS
HDFS-2982. Startup performance suffers when there are many edit log
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java?rev=1357969&r1=1357968&r2=1357969&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java Thu Jul 5 22:18:16 2012
@@ -42,14 +42,25 @@ public class PipelineAck {
}
/**
- * Constructor
+ * Constructor assuming no next DN in pipeline
* @param seqno sequence number
* @param replies an array of replies
*/
public PipelineAck(long seqno, Status[] replies) {
+ this(seqno, replies, 0L);
+ }
+
+ /**
+ * Constructor
+ * @param seqno sequence number
+ * @param replies an array of replies
+ * @param downstreamAckTimeNanos ack RTT in nanoseconds, 0 if no next DN in pipeline
+ */
+ public PipelineAck(long seqno, Status[] replies, long downstreamAckTimeNanos) {
proto = PipelineAckProto.newBuilder()
.setSeqno(seqno)
.addAllStatus(Arrays.asList(replies))
+ .setDownstreamAckTimeNanos(downstreamAckTimeNanos)
.build();
}
@@ -76,7 +87,15 @@ public class PipelineAck {
public Status getReply(int i) {
return proto.getStatus(i);
}
-
+
+ /**
+ * Get the time elapsed for downstream ack RTT in nanoseconds
+ * @return time elapsed for downstream ack in nanoseconds, 0 if no next DN in pipeline
+ */
+ public long getDownstreamAckTimeNanos() {
+ return proto.getDownstreamAckTimeNanos();
+ }
+
/**
* Check if this ack contains error status
* @return true if all statuses are SUCCESS
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1357969&r1=1357968&r2=1357969&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu Jul 5 22:18:16 2012
@@ -42,7 +42,6 @@ import org.apache.hadoop.hdfs.protocol.d
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -252,15 +251,21 @@ class BlockReceiver implements Closeable
if (syncOnClose && (out != null || checksumOut != null)) {
datanode.metrics.incrFsyncCount();
}
+ long flushTotalNanos = 0;
+ boolean measuredFlushTime = false;
// close checksum file
try {
if (checksumOut != null) {
+ long flushStartNanos = System.nanoTime();
checksumOut.flush();
+ long flushEndNanos = System.nanoTime();
if (syncOnClose && (cout instanceof FileOutputStream)) {
- long start = Util.now();
+ long fsyncStartNanos = flushEndNanos;
((FileOutputStream)cout).getChannel().force(true);
- datanode.metrics.addFsync(Util.now() - start);
+ datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
}
+ flushTotalNanos += flushEndNanos - flushStartNanos;
+ measuredFlushTime = true;
checksumOut.close();
checksumOut = null;
}
@@ -273,12 +278,16 @@ class BlockReceiver implements Closeable
// close block file
try {
if (out != null) {
+ long flushStartNanos = System.nanoTime();
out.flush();
+ long flushEndNanos = System.nanoTime();
if (syncOnClose && (out instanceof FileOutputStream)) {
- long start = Util.now();
+ long fsyncStartNanos = flushEndNanos;
((FileOutputStream)out).getChannel().force(true);
- datanode.metrics.addFsync(Util.now() - start);
+ datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
}
+ flushTotalNanos += flushEndNanos - flushStartNanos;
+ measuredFlushTime = true;
out.close();
out = null;
}
@@ -288,6 +297,9 @@ class BlockReceiver implements Closeable
finally{
IOUtils.closeStream(out);
}
+ if (measuredFlushTime) {
+ datanode.metrics.addFlushNanos(flushTotalNanos);
+ }
// disk check
if(ioe != null) {
datanode.checkDiskError(ioe);
@@ -303,21 +315,31 @@ class BlockReceiver implements Closeable
if (isSync && (out != null || checksumOut != null)) {
datanode.metrics.incrFsyncCount();
}
+ long flushTotalNanos = 0;
if (checksumOut != null) {
+ long flushStartNanos = System.nanoTime();
checksumOut.flush();
+ long flushEndNanos = System.nanoTime();
if (isSync && (cout instanceof FileOutputStream)) {
- long start = Util.now();
+ long fsyncStartNanos = flushEndNanos;
((FileOutputStream)cout).getChannel().force(true);
- datanode.metrics.addFsync(Util.now() - start);
+ datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
}
+ flushTotalNanos += flushEndNanos - flushStartNanos;
}
if (out != null) {
+ long flushStartNanos = System.nanoTime();
out.flush();
+ long flushEndNanos = System.nanoTime();
if (isSync && (out instanceof FileOutputStream)) {
- long start = Util.now();
+ long fsyncStartNanos = flushEndNanos;
((FileOutputStream)out).getChannel().force(true);
- datanode.metrics.addFsync(Util.now() - start);
+ datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
}
+ flushTotalNanos += flushEndNanos - flushStartNanos;
+ }
+ if (checksumOut != null || out != null) {
+ datanode.metrics.addFlushNanos(flushTotalNanos);
}
}
@@ -446,7 +468,7 @@ class BlockReceiver implements Closeable
*/
private void readNextPacket() throws IOException {
/* This dances around buf a little bit, mainly to read
- * full packet with single read and to accept arbitarary size
+ * full packet with single read and to accept arbitrary size
* for next packet at the same time.
*/
if (buf == null) {
@@ -715,7 +737,7 @@ class BlockReceiver implements Closeable
replicaInfo.setLastChecksumAndDataLen(
offsetInBlock, lastChunkChecksum
);
-
+
datanode.metrics.incrBytesWritten(len);
dropOsCacheBehindWriter(offsetInBlock);
@@ -976,7 +998,8 @@ class BlockReceiver implements Closeable
synchronized void enqueue(final long seqno,
final boolean lastPacketInBlock, final long offsetInBlock) {
if (running) {
- final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock);
+ final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
+ System.nanoTime());
if(LOG.isDebugEnabled()) {
LOG.debug(myString + ": enqueue " + p);
}
@@ -1013,17 +1036,20 @@ class BlockReceiver implements Closeable
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
while (running && datanode.shouldRun && !lastPacketInBlock) {
+ long totalAckTimeNanos = 0;
boolean isInterrupted = false;
try {
Packet pkt = null;
long expected = -2;
PipelineAck ack = new PipelineAck();
long seqno = PipelineAck.UNKOWN_SEQNO;
+ long ackRecvNanoTime = 0;
try {
if (type != PacketResponderType.LAST_IN_PIPELINE
&& !mirrorError) {
// read an ack from downstream datanode
ack.readFields(downstreamIn);
+ ackRecvNanoTime = System.nanoTime();
if (LOG.isDebugEnabled()) {
LOG.debug(myString + " got " + ack);
}
@@ -1049,6 +1075,22 @@ class BlockReceiver implements Closeable
throw new IOException(myString + "seqno: expected="
+ expected + ", received=" + seqno);
}
+ if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
+ // The total ack time includes the ack times of downstream nodes.
+ // The value is 0 if this responder doesn't have a downstream
+ // DN in the pipeline.
+ totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime;
+ // Report the elapsed time from ack send to ack receive minus
+ // the downstream ack time.
+ long ackTimeNanos = totalAckTimeNanos - ack.getDownstreamAckTimeNanos();
+ if (ackTimeNanos < 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Calculated invalid ack time: " + ackTimeNanos + "ns.");
+ }
+ } else {
+ datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos);
+ }
+ }
lastPacketInBlock = pkt.lastPacketInBlock;
}
}
@@ -1116,7 +1158,7 @@ class BlockReceiver implements Closeable
replies[i+1] = ack.getReply(i);
}
}
- PipelineAck replyAck = new PipelineAck(expected, replies);
+ PipelineAck replyAck = new PipelineAck(expected, replies, totalAckTimeNanos);
if (replyAck.isSuccess() &&
pkt.offsetInBlock > replicaInfo.getBytesAcked())
@@ -1176,11 +1218,14 @@ class BlockReceiver implements Closeable
final long seqno;
final boolean lastPacketInBlock;
final long offsetInBlock;
+ final long ackEnqueueNanoTime;
- Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock) {
+ Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock,
+ long ackEnqueueNanoTime) {
this.seqno = seqno;
this.lastPacketInBlock = lastPacketInBlock;
this.offsetInBlock = offsetInBlock;
+ this.ackEnqueueNanoTime = ackEnqueueNanoTime;
}
@Override
@@ -1188,6 +1233,7 @@ class BlockReceiver implements Closeable
return getClass().getSimpleName() + "(seqno=" + seqno
+ ", lastPacketInBlock=" + lastPacketInBlock
+ ", offsetInBlock=" + offsetInBlock
+ + ", ackEnqueueNanoTime=" + ackEnqueueNanoTime
+ ")";
}
}
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java?rev=1357969&r1=1357968&r2=1357969&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java Thu Jul 5 22:18:16 2012
@@ -73,8 +73,10 @@ public class DataNodeMetrics {
@Metric MutableRate replaceBlockOp;
@Metric MutableRate heartbeats;
@Metric MutableRate blockReports;
+ @Metric MutableRate packetAckRoundTripTimeNanos;
- @Metric MutableRate fsync;
+ @Metric MutableRate flushNanos;
+ @Metric MutableRate fsyncNanos;
@Metric MutableRate sendDataPacketBlockedOnNetworkNanos;
@Metric MutableRate sendDataPacketTransferNanos;
@@ -162,8 +164,16 @@ public class DataNodeMetrics {
fsyncCount.incr();
}
- public void addFsync(long latency) {
- fsync.add(latency);
+ public void addPacketAckRoundTripTimeNanos(long latencyNanos) {
+ packetAckRoundTripTimeNanos.add(latencyNanos);
+ }
+
+ public void addFlushNanos(long latencyNanos) {
+ flushNanos.add(latencyNanos);
+ }
+
+ public void addFsyncNanos(long latencyNanos) {
+ fsyncNanos.add(latencyNanos);
}
public void shutdown() {
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto?rev=1357969&r1=1357968&r2=1357969&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto Thu Jul 5 22:18:16 2012
@@ -129,6 +129,7 @@ enum Status {
message PipelineAckProto {
required sint64 seqno = 1;
repeated Status status = 2;
+ optional uint64 downstreamAckTimeNanos = 3 [default = 0];
}
/**
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java?rev=1357969&r1=1357968&r2=1357969&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java Thu Jul 5 22:18:16 2012
@@ -18,26 +18,25 @@
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
-import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.*;
import java.util.List;
+import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.junit.Test;
public class TestDataNodeMetrics {
-
- MiniDFSCluster cluster = null;
- FileSystem fs = null;
-
+
@Test
public void testDataNodeMetrics() throws Exception {
Configuration conf = new HdfsConfiguration();
@@ -82,4 +81,55 @@ public class TestDataNodeMetrics {
if (cluster != null) {cluster.shutdown();}
}
}
+
+ @Test
+ public void testFlushMetric() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ try {
+ cluster.waitActive();
+ DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
+
+ Path testFile = new Path("/testFlushNanosMetric.txt");
+ DFSTestUtil.createFile(fs, testFile, 1, (short)1, new Random().nextLong());
+
+ List<DataNode> datanodes = cluster.getDataNodes();
+ DataNode datanode = datanodes.get(0);
+ MetricsRecordBuilder dnMetrics = getMetrics(datanode.getMetrics().name());
+ // Expect 2 flushes, 1 for the flush that occurs after writing, 1 that occurs
+ // on closing the data and metadata files.
+ assertCounter("FlushNanosNumOps", 2L, dnMetrics);
+ } finally {
+ if (cluster != null) {cluster.shutdown();}
+ }
+ }
+
+ @Test
+ public void testRoundTripAckMetric() throws Exception {
+ final int DATANODE_COUNT = 2;
+
+ Configuration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_COUNT).build();
+ try {
+ cluster.waitActive();
+ DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
+
+ Path testFile = new Path("/testRoundTripAckMetric.txt");
+ DFSTestUtil.createFile(fs, testFile, 1, (short)DATANODE_COUNT,
+ new Random().nextLong());
+
+ boolean foundNonzeroPacketAckNumOps = false;
+ for (DataNode datanode : cluster.getDataNodes()) {
+ MetricsRecordBuilder dnMetrics = getMetrics(datanode.getMetrics().name());
+ if (getLongCounter("PacketAckRoundTripTimeNanosNumOps", dnMetrics) > 0) {
+ foundNonzeroPacketAckNumOps = true;
+ }
+ }
+ assertTrue(
+ "Expected at least one datanode to have reported PacketAckRoundTripTimeNanos metric",
+ foundNonzeroPacketAckNumOps);
+ } finally {
+ if (cluster != null) {cluster.shutdown();}
+ }
+ }
}