Posted to common-commits@hadoop.apache.org by su...@apache.org on 2021/03/15 03:36:30 UTC

[hadoop] branch trunk updated: HDFS-15809. DeadNodeDetector does not remove live nodes from dead node set. Contributed by Jinglun.

This is an automated email from the ASF dual-hosted git repository.

sunlisheng pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7025f399 HDFS-15809. DeadNodeDetector does not remove live nodes from dead node set. Contributed by Jinglun.
7025f399 is described below

commit 7025f39944e628345109b43cba2cd4d49ca8cc6b
Author: sunlisheng <su...@apache.org>
AuthorDate: Mon Mar 15 11:34:13 2021 +0800

    HDFS-15809. DeadNodeDetector does not remove live nodes from dead node set. Contributed by Jinglun.
---
 .../org/apache/hadoop/hdfs/DeadNodeDetector.java   | 85 +++++++++++++---------
 .../hadoop/hdfs/client/HdfsClientConfigKeys.java   | 10 +--
 .../src/main/resources/hdfs-default.xml            | 20 ++---
 .../apache/hadoop/hdfs/TestDeadNodeDetection.java  | 71 ++++++++++++------
 4 files changed, 107 insertions(+), 79 deletions(-)
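
In short, the patch removes the bounded ArrayBlockingQueue probe queues: when such a queue was full, offer() returned false and the node was skipped (the old code even broke out of the loop), so a node that had recovered might never be re-probed and stayed in the dead node set. A minimal sketch of that pre-patch failure mode (hypothetical class and node names, not code from the patch):

    import java.util.concurrent.ArrayBlockingQueue;

    /** Sketch of the pre-patch failure mode: a bounded probe queue drops nodes. */
    public class BoundedQueueDropSketch {
      public static void main(String[] args) {
        // The old code sized this from dfs.client.deadnode.detection.deadnode.queue.max.
        ArrayBlockingQueue<String> probeQueue = new ArrayBlockingQueue<>(1);
        probeQueue.offer("dn-1");                    // fills the queue
        boolean accepted = probeQueue.offer("dn-2"); // false: dn-2 silently dropped
        System.out.println(accepted);                // prints "false"; dn-2 is never
                                                     // re-probed, so a live node can
                                                     // linger in the dead set
      }
    }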

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
index e17f261..cd46551 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
@@ -29,9 +29,9 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.Deque;
+import java.util.LinkedList;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -40,8 +40,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT;
@@ -54,9 +52,9 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_DEFAULT;
 
 /**
  * Detect the dead nodes in advance, and share this information among all the
@@ -74,7 +72,7 @@ public class DeadNodeDetector extends Daemon {
   /**
    * Waiting time when DeadNodeDetector's state is idle.
    */
-  private static final long IDLE_SLEEP_MS = 10000;
+  private final long idleSleepMs;
 
   /**
    * Client context name.
@@ -114,16 +112,6 @@ public class DeadNodeDetector extends Daemon {
   private long suspectNodeDetectInterval = 0;
 
   /**
-   * The max queue size of probing dead node.
-   */
-  private int maxDeadNodesProbeQueueLen = 0;
-
-  /**
-   * The max queue size of probing suspect node.
-   */
-  private int maxSuspectNodesProbeQueueLen;
-
-  /**
    * Connection timeout for probing dead node in milliseconds.
    */
   private long probeConnectionTimeoutMs;
@@ -131,12 +119,12 @@ public class DeadNodeDetector extends Daemon {
   /**
    * The dead node probe queue.
    */
-  private Queue<DatanodeInfo> deadNodesProbeQueue;
+  private UniqueQueue<DatanodeInfo> deadNodesProbeQueue;
 
   /**
    * The suspect node probe queue.
    */
-  private Queue<DatanodeInfo> suspectNodesProbeQueue;
+  private UniqueQueue<DatanodeInfo> suspectNodesProbeQueue;
 
   /**
    * The thread pool of probing dead node.
@@ -182,6 +170,32 @@ public class DeadNodeDetector extends Daemon {
   }
 
   /**
+   * The thread safe unique queue.
+   */
+  static class UniqueQueue<T> {
+    private Deque<T> queue = new LinkedList<>();
+    private Set<T> set = new HashSet<>();
+
+    synchronized boolean offer(T dn) {
+      if (set.add(dn)) {
+        queue.addLast(dn);
+        return true;
+      }
+      return false;
+    }
+
+    synchronized T poll() {
+      T dn = queue.pollFirst();
+      set.remove(dn);
+      return dn;
+    }
+
+    synchronized int size() {
+      return set.size();
+    }
+  }
+
+  /**
    * Disabled start probe suspect/dead thread for the testing.
    */
   private static volatile boolean disabledProbeThreadForTest = false;
@@ -203,20 +217,14 @@ public class DeadNodeDetector extends Daemon {
         DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT);
     socketTimeout =
         conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsConstants.READ_TIMEOUT);
-    maxDeadNodesProbeQueueLen =
-        conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY,
-            DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT);
-    maxSuspectNodesProbeQueueLen =
-        conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY,
-            DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT);
     probeConnectionTimeoutMs = conf.getLong(
         DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY,
         DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT);
+    this.deadNodesProbeQueue = new UniqueQueue<>();
+    this.suspectNodesProbeQueue = new UniqueQueue<>();
 
-    this.deadNodesProbeQueue =
-        new ArrayBlockingQueue<DatanodeInfo>(maxDeadNodesProbeQueueLen);
-    this.suspectNodesProbeQueue =
-        new ArrayBlockingQueue<DatanodeInfo>(maxSuspectNodesProbeQueueLen);
+    idleSleepMs = conf.getLong(DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY,
+        DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_DEFAULT);
 
     int deadNodeDetectDeadThreads =
         conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY,
@@ -447,8 +455,7 @@ public class DeadNodeDetector extends Daemon {
     for (DatanodeInfo datanodeInfo : datanodeInfos) {
       if (!deadNodesProbeQueue.offer(datanodeInfo)) {
         LOG.debug("Skip to add dead node {} to check " +
-                "since the probe queue is full.", datanodeInfo);
-        break;
+                "since the node is already in the probe queue.", datanodeInfo);
       } else {
         LOG.debug("Add dead node to check: {}.", datanodeInfo);
       }
@@ -458,7 +465,7 @@ public class DeadNodeDetector extends Daemon {
 
   private void idle() {
     try {
-      Thread.sleep(IDLE_SLEEP_MS);
+      Thread.sleep(idleSleepMs);
     } catch (InterruptedException e) {
       LOG.debug("Got interrupted while DeadNodeDetector is idle.", e);
       Thread.currentThread().interrupt();
@@ -483,14 +490,24 @@ public class DeadNodeDetector extends Daemon {
     deadNodes.remove(datanodeInfo.getDatanodeUuid());
   }
 
-  public Queue<DatanodeInfo> getDeadNodesProbeQueue() {
+  public UniqueQueue<DatanodeInfo> getDeadNodesProbeQueue() {
     return deadNodesProbeQueue;
   }
 
-  public Queue<DatanodeInfo> getSuspectNodesProbeQueue() {
+  public UniqueQueue<DatanodeInfo> getSuspectNodesProbeQueue() {
     return suspectNodesProbeQueue;
   }
 
+  @VisibleForTesting
+  void setSuspectQueue(UniqueQueue<DatanodeInfo> queue) {
+    this.suspectNodesProbeQueue = queue;
+  }
+
+  @VisibleForTesting
+  void setDeadQueue(UniqueQueue<DatanodeInfo> queue) {
+    this.deadNodesProbeQueue = queue;
+  }
+
   /**
    * Add datanode to suspectNodes and suspectAndDeadNodes.
    */
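
The new UniqueQueue replaces the bounded queues: it is unbounded but deduplicating, so offer() fails only when the node is already queued, and a node can be re-offered after it is polled. A standalone sketch of those semantics (mirroring the class above; the demo wrapper and node names are illustrative):

    import java.util.Deque;
    import java.util.HashSet;
    import java.util.LinkedList;
    import java.util.Set;

    /** Standalone sketch of the UniqueQueue semantics added above. */
    public class UniqueQueueSketch {
      static class UniqueQueue<T> {
        private final Deque<T> queue = new LinkedList<>();
        private final Set<T> set = new HashSet<>();

        synchronized boolean offer(T e) {
          if (set.add(e)) {         // duplicates are rejected, nothing is dropped
            queue.addLast(e);
            return true;
          }
          return false;
        }

        synchronized T poll() {
          T e = queue.pollFirst();
          set.remove(e);            // the element may be offered again later
          return e;
        }

        synchronized int size() {
          return set.size();
        }
      }

      public static void main(String[] args) {
        UniqueQueue<String> q = new UniqueQueue<>();
        System.out.println(q.offer("dn-1")); // true: first enqueue
        System.out.println(q.offer("dn-1")); // false: already queued
        System.out.println(q.poll());        // dn-1
        System.out.println(q.offer("dn-1")); // true: re-offer allowed after poll
      }
    }
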
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index c17ad0e..2a6a7a5 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -164,13 +164,9 @@ public interface HdfsClientConfigKeys {
           "dfs.client.deadnode.detection.enabled";
   boolean DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT = false;
 
-  String DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY =
-      "dfs.client.deadnode.detection.deadnode.queue.max";
-  int DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT = 100;
-
-  String DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY =
-      "dfs.client.deadnode.detection.suspectnode.queue.max";
-  int DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT = 1000;
+  String DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY =
+      "dfs.client.deadnode.detection.idle.sleep.ms";
+  long DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_DEFAULT = 10000;
 
   String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY =
       "dfs.client.deadnode.detection.probe.connection.timeout.ms";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 94ff3ec..c308058 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3191,26 +3191,18 @@
   </property>
 
   <property>
-    <name>dfs.client.deadnode.detection.deadnode.queue.max</name>
-    <value>100</value>
-    <description>
-      The max queue size of probing dead node.
-    </description>
-  </property>
-
-  <property>
-    <name>dfs.client.deadnode.detection.suspectnode.queue.max</name>
-    <value>1000</value>
+    <name>dfs.client.deadnode.detection.probe.deadnode.threads</name>
+    <value>10</value>
     <description>
-      The max queue size of probing suspect node.
+      The maximum number of threads to use for probing dead node.
     </description>
   </property>
 
   <property>
-    <name>dfs.client.deadnode.detection.probe.deadnode.threads</name>
-    <value>10</value>
+    <name>dfs.client.deadnode.detection.idle.sleep.ms</name>
+    <value>10000</value>
     <description>
-      The maximum number of threads to use for probing dead node.
+      The sleep time of DeadNodeDetector per iteration.
     </description>
   </property>
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
index 9134f36..e8da918 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
@@ -30,19 +30,20 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertNotSame;
@@ -73,6 +74,7 @@ public class TestDeadNodeDetection {
         DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY,
         1000);
     conf.setInt(DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, 0);
+    conf.setLong(DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY, 100);
   }
 
   @After
@@ -247,42 +249,63 @@ public class TestDeadNodeDetection {
   }
 
   @Test
-  public void testDeadNodeDetectionMaxDeadNodesProbeQueue() throws Exception {
-    conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY, 1);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
-    cluster.waitActive();
-
-    FileSystem fs = cluster.getFileSystem();
-    Path filePath = new Path("/testDeadNodeDetectionMaxDeadNodesProbeQueue");
-    createFile(fs, filePath);
-
-    // Remove three DNs,
-    cluster.stopDataNode(0);
-    cluster.stopDataNode(0);
-    cluster.stopDataNode(0);
-
-    FSDataInputStream in = fs.open(filePath);
-    DFSInputStream din = (DFSInputStream) in.getWrappedStream();
-    DFSClient dfsClient = din.getDFSClient();
+  public void testDeadNodeDetectionDeadNodeProbe() throws Exception {
+    FileSystem fs = null;
+    FSDataInputStream in = null;
+    Path filePath = new Path("/" + GenericTestUtils.getMethodName());
     try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      cluster.waitActive();
+
+      fs = cluster.getFileSystem();
+      createFile(fs, filePath);
+
+      // Remove three DNs,
+      cluster.stopDataNode(0);
+      cluster.stopDataNode(0);
+      cluster.stopDataNode(0);
+
+      in = fs.open(filePath);
+      DFSInputStream din = (DFSInputStream) in.getWrappedStream();
+      DFSClient dfsClient = din.getDFSClient();
+      DeadNodeDetector deadNodeDetector =
+          dfsClient.getClientContext().getDeadNodeDetector();
+      // Spy suspect queue and dead queue.
+      DeadNodeDetector.UniqueQueue<DatanodeInfo> queue =
+          deadNodeDetector.getSuspectNodesProbeQueue();
+      DeadNodeDetector.UniqueQueue<DatanodeInfo> suspectSpy =
+          Mockito.spy(queue);
+      deadNodeDetector.setSuspectQueue(suspectSpy);
+      queue = deadNodeDetector.getDeadNodesProbeQueue();
+      DeadNodeDetector.UniqueQueue<DatanodeInfo> deadSpy = Mockito.spy(queue);
+      deadNodeDetector.setDeadQueue(deadSpy);
+      // Trigger dead node detection.
       try {
         in.read();
       } catch (BlockMissingException e) {
       }
 
       Thread.sleep(1500);
-      Assert.assertTrue((dfsClient.getClientContext().getDeadNodeDetector()
-          .getDeadNodesProbeQueue().size()
-          + dfsClient.getDeadNodes(din).size()) <= 4);
+      Collection<DatanodeInfo> deadNodes =
+          dfsClient.getDeadNodeDetector().clearAndGetDetectedDeadNodes();
+      assertEquals(3, deadNodes.size());
+      for (DatanodeInfo dead : deadNodes) {
+        // Each node is suspected once then marked as dead.
+        Mockito.verify(suspectSpy, Mockito.times(1)).offer(dead);
+        // All the dead nodes should be scheduled and probed at least once.
+        Mockito.verify(deadSpy, Mockito.atLeastOnce()).offer(dead);
+        Mockito.verify(deadSpy, Mockito.atLeastOnce()).poll();
+      }
     } finally {
-      in.close();
+      if (in != null) {
+        in.close();
+      }
       deleteFile(fs, filePath);
     }
   }
 
   @Test
   public void testDeadNodeDetectionSuspectNode() throws Exception {
-    conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY, 1);
     DeadNodeDetector.setDisabledProbeThreadForTest(true);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
     cluster.waitActive();
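
The rewritten test verifies queue traffic by wrapping the real probe queues in Mockito spies. A tiny standalone sketch of that spy/verify pattern (using a plain ArrayDeque of strings rather than the detector's DatanodeInfo queues):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import org.mockito.Mockito;

    /** Sketch: a spy delegates to the real object while recording every call. */
    public class SpyQueueSketch {
      public static void main(String[] args) {
        Deque<String> spy = Mockito.spy(new ArrayDeque<String>());
        spy.offer("dn-1");
        spy.poll();
        Mockito.verify(spy, Mockito.times(1)).offer("dn-1");
        Mockito.verify(spy, Mockito.atLeastOnce()).poll();
      }
    }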

