Posted to common-commits@hadoop.apache.org by in...@apache.org on 2020/05/23 16:58:40 UTC
[hadoop] branch trunk updated: HDFS-12288. Fix DataNode's xceiver count calculation. Contributed by Lisheng Sun.
This is an automated email from the ASF dual-hosted git repository.
inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6e04b00 HDFS-12288. Fix DataNode's xceiver count calculation. Contributed by Lisheng Sun.
6e04b00 is described below
commit 6e04b00df1bf4f0a45571c9fc4361e4e8a05f7ee
Author: Inigo Goiri <in...@apache.org>
AuthorDate: Sat May 23 09:58:19 2020 -0700
HDFS-12288. Fix DataNode's xceiver count calculation. Contributed by Lisheng Sun.
---
.../hdfs/server/datanode/BPServiceActor.java | 2 +-
.../hadoop/hdfs/server/datanode/BlockReceiver.java | 4 ++
.../hdfs/server/datanode/BlockRecoveryWorker.java | 23 ++++++----
.../hadoop/hdfs/server/datanode/DataNode.java | 17 ++++++-
.../hdfs/server/datanode/DataNodeMXBean.java | 8 +++-
.../server/datanode/metrics/DataNodeMetrics.java | 42 +++++++++++++++++
.../hadoop/hdfs/TestDataTransferKeepalive.java | 7 ++-
.../hdfs/server/datanode/TestDataNodeMetrics.java | 52 ++++++++++++++++++++++
.../namenode/TestNamenodeCapacityReport.java | 9 ++--
9 files changed, 141 insertions(+), 23 deletions(-)
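In short: DataNode#getXceiverCount() used to return threadGroup.activeCount(), which also counted the DataXceiverServer accept thread and any other thread in the DataNode's thread group, so both the JMX value and the load reported to the NameNode were inflated. With this change the xceiver count is read from the existing DataNodeActiveXceiversCount gauge, two new gauges track PacketResponder and block recovery worker threads, and the heartbeat reports the sum of all three via the new getActiveTransferThreadCount().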
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index a436c94..c449619 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -544,7 +544,7 @@ class BPServiceActor implements Runnable {
dn.getFSDataset().getCacheCapacity(),
dn.getFSDataset().getCacheUsed(),
dn.getXmitsInProgress(),
- dn.getXceiverCount(),
+ dn.getActiveTransferThreadCount(),
numFailedVolumes,
volumeFailureSummary,
requestBlockReportLease,
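With this one-line change the heartbeat reports the broader active-transfer count as the node's load, so the figure the NameNode tracks per node now covers packet responders and block recovery workers in addition to the xceivers themselves.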
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 2231aea..a153d49 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -1368,6 +1368,7 @@ class BlockReceiver implements Closeable {
*/
@Override
public void run() {
+ datanode.metrics.incrDataNodePacketResponderCount();
boolean lastPacketInBlock = false;
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
while (isRunning() && !lastPacketInBlock) {
@@ -1505,6 +1506,9 @@ class BlockReceiver implements Closeable {
}
}
}
+ // Any exception will be caught and processed in the previous loop, so we
+ // will always arrive here when the thread exits.
+ datanode.metrics.decrDataNodePacketResponderCount();
LOG.info(myString + " terminating");
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
index db52d07..f1c396f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
@@ -600,17 +600,22 @@ public class BlockRecoveryWorker {
Daemon d = new Daemon(datanode.threadGroup, new Runnable() {
@Override
public void run() {
- for(RecoveringBlock b : blocks) {
- try {
- logRecoverBlock(who, b);
- if (b.isStriped()) {
- new RecoveryTaskStriped((RecoveringStripedBlock) b).recover();
- } else {
- new RecoveryTaskContiguous(b).recover();
+ datanode.metrics.incrDataNodeBlockRecoveryWorkerCount();
+ try {
+ for (RecoveringBlock b : blocks) {
+ try {
+ logRecoverBlock(who, b);
+ if (b.isStriped()) {
+ new RecoveryTaskStriped((RecoveringStripedBlock) b).recover();
+ } else {
+ new RecoveryTaskContiguous(b).recover();
+ }
+ } catch (IOException e) {
+ LOG.warn("recover Block: {} FAILED: {}", b, e);
}
- } catch (IOException e) {
- LOG.warn("recoverBlocks FAILED: " + b, e);
}
+ } finally {
+ datanode.metrics.decrDataNodeBlockRecoveryWorkerCount();
}
}
});
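Wrapping the whole recovery loop in try/finally guarantees the block recovery worker gauge is decremented even if a recovery task throws something other than IOException, mirroring the responder bookkeeping above.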
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 20ea7c7..f96682f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2146,6 +2146,8 @@ public class DataNode extends ReconfigurableBase
}
if (metrics != null) {
metrics.setDataNodeActiveXceiversCount(0);
+ metrics.setDataNodePacketResponderCount(0);
+ metrics.setDataNodeBlockRecoveryWorkerCount(0);
}
// IPC server needs to be shutdown late in the process, otherwise
@@ -2244,7 +2246,20 @@ public class DataNode extends ReconfigurableBase
/** Number of concurrent xceivers per node. */
@Override // DataNodeMXBean
public int getXceiverCount() {
- return threadGroup == null ? 0 : threadGroup.activeCount();
+ if (metrics == null) {
+ return 0;
+ }
+ return metrics.getDataNodeActiveXceiverCount();
+ }
+
+ @Override // DataNodeMXBean
+ public int getActiveTransferThreadCount() {
+ if (metrics == null) {
+ return 0;
+ }
+ return metrics.getDataNodeActiveXceiverCount()
+ + metrics.getDataNodePacketResponderCount()
+ + metrics.getDataNodeBlockRecoveryWorkerCount();
}
@Override // DataNodeMXBean
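Previously getXceiverCount() returned threadGroup.activeCount(), which also counted the DataXceiverServer accept thread and anything else running in the DataNode's thread group; it now reads the xceiver gauge directly, while the new getActiveTransferThreadCount() adds the packet responder and block recovery worker gauges on top. One way to observe both values from outside the process is through the DataNode's JMX bean. The sketch below is a hedged example: it assumes remote JMX is enabled on port 9999 and that the bean is registered under the usual Hadoop:service=DataNode,name=DataNodeInfo name, so verify both against your deployment before relying on it.

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    // Hedged sketch: read the two counters from a running DataNode over JMX.
    // Host, port and the ObjectName below are assumptions for illustration.
    public class DataNodeLoadProbe {
      public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://datanode-host:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
          MBeanServerConnection conn = connector.getMBeanServerConnection();
          ObjectName dn =
              new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
          int xceivers = (Integer) conn.getAttribute(dn, "XceiverCount");
          int transferThreads =
              (Integer) conn.getAttribute(dn, "ActiveTransferThreadCount");
          System.out.println("active xceivers = " + xceivers
              + ", active transfer threads = " + transferThreads);
        }
      }
    }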
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
index 9d11e14..7a8f59b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
@@ -104,12 +104,16 @@ public interface DataNodeMXBean {
public String getClusterId();
/**
- * Returns an estimate of the number of Datanode threads
- * actively transferring blocks.
+ * Returns the number of active xceivers.
*/
public int getXceiverCount();
/**
+ * Returns the number of Datanode threads actively transferring blocks.
+ */
+ int getActiveTransferThreadCount();
+
+ /**
* Returns an estimate of the number of data replication/reconstruction tasks
* running currently.
*/
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index 00093f7..16d1561 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -109,6 +109,12 @@ public class DataNodeMetrics {
@Metric("Count of active dataNode xceivers")
private MutableGaugeInt dataNodeActiveXceiversCount;
+ @Metric("Count of active DataNode packetResponder")
+ private MutableGaugeInt dataNodePacketResponderCount;
+
+ @Metric("Count of active DataNode block recovery worker")
+ private MutableGaugeInt dataNodeBlockRecoveryWorkerCount;
+
@Metric MutableRate readBlockOp;
@Metric MutableRate writeBlockOp;
@Metric MutableRate blockChecksumOp;
@@ -535,6 +541,42 @@ public class DataNodeMetrics {
dataNodeActiveXceiversCount.set(value);
}
+ public int getDataNodeActiveXceiverCount() {
+ return dataNodeActiveXceiversCount.value();
+ }
+
+ public void incrDataNodePacketResponderCount() {
+ dataNodePacketResponderCount.incr();
+ }
+
+ public void decrDataNodePacketResponderCount() {
+ dataNodePacketResponderCount.decr();
+ }
+
+ public void setDataNodePacketResponderCount(int value) {
+ dataNodePacketResponderCount.set(value);
+ }
+
+ public int getDataNodePacketResponderCount() {
+ return dataNodePacketResponderCount.value();
+ }
+
+ public void incrDataNodeBlockRecoveryWorkerCount() {
+ dataNodeBlockRecoveryWorkerCount.incr();
+ }
+
+ public void decrDataNodeBlockRecoveryWorkerCount() {
+ dataNodeBlockRecoveryWorkerCount.decr();
+ }
+
+ public void setDataNodeBlockRecoveryWorkerCount(int value) {
+ dataNodeBlockRecoveryWorkerCount.set(value);
+ }
+
+ public int getDataNodeBlockRecoveryWorkerCount() {
+ return dataNodeBlockRecoveryWorkerCount.value();
+ }
+
public void incrECDecodingTime(long decodingTimeNanos) {
ecDecodingTimeNanos.incr(decodingTimeNanos);
}
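Both new fields are plain MutableGaugeInt gauges, so they are also published through the metrics system under names derived from the annotated fields. Below is a hedged test-side sketch of checking them with the MetricsAsserts helpers used elsewhere in the HDFS tests; the gauge names are inferred from the field names, so confirm them against the actual metrics output.

    import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
    import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

    import org.apache.hadoop.hdfs.server.datanode.DataNode;
    import org.apache.hadoop.metrics2.MetricsRecordBuilder;

    // Hedged sketch: verify the transfer-thread gauges of an idle DataNode,
    // e.g. one taken from a MiniDFSCluster in a test.
    public class TransferThreadGaugeCheck {
      static void assertIdleTransferGauges(DataNode dn) {
        MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
        assertGauge("DataNodeActiveXceiversCount", 0, rb);
        assertGauge("DataNodePacketResponderCount", 0, rb);
        assertGauge("DataNodeBlockRecoveryWorkerCount", 0, rb);
      }
    }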
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
index 9e3ddcf..b9ad785 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
@@ -253,12 +253,11 @@ public class TestDataTransferKeepalive {
}
/**
- * Returns the datanode's xceiver count, but subtracts 1, since the
- * DataXceiverServer counts as one.
+ * Returns the datanode's active xceiver count.
*
- * @return int xceiver count, not including DataXceiverServer
+ * @return the datanode's active xceiver count.
*/
private int getXceiverCountWithoutServer() {
- return dn.getXceiverCount() - 1;
+ return dn.getXceiverCount();
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
index 81d144f..9f912d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.*;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -420,6 +421,57 @@ public class TestDataNodeMetrics {
}
@Test
+ public void testDataNodeMXBeanActiveThreadCount() throws Exception {
+ Configuration conf = new Configuration();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+ FileSystem fs = cluster.getFileSystem();
+ Path p = new Path("/testfile");
+
+ try {
+ List<DataNode> datanodes = cluster.getDataNodes();
+ assertEquals(1, datanodes.size());
+ DataNode datanode = datanodes.get(0);
+
+ // create a xceiver thread for write
+ FSDataOutputStream os = fs.create(p);
+ for (int i = 0; i < 1024; i++) {
+ os.write("testdatastr".getBytes());
+ }
+ os.hsync();
+ // create a xceiver thread for read
+ InputStream is = fs.open(p);
+ is.read(new byte[16], 0, 4);
+
+ int threadCount = datanode.threadGroup.activeCount();
+ assertTrue(threadCount > 0);
+ Thread[] threads = new Thread[threadCount];
+ datanode.threadGroup.enumerate(threads);
+ int xceiverCount = 0;
+ int responderCount = 0;
+ int recoveryWorkerCount = 0;
+ for (Thread t : threads) {
+ if (t.getName().contains("DataXceiver for client")) {
+ xceiverCount++;
+ } else if (t.getName().contains("PacketResponder")) {
+ responderCount++;
+ }
+ }
+ assertEquals(2, xceiverCount);
+ assertEquals(1, responderCount);
+ assertEquals(0, recoveryWorkerCount); // block recovery is not easy to trigger here
+ assertEquals(xceiverCount, datanode.getXceiverCount());
+ assertEquals(xceiverCount + responderCount + recoveryWorkerCount,
+ datanode.getActiveTransferThreadCount());
+
+ is.close();
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ @Test
public void testDNShouldNotDeleteBlockONTooManyOpenFiles()
throws Exception {
Configuration conf = new HdfsConfiguration();
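The new testDataNodeMXBeanActiveThreadCount above keeps a write pipeline and a reader open at the same time, so the DataNode should be running two DataXceiver threads (one per open stream) and one PacketResponder for the write, with no block recovery in flight. That is exactly what the assertions expect: getXceiverCount() sees 2 and getActiveTransferThreadCount() sees 3.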
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
index 4343b0a..c3abc12 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
@@ -226,9 +226,9 @@ public class TestNamenodeCapacityReport {
triggerHeartbeats(datanodes);
// check that all nodes are live and in service
- int expectedTotalLoad = nodes; // xceiver server adds 1 to load
+ int expectedTotalLoad = 0;
int expectedInServiceNodes = nodes;
- int expectedInServiceLoad = nodes;
+ int expectedInServiceLoad = 0;
checkClusterHealth(nodes, namesystem, expectedTotalLoad,
expectedInServiceNodes, expectedInServiceLoad);
@@ -333,10 +333,7 @@ public class TestNamenodeCapacityReport {
expectedInServiceNodes--;
}
assertEquals(expectedInServiceNodes, getNumDNInService(namesystem));
- // live nodes always report load of 1. no nodes is load 0
- double expectedXceiverAvg = (i == nodes-1) ? 0.0 : 1.0;
- assertEquals((double)expectedXceiverAvg,
- getInServiceXceiverAverage(namesystem), EPSILON);
+ assertEquals(0, getInServiceXceiverAverage(namesystem), EPSILON);
}
// final sanity check
checkClusterHealth(0, namesystem, 0.0, 0, 0.0);
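With the DataXceiverServer accept thread no longer counted, an idle DataNode reports a load of 0, which is why the expected total and in-service load drop from the number of nodes to 0 and the per-node xceiver average to 0.0 in this test.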