You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2014/10/23 21:59:47 UTC
git commit: HDFS-7222. Expose DataNode network errors as a metric.
(Charles Lamb via wang)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 d27cd5ad7 -> d3b9e9be2
HDFS-7222. Expose DataNode network errors as a metric. (Charles Lamb via wang)
(cherry picked from commit 86cad007d7d6366b293bb9a073814889081c8662)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d3b9e9be
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d3b9e9be
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d3b9e9be
Branch: refs/heads/branch-2
Commit: d3b9e9be24073f83e101c6a82546b9aef3878689
Parents: d27cd5a
Author: Andrew Wang <wa...@apache.org>
Authored: Thu Oct 23 12:53:01 2014 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Thu Oct 23 12:53:39 2014 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../server/datanode/DataNodeFaultInjector.java | 4 ++
.../hdfs/server/datanode/DataXceiver.java | 24 +++++++++++
.../datanode/metrics/DataNodeMetrics.java | 7 ++++
.../server/datanode/TestDataNodeMetrics.java | 42 ++++++++++++++++++++
5 files changed, 79 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b9e9be/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index b6ff34e..1256550 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -39,6 +39,8 @@ Release 2.7.0 - UNRELEASED
HDFS-6824. Additional user documentation for HDFS encryption. (wang)
+ HDFS-7222. Expose DataNode network errors as a metric. (Charles Lamb via wang)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b9e9be/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 31ac80b..478099d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -21,6 +21,8 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
+import java.io.IOException;
+
/**
* Used for injecting faults in DFSClient and DFSOutputStream tests.
* Calls into this are a no-op in production code.
@@ -35,4 +37,6 @@ public class DataNodeFaultInjector {
}
public void getHdfsBlocksMetadata() {}
+
+ public void writeBlockAfterFlush() throws IOException {}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b9e9be/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 67eb941..6fc819d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -211,6 +211,7 @@ class DataXceiver extends Receiver implements Runnable {
LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
}
} else {
+ datanode.metrics.incrDatanodeNetworkErrors();
throw err;
}
break;
@@ -500,6 +501,7 @@ class DataXceiver extends Receiver implements Runnable {
} catch (IOException ioe) {
LOG.debug("Error reading client status response. Will close connection.", ioe);
IOUtils.closeStream(out);
+ datanode.metrics.incrDatanodeNetworkErrors();
}
} else {
IOUtils.closeStream(out);
@@ -520,6 +522,7 @@ class DataXceiver extends Receiver implements Runnable {
*/
LOG.warn(dnR + ":Got exception while serving " + block + " to "
+ remoteAddress, ioe);
+ datanode.metrics.incrDatanodeNetworkErrors();
throw ioe;
} finally {
IOUtils.closeStream(blockSender);
@@ -657,6 +660,8 @@ class DataXceiver extends Receiver implements Runnable {
mirrorOut.flush();
+ DataNodeFaultInjector.get().writeBlockAfterFlush();
+
// read connect ack (only for clients, not for replication req)
if (isClient) {
BlockOpResponseProto connectAck =
@@ -695,6 +700,7 @@ class DataXceiver extends Receiver implements Runnable {
LOG.info(datanode + ":Exception transfering " +
block + " to mirror " + mirrorNode +
"- continuing without the mirror", e);
+ datanode.metrics.incrDatanodeNetworkErrors();
}
}
}
@@ -749,6 +755,7 @@ class DataXceiver extends Receiver implements Runnable {
} catch (IOException ioe) {
LOG.info("opWriteBlock " + block + " received exception " + ioe);
+ datanode.metrics.incrDatanodeNetworkErrors();
throw ioe;
} finally {
// close all opened streams
@@ -782,6 +789,10 @@ class DataXceiver extends Receiver implements Runnable {
datanode.transferReplicaForPipelineRecovery(blk, targets,
targetStorageTypes, clientName);
writeResponse(Status.SUCCESS, null, out);
+ } catch (IOException ioe) {
+ LOG.info("transferBlock " + blk + " received exception " + ioe);
+ datanode.metrics.incrDatanodeNetworkErrors();
+ throw ioe;
} finally {
IOUtils.closeStream(out);
}
@@ -873,6 +884,10 @@ class DataXceiver extends Receiver implements Runnable {
.build()
.writeDelimitedTo(out);
out.flush();
+ } catch (IOException ioe) {
+ LOG.info("blockChecksum " + block + " received exception " + ioe);
+ datanode.metrics.incrDatanodeNetworkErrors();
+ throw ioe;
} finally {
IOUtils.closeStream(out);
IOUtils.closeStream(checksumIn);
@@ -938,6 +953,7 @@ class DataXceiver extends Receiver implements Runnable {
} catch (IOException ioe) {
isOpSuccess = false;
LOG.info("opCopyBlock " + block + " received exception " + ioe);
+ datanode.metrics.incrDatanodeNetworkErrors();
throw ioe;
} finally {
dataXceiverServer.balanceThrottler.release();
@@ -995,6 +1011,7 @@ class DataXceiver extends Receiver implements Runnable {
BlockReceiver blockReceiver = null;
DataInputStream proxyReply = null;
DataOutputStream replyOut = new DataOutputStream(getOutputStream());
+ boolean IoeDuringCopyBlockOperation = false;
try {
// get the output stream to the proxy
final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
@@ -1022,7 +1039,9 @@ class DataXceiver extends Receiver implements Runnable {
HdfsConstants.IO_FILE_BUFFER_SIZE));
/* send request to the proxy */
+ IoeDuringCopyBlockOperation = true;
new Sender(proxyOut).copyBlock(block, blockToken);
+ IoeDuringCopyBlockOperation = false;
// receive the response from the proxy
@@ -1065,6 +1084,10 @@ class DataXceiver extends Receiver implements Runnable {
opStatus = ERROR;
errMsg = "opReplaceBlock " + block + " received exception " + ioe;
LOG.info(errMsg);
+ if (!IoeDuringCopyBlockOperation) {
+ // Skip IOEs raised while sending the copyBlock request so they are not counted twice
+ datanode.metrics.incrDatanodeNetworkErrors();
+ }
throw ioe;
} finally {
// receive the last byte that indicates the proxy released its thread resource
@@ -1083,6 +1106,7 @@ class DataXceiver extends Receiver implements Runnable {
sendResponse(opStatus, errMsg);
} catch (IOException ioe) {
LOG.warn("Error writing reply back to " + peer.getRemoteAddressString());
+ datanode.metrics.incrDatanodeNetworkErrors();
}
IOUtils.closeStream(proxyOut);
IOUtils.closeStream(blockReceiver);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b9e9be/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index 57f12db..09ad3da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -89,6 +89,9 @@ public class DataNodeMetrics {
@Metric MutableCounterLong volumeFailures;
+ @Metric("Count of network errors on the datanode")
+ MutableCounterLong datanodeNetworkErrors;
+
@Metric MutableRate readBlockOp;
@Metric MutableRate writeBlockOp;
@Metric MutableRate blockChecksumOp;
@@ -296,6 +299,10 @@ public class DataNodeMetrics {
volumeFailures.incr();
}
+ public void incrDatanodeNetworkErrors() {
+ datanodeNetworkErrors.incr();
+ }
+
/** Increment for getBlockLocalPathInfo calls */
public void incrBlocksGetLocalPathInfo() {
blocksGetLocalPathInfo.incr();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3b9e9be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
index 9b90d41..90112af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
@@ -25,8 +25,13 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.List;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -38,10 +43,13 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.junit.Test;
+import org.mockito.Mockito;
public class TestDataNodeMetrics {
+ private static final Log LOG = LogFactory.getLog(TestDataNodeMetrics.class);
@Test
public void testDataNodeMetrics() throws Exception {
@@ -186,4 +194,38 @@ public class TestDataNodeMetrics {
}
}
}
+
+ @Test(timeout=60000)
+ public void testTimeoutMetric() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ final Path path = new Path("/test");
+
+ final MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+
+ final List<FSDataOutputStream> streams = Lists.newArrayList();
+ try {
+ final FSDataOutputStream out =
+ cluster.getFileSystem().create(path, (short) 2);
+ final DataNodeFaultInjector injector = Mockito.mock
+ (DataNodeFaultInjector.class);
+ Mockito.doThrow(new IOException("mock IOException")).
+ when(injector).
+ writeBlockAfterFlush();
+ DataNodeFaultInjector.instance = injector;
+ streams.add(out);
+ out.writeBytes("old gs data\n");
+ out.hflush();
+
+ final MetricsRecordBuilder dnMetrics =
+ getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
+ assertCounter("DatanodeNetworkErrors", 1L, dnMetrics);
+ } finally {
+ IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ DataNodeFaultInjector.instance = new DataNodeFaultInjector();
+ }
+ }
}