Posted to common-commits@hadoop.apache.org by in...@apache.org on 2020/05/23 16:58:40 UTC

[hadoop] branch trunk updated: HDFS-12288. Fix DataNode's xceiver count calculation. Contributed by Lisheng Sun.

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6e04b00  HDFS-12288. Fix DataNode's xceiver count calculation. Contributed by Lisheng Sun.
6e04b00 is described below

commit 6e04b00df1bf4f0a45571c9fc4361e4e8a05f7ee
Author: Inigo Goiri <in...@apache.org>
AuthorDate: Sat May 23 09:58:19 2020 -0700

    HDFS-12288. Fix DataNode's xceiver count calculation. Contributed by Lisheng Sun.
---
 .../hdfs/server/datanode/BPServiceActor.java       |  2 +-
 .../hadoop/hdfs/server/datanode/BlockReceiver.java |  4 ++
 .../hdfs/server/datanode/BlockRecoveryWorker.java  | 23 ++++++----
 .../hadoop/hdfs/server/datanode/DataNode.java      | 17 ++++++-
 .../hdfs/server/datanode/DataNodeMXBean.java       |  8 +++-
 .../server/datanode/metrics/DataNodeMetrics.java   | 42 +++++++++++++++++
 .../hadoop/hdfs/TestDataTransferKeepalive.java     |  7 ++-
 .../hdfs/server/datanode/TestDataNodeMetrics.java  | 52 ++++++++++++++++++++++
 .../namenode/TestNamenodeCapacityReport.java       |  9 ++--
 9 files changed, 141 insertions(+), 23 deletions(-)
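
The underlying problem: getXceiverCount() used to return
threadGroup.activeCount(), which counts every thread in the DataNode's
thread group (the DataXceiverServer acceptor thread, PacketResponders,
and block recovery workers included), so the load reported to the
NameNode was inflated and never reached zero even on an idle node. The
patch tracks each thread category with its own gauge instead. A minimal
standalone Java sketch (not HDFS code) of the discrepancy:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicInteger;

    public class ThreadCountDemo {
      public static void main(String[] args) throws InterruptedException {
        ThreadGroup group = new ThreadGroup("transfer");
        AtomicInteger activeXceivers = new AtomicInteger();
        CountDownLatch stop = new CountDownLatch(1);

        // Long-lived "server" thread, analogous to the DataXceiverServer
        // acceptor: it lives in the group but transfers no data.
        new Thread(group, () -> await(stop), "server").start();

        // A worker that does count as an active xceiver; the explicit
        // gauge is maintained in try/finally so the count cannot leak.
        new Thread(group, () -> {
          activeXceivers.incrementAndGet();
          try {
            await(stop);
          } finally {
            activeXceivers.decrementAndGet();
          }
        }, "xceiver").start();

        Thread.sleep(200);  // give both threads time to start
        // activeCount() sees 2 (server + worker); the gauge reports 1.
        System.out.println("threadGroup.activeCount() = " + group.activeCount());
        System.out.println("explicit gauge            = " + activeXceivers.get());
        stop.countDown();
      }

      private static void await(CountDownLatch latch) {
        try {
          latch.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    }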

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index a436c94..c449619 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -544,7 +544,7 @@ class BPServiceActor implements Runnable {
         dn.getFSDataset().getCacheCapacity(),
         dn.getFSDataset().getCacheUsed(),
         dn.getXmitsInProgress(),
-        dn.getXceiverCount(),
+        dn.getActiveTransferThreadCount(),
         numFailedVolumes,
         volumeFailureSummary,
         requestBlockReportLease,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 2231aea..a153d49 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -1368,6 +1368,7 @@ class BlockReceiver implements Closeable {
      */
     @Override
     public void run() {
+      datanode.metrics.incrDataNodePacketResponderCount();
       boolean lastPacketInBlock = false;
       final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
       while (isRunning() && !lastPacketInBlock) {
@@ -1505,6 +1506,9 @@ class BlockReceiver implements Closeable {
           }
         }
       }
+      // Any exception is caught and handled in the loop above, so we
+      // always reach this point when the thread exits.
+      datanode.metrics.decrDataNodePacketResponderCount();
       LOG.info(myString + " terminating");
     }
     
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
index db52d07..f1c396f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
@@ -600,17 +600,22 @@ public class BlockRecoveryWorker {
     Daemon d = new Daemon(datanode.threadGroup, new Runnable() {
       @Override
       public void run() {
-        for(RecoveringBlock b : blocks) {
-          try {
-            logRecoverBlock(who, b);
-            if (b.isStriped()) {
-              new RecoveryTaskStriped((RecoveringStripedBlock) b).recover();
-            } else {
-              new RecoveryTaskContiguous(b).recover();
+        datanode.metrics.incrDataNodeBlockRecoveryWorkerCount();
+        try {
+          for (RecoveringBlock b : blocks) {
+            try {
+              logRecoverBlock(who, b);
+              if (b.isStriped()) {
+                new RecoveryTaskStriped((RecoveringStripedBlock) b).recover();
+              } else {
+                new RecoveryTaskContiguous(b).recover();
+              }
+            } catch (IOException e) {
+              LOG.warn("recoverBlock: {} FAILED", b, e);
             }
-          } catch (IOException e) {
-            LOG.warn("recoverBlocks FAILED: " + b, e);
           }
+        } finally {
+          datanode.metrics.decrDataNodeBlockRecoveryWorkerCount();
         }
       }
     });
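
The two hunks above maintain their gauges differently: BlockRecoveryWorker
wraps the whole recovery loop in try/finally, so the gauge is decremented
even if a task throws something unchecked, while PacketResponder
decrements at the end of run() and relies on the loop having handled
every exception (see the comment there). A hypothetical worker sketching
the try/finally variant of the pattern:

    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical worker illustrating the gauge-maintenance pattern:
    // increment on entry, decrement in finally, so an unexpected
    // RuntimeException cannot leave the count permanently inflated.
    public class CountedWorker implements Runnable {
      private static final AtomicInteger ACTIVE = new AtomicInteger();
      private final Runnable task;

      public CountedWorker(Runnable task) {
        this.task = task;
      }

      @Override
      public void run() {
        ACTIVE.incrementAndGet();
        try {
          task.run();
        } finally {
          ACTIVE.decrementAndGet();  // always runs, even if task throws
        }
      }

      public static int active() {
        return ACTIVE.get();
      }
    }
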
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 20ea7c7..f96682f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2146,6 +2146,8 @@ public class DataNode extends ReconfigurableBase
     }
     if (metrics != null) {
       metrics.setDataNodeActiveXceiversCount(0);
+      metrics.setDataNodePacketResponderCount(0);
+      metrics.setDataNodeBlockRecoveryWorkerCount(0);
     }
 
    // IPC server needs to be shutdown late in the process, otherwise
@@ -2244,7 +2246,20 @@ public class DataNode extends ReconfigurableBase
   /** Number of concurrent xceivers per node. */
   @Override // DataNodeMXBean
   public int getXceiverCount() {
-    return threadGroup == null ? 0 : threadGroup.activeCount();
+    if (metrics == null) {
+      return 0;
+    }
+    return metrics.getDataNodeActiveXceiverCount();
+  }
+
+  @Override // DataNodeMXBean
+  public int getActiveTransferThreadCount() {
+    if (metrics == null) {
+      return 0;
+    }
+    return metrics.getDataNodeActiveXceiverCount()
+        + metrics.getDataNodePacketResponderCount()
+        + metrics.getDataNodeBlockRecoveryWorkerCount();
   }
 
   @Override // DataNodeMXBean
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
index 9d11e14..7a8f59b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
@@ -104,12 +104,16 @@ public interface DataNodeMXBean {
   public String getClusterId();
 
   /**
-   * Returns an estimate of the number of Datanode threads
-   * actively transferring blocks.
+   * Returns the number of active xceivers.
    */
   public int getXceiverCount();
 
   /**
+   * Returns the number of Datanode threads actively transferring blocks.
+   */
+  int getActiveTransferThreadCount();
+
+  /**
    * Returns an estimate of the number of data replication/reconstruction tasks
    * running currently.
    */
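
Both counts are exposed through the DataNode MXBean, so they can also be
read over JMX or from the DataNode's /jmx HTTP servlet. A sketch of a
remote JMX read, assuming the DataNode was started with remote JMX
enabled and that the bean is registered as
Hadoop:service=DataNode,name=DataNodeInfo (the host and port below are
placeholders, not Hadoop defaults):

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class ReadXceiverCounts {
      public static void main(String[] args) throws Exception {
        // Placeholder host/port; point this at a real JMX endpoint.
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://datanode-host:8006/jmxrmi");
        try (JMXConnector jmxc = JMXConnectorFactory.connect(url)) {
          MBeanServerConnection conn = jmxc.getMBeanServerConnection();
          ObjectName dn =
              new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
          // XceiverCount: active DataXceivers only.
          System.out.println("XceiverCount = "
              + conn.getAttribute(dn, "XceiverCount"));
          // ActiveTransferThreadCount: xceivers + packet responders
          // + block recovery workers (the new attribute).
          System.out.println("ActiveTransferThreadCount = "
              + conn.getAttribute(dn, "ActiveTransferThreadCount"));
        }
      }
    }
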
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index 00093f7..16d1561 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -109,6 +109,12 @@ public class DataNodeMetrics {
   @Metric("Count of active dataNode xceivers")
   private MutableGaugeInt dataNodeActiveXceiversCount;
 
+  @Metric("Count of active DataNode PacketResponders")
+  private MutableGaugeInt dataNodePacketResponderCount;
+
+  @Metric("Count of active DataNode block recovery workers")
+  private MutableGaugeInt dataNodeBlockRecoveryWorkerCount;
+
   @Metric MutableRate readBlockOp;
   @Metric MutableRate writeBlockOp;
   @Metric MutableRate blockChecksumOp;
@@ -535,6 +541,42 @@ public class DataNodeMetrics {
     dataNodeActiveXceiversCount.set(value);
   }
 
+  public int getDataNodeActiveXceiverCount() {
+    return dataNodeActiveXceiversCount.value();
+  }
+
+  public void incrDataNodePacketResponderCount() {
+    dataNodePacketResponderCount.incr();
+  }
+
+  public void decrDataNodePacketResponderCount() {
+    dataNodePacketResponderCount.decr();
+  }
+
+  public void setDataNodePacketResponderCount(int value) {
+    dataNodePacketResponderCount.set(value);
+  }
+
+  public int getDataNodePacketResponderCount() {
+    return dataNodePacketResponderCount.value();
+  }
+
+  public void incrDataNodeBlockRecoveryWorkerCount() {
+    dataNodeBlockRecoveryWorkerCount.incr();
+  }
+
+  public void decrDataNodeBlockRecoveryWorkerCount() {
+    dataNodeBlockRecoveryWorkerCount.decr();
+  }
+
+  public void setDataNodeBlockRecoveryWorkerCount(int value) {
+    dataNodeBlockRecoveryWorkerCount.set(value);
+  }
+
+  public int getDataNodeBlockRecoveryWorkerCount() {
+    return dataNodeBlockRecoveryWorkerCount.value();
+  }
+
   public void incrECDecodingTime(long decodingTimeNanos) {
     ecDecodingTimeNanos.incr(decodingTimeNanos);
   }
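
The new @Metric fields are MutableGaugeInt gauges, instrumented by the
metrics2 framework when DataNodeMetrics is registered. A minimal sketch
of the same gauge type used directly through a MetricsRegistry; the
registry and metric names here are illustrative:

    import org.apache.hadoop.metrics2.lib.MetricsRegistry;
    import org.apache.hadoop.metrics2.lib.MutableGaugeInt;

    public class GaugeSketch {
      public static void main(String[] args) {
        MetricsRegistry registry = new MetricsRegistry("DataNodeSketch");
        MutableGaugeInt responders =
            registry.newGauge("PacketResponderCount",
                "Count of active packet responders", 0);

        responders.incr();                        // a responder starts
        responders.incr();                        // a second one starts
        responders.decr();                        // one finishes
        System.out.println(responders.value());   // prints 1
        responders.set(0);  // reset on shutdown, as the DataNode does
      }
    }
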
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
index 9e3ddcf..b9ad785 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
@@ -253,12 +253,11 @@ public class TestDataTransferKeepalive {
   }
 
   /**
-   * Returns the datanode's xceiver count, but subtracts 1, since the
-   * DataXceiverServer counts as one.
+   * Returns the datanode's active xceiver count.
    * 
-   * @return int xceiver count, not including DataXceiverServer
+   * @return the datanode's active xceiver count.
    */
   private int getXceiverCountWithoutServer() {
-    return dn.getXceiverCount() - 1;
+    return dn.getXceiverCount();
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
index 81d144f..9f912d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.*;
 import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.lang.management.ManagementFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -420,6 +421,57 @@ public class TestDataNodeMetrics {
   }
 
   @Test
+  public void testDataNodeMXBeanActiveThreadCount() throws Exception {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    FileSystem fs = cluster.getFileSystem();
+    Path p = new Path("/testfile");
+
+    try {
+      List<DataNode> datanodes = cluster.getDataNodes();
+      assertEquals(1, datanodes.size());
+      DataNode datanode = datanodes.get(0);
+
+      // create a xceiver thread for write
+      FSDataOutputStream os = fs.create(p);
+      for (int i = 0; i < 1024; i++) {
+        os.write("testdatastr".getBytes());
+      }
+      os.hsync();
+      // create a xceiver thread for read
+      InputStream is = fs.open(p);
+      is.read(new byte[16], 0, 4);
+
+      int threadCount = datanode.threadGroup.activeCount();
+      assertTrue(threadCount > 0);
+      Thread[] threads = new Thread[threadCount];
+      datanode.threadGroup.enumerate(threads);
+      int xceiverCount = 0;
+      int responderCount = 0;
+      int recoveryWorkerCount = 0;
+      for (Thread t : threads) {
+        if (t.getName().contains("DataXceiver for client")) {
+          xceiverCount++;
+        } else if (t.getName().contains("PacketResponder")) {
+          responderCount++;
+        }
+      }
+      assertEquals(2, xceiverCount);
+      assertEquals(1, responderCount);
+      assertEquals(0, recoveryWorkerCount); // block recovery is hard to trigger here
+      assertEquals(xceiverCount, datanode.getXceiverCount());
+      assertEquals(xceiverCount + responderCount + recoveryWorkerCount,
+          datanode.getActiveTransferThreadCount());
+
+      is.close();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test
   public void testDNShouldNotDeleteBlockONTooManyOpenFiles()
       throws Exception {
     Configuration conf = new HdfsConfiguration();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
index 4343b0a..c3abc12 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
@@ -226,9 +226,9 @@ public class TestNamenodeCapacityReport {
       triggerHeartbeats(datanodes);
       
       // check that all nodes are live and in service
-      int expectedTotalLoad = nodes;  // xceiver server adds 1 to load
+      int expectedTotalLoad = 0;
       int expectedInServiceNodes = nodes;
-      int expectedInServiceLoad = nodes;
+      int expectedInServiceLoad = 0;
       checkClusterHealth(nodes, namesystem, expectedTotalLoad,
           expectedInServiceNodes, expectedInServiceLoad);
 
@@ -333,10 +333,7 @@ public class TestNamenodeCapacityReport {
           expectedInServiceNodes--;
         }
         assertEquals(expectedInServiceNodes, getNumDNInService(namesystem));
-        // live nodes always report load of 1.  no nodes is load 0
-        double expectedXceiverAvg = (i == nodes-1) ? 0.0 : 1.0;
-        assertEquals((double)expectedXceiverAvg,
-            getInServiceXceiverAverage(namesystem), EPSILON);
+        assertEquals(0, getInServiceXceiverAverage(namesystem), EPSILON);
       }
       // final sanity check
       checkClusterHealth(0, namesystem, 0.0, 0, 0.0);

