Posted to common-commits@hadoop.apache.org by yq...@apache.org on 2019/11/27 02:57:42 UTC

[hadoop] branch trunk updated: HDFS-14649. Add suspect probe for DeadNodeDetector. Contributed by Lisheng Sun.

This is an automated email from the ASF dual-hosted git repository.

yqlin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c8bef4d  HDFS-14649. Add suspect probe for DeadNodeDetector. Contributed by Lisheng Sun.
c8bef4d is described below

commit c8bef4d6a6d7d5affd00cff6ea4a2e2ef778050e
Author: Yiqun Lin <yq...@apache.org>
AuthorDate: Wed Nov 27 10:57:20 2019 +0800

    HDFS-14649. Add suspect probe for DeadNodeDetector. Contributed by Lisheng Sun.
---
 .../org/apache/hadoop/hdfs/DeadNodeDetector.java   | 169 +++++++++++++++++----
 .../hadoop/hdfs/client/HdfsClientConfigKeys.java   |  13 ++
 .../src/main/resources/hdfs-default.xml            |  24 +++
 .../apache/hadoop/hdfs/TestDeadNodeDetection.java  | 104 +++++++++++--
 4 files changed, 264 insertions(+), 46 deletions(-)
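
For quick context before the diff: the suspect probe adds three new client-side
settings. Below is a minimal sketch of enabling the detector and tuning them
programmatically; the enable key is the pre-existing
DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY (only imported, not defined, in this
diff), the other key strings are copied from the HdfsClientConfigKeys hunk
further down, and the values shown are the shipped defaults:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class SuspectProbeConfigExample {
      public static FileSystem openWithSuspectProbe() throws Exception {
        Configuration conf = new HdfsConfiguration();
        // Turn on the client-side DeadNodeDetector.
        conf.setBoolean("dfs.client.deadnode.detection.enabled", true);
        // New in this commit: suspect node probe tuning.
        conf.setInt("dfs.client.deadnode.detection.suspectnode.queue.max", 1000);
        conf.setInt("dfs.client.deadnode.detection.probe.suspectnode.threads", 10);
        conf.setLong(
            "dfs.client.deadnode.detection.probe.suspectnode.interval.ms", 300);
        return FileSystem.get(conf);
      }
    }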

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
index 2fe7cf8..ce50547 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -48,8 +49,14 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 
 /**
@@ -83,13 +90,13 @@ public class DeadNodeDetector implements Runnable {
   private final Map<String, DatanodeInfo> deadNodes;
 
   /**
-   * Record dead nodes by one DFSInputStream. When dead node is not used by one
-   * DFSInputStream, remove it from dfsInputStreamNodes#DFSInputStream. If
-   * DFSInputStream does not include any dead node, remove DFSInputStream from
-   * dfsInputStreamNodes.
+   * Record suspect and dead nodes by one DFSInputStream. When a node is no
+   * longer used by one DFSInputStream, remove it from
+   * suspectAndDeadNodes#DFSInputStream. If a DFSInputStream does not include
+   * any node, remove the DFSInputStream from suspectAndDeadNodes.
    */
   private final Map<DFSInputStream, HashSet<DatanodeInfo>>
-          dfsInputStreamNodes;
+          suspectAndDeadNodes;
 
   /**
    * Datanodes that is being probed.
@@ -108,11 +115,21 @@ public class DeadNodeDetector implements Runnable {
   private long deadNodeDetectInterval = 0;
 
   /**
+   * Interval in milliseconds between probes of suspect nodes.
+   */
+  private long suspectNodeDetectInterval = 0;
+
+  /**
    * The max queue size of probing dead node.
    */
   private int maxDeadNodesProbeQueueLen = 0;
 
   /**
+   * The max queue size for probing suspect nodes.
+   */
+  private int maxSuspectNodesProbeQueueLen;
+
+  /**
    * Connection timeout for probing dead node in milliseconds.
    */
   private long probeConnectionTimeoutMs;
@@ -123,16 +140,31 @@ public class DeadNodeDetector implements Runnable {
   private Queue<DatanodeInfo> deadNodesProbeQueue;
 
   /**
+   * The suspect node probe queue.
+   */
+  private Queue<DatanodeInfo> suspectNodesProbeQueue;
+
+  /**
    * The thread pool of probing dead node.
    */
   private ExecutorService probeDeadNodesThreadPool;
 
   /**
+   * The thread pool for probing suspect nodes.
+   */
+  private ExecutorService probeSuspectNodesThreadPool;
+
+  /**
    * The scheduler thread of probing dead node.
    */
   private Thread probeDeadNodesSchedulerThr;
 
   /**
+   * The scheduler thread for probing suspect nodes.
+   */
+  private Thread probeSuspectNodesSchedulerThr;
+
+  /**
    * The thread pool of probing datanodes' rpc request. Sometimes the data node
    * can hang and not respond to the client in a short time. And these node will
    * filled with probe thread pool and block other normal node probing.
@@ -145,7 +177,7 @@ public class DeadNodeDetector implements Runnable {
    * The type of probe.
    */
   private enum ProbeType {
-    CHECK_DEAD
+    CHECK_DEAD, CHECK_SUSPECT
   }
 
   /**
@@ -155,41 +187,61 @@ public class DeadNodeDetector implements Runnable {
     INIT, CHECK_DEAD, IDLE, ERROR
   }
 
+  /**
+   * Disable starting the suspect/dead node probe threads, for testing.
+   */
+  private static volatile boolean disabledProbeThreadForTest = false;
+
   private State state;
 
   public DeadNodeDetector(String name, Configuration conf) {
     this.conf = new Configuration(conf);
     this.deadNodes = new ConcurrentHashMap<String, DatanodeInfo>();
-    this.dfsInputStreamNodes =
+    this.suspectAndDeadNodes =
         new ConcurrentHashMap<DFSInputStream, HashSet<DatanodeInfo>>();
     this.name = name;
 
     deadNodeDetectInterval = conf.getLong(
         DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
         DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT);
+    suspectNodeDetectInterval = conf.getLong(
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY,
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT);
     socketTimeout =
         conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsConstants.READ_TIMEOUT);
     maxDeadNodesProbeQueueLen =
         conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY,
             DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT);
+    maxSuspectNodesProbeQueueLen =
+        conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY,
+            DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT);
     probeConnectionTimeoutMs = conf.getLong(
         DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY,
         DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT);
 
     this.deadNodesProbeQueue =
         new ArrayBlockingQueue<DatanodeInfo>(maxDeadNodesProbeQueueLen);
+    this.suspectNodesProbeQueue =
+        new ArrayBlockingQueue<DatanodeInfo>(maxSuspectNodesProbeQueueLen);
 
     int deadNodeDetectDeadThreads =
         conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY,
             DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT);
+    int suspectNodeDetectDeadThreads = conf.getInt(
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY,
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_DEFAULT);
     int rpcThreads = conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY,
         DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT);
     probeDeadNodesThreadPool = Executors.newFixedThreadPool(
         deadNodeDetectDeadThreads, new Daemon.DaemonFactory());
+    probeSuspectNodesThreadPool = Executors.newFixedThreadPool(
+        suspectNodeDetectDeadThreads, new Daemon.DaemonFactory());
     rpcThreadPool =
         Executors.newFixedThreadPool(rpcThreads, new Daemon.DaemonFactory());
 
-    startProbeScheduler();
+    if (!disabledProbeThreadForTest) {
+      startProbeScheduler();
+    }
 
     LOG.info("Start dead node detector for DFSClient {}.", this.name);
     state = State.INIT;
@@ -223,14 +275,25 @@ public class DeadNodeDetector implements Runnable {
     }
   }
 
+  @VisibleForTesting
+  static void disabledProbeThreadForTest() {
+    disabledProbeThreadForTest = true;
+  }
+
   /**
-   * Start probe dead node thread.
+   * Start the dead node and suspect node probe threads.
    */
-  private void startProbeScheduler() {
+  @VisibleForTesting
+  void startProbeScheduler() {
     probeDeadNodesSchedulerThr =
             new Thread(new ProbeScheduler(this, ProbeType.CHECK_DEAD));
     probeDeadNodesSchedulerThr.setDaemon(true);
     probeDeadNodesSchedulerThr.start();
+
+    probeSuspectNodesSchedulerThr =
+            new Thread(new ProbeScheduler(this, ProbeType.CHECK_SUSPECT));
+    probeSuspectNodesSchedulerThr.setDaemon(true);
+    probeSuspectNodesSchedulerThr.start();
   }
 
   /**
@@ -250,6 +313,15 @@ public class DeadNodeDetector implements Runnable {
         Probe probe = new Probe(this, datanodeInfo, ProbeType.CHECK_DEAD);
         probeDeadNodesThreadPool.execute(probe);
       }
+    } else if (type == ProbeType.CHECK_SUSPECT) {
+      while ((datanodeInfo = suspectNodesProbeQueue.poll()) != null) {
+        if (probeInProg.containsKey(datanodeInfo.getDatanodeUuid())) {
+          continue;
+        }
+        probeInProg.put(datanodeInfo.getDatanodeUuid(), datanodeInfo);
+        Probe probe = new Probe(this, datanodeInfo, ProbeType.CHECK_SUSPECT);
+        probeSuspectNodesThreadPool.execute(probe);
+      }
     }
   }
 
@@ -263,7 +335,7 @@ public class DeadNodeDetector implements Runnable {
     private ProbeType type;
 
     Probe(DeadNodeDetector deadNodeDetector, DatanodeInfo datanodeInfo,
-        ProbeType type) {
+           ProbeType type) {
       this.deadNodeDetector = deadNodeDetector;
       this.datanodeInfo = datanodeInfo;
       this.type = type;
@@ -323,9 +395,19 @@ public class DeadNodeDetector implements Runnable {
     probeInProg.remove(probe.getDatanodeInfo().getDatanodeUuid());
     if (success) {
       if (probe.getType() == ProbeType.CHECK_DEAD) {
-        LOG.info("Remove the node out from dead node list: {}. ",
+        LOG.info("Remove the node out from dead node list: {}.",
+            probe.getDatanodeInfo());
+        removeDeadNode(probe.getDatanodeInfo());
+      } else if (probe.getType() == ProbeType.CHECK_SUSPECT) {
+        LOG.debug("Remove the node out from suspect node list: {}.",
+            probe.getDatanodeInfo());
+        removeNodeFromDeadNodeDetector(probe.getDatanodeInfo());
+      }
+    } else {
+      if (probe.getType() == ProbeType.CHECK_SUSPECT) {
+        LOG.info("Add the node to dead node list: {}.",
             probe.getDatanodeInfo());
-        removeNodeFromDeadNode(probe.getDatanodeInfo());
+        addToDead(probe.getDatanodeInfo());
       }
     }
   }
@@ -381,34 +463,43 @@ public class DeadNodeDetector implements Runnable {
     return deadNodesProbeQueue;
   }
 
+  public Queue<DatanodeInfo> getSuspectNodesProbeQueue() {
+    return suspectNodesProbeQueue;
+  }
+
   /**
-   * Add datanode in deadNodes and dfsInputStreamNodes. The node is considered
-   * to dead node. The dead node is shared by all the DFSInputStreams in the
-   * same client.
+   * Add datanode to the suspect node probe queue and suspectAndDeadNodes.
    */
   public synchronized void addNodeToDetect(DFSInputStream dfsInputStream,
       DatanodeInfo datanodeInfo) {
     HashSet<DatanodeInfo> datanodeInfos =
-        dfsInputStreamNodes.get(dfsInputStream);
+        suspectAndDeadNodes.get(dfsInputStream);
     if (datanodeInfos == null) {
       datanodeInfos = new HashSet<DatanodeInfo>();
       datanodeInfos.add(datanodeInfo);
-      dfsInputStreamNodes.putIfAbsent(dfsInputStream, datanodeInfos);
+      suspectAndDeadNodes.putIfAbsent(dfsInputStream, datanodeInfos);
     } else {
       datanodeInfos.add(datanodeInfo);
     }
 
-    addToDead(datanodeInfo);
+    addSuspectNodeToDetect(datanodeInfo);
   }
 
   /**
-   * Remove dead node which is not used by any DFSInputStream from deadNodes.
-   * @return new dead node shared by all DFSInputStreams.
+   * Add datanode to the suspect node probe queue.
    */
+  private boolean addSuspectNodeToDetect(DatanodeInfo datanodeInfo) {
+    return suspectNodesProbeQueue.offer(datanodeInfo);
+  }
+
+  /**
+   * Remove dead node which is not used by any DFSInputStream from deadNodes.
+   * @return new dead node shared by all DFSInputStreams.
+   */
   public synchronized Set<DatanodeInfo> clearAndGetDetectedDeadNodes() {
     // remove the dead nodes who doesn't have any inputstream first
     Set<DatanodeInfo> newDeadNodes = new HashSet<DatanodeInfo>();
-    for (HashSet<DatanodeInfo> datanodeInfos : dfsInputStreamNodes.values()) {
+    for (HashSet<DatanodeInfo> datanodeInfos : suspectAndDeadNodes.values()) {
       newDeadNodes.addAll(datanodeInfos);
     }
 
@@ -421,34 +512,46 @@ public class DeadNodeDetector implements Runnable {
   }
 
   /**
-   * Remove dead node from dfsInputStreamNodes#dfsInputStream. If
-   * dfsInputStreamNodes#dfsInputStream does not contain any dead node, remove
-   * it from dfsInputStreamNodes.
+   * Remove suspect and dead node from suspectAndDeadNodes#dfsInputStream and
+   * local deadNodes.
    */
   public synchronized void removeNodeFromDeadNodeDetector(
       DFSInputStream dfsInputStream, DatanodeInfo datanodeInfo) {
-    Set<DatanodeInfo> datanodeInfos = dfsInputStreamNodes.get(dfsInputStream);
+    Set<DatanodeInfo> datanodeInfos = suspectAndDeadNodes.get(dfsInputStream);
     if (datanodeInfos != null) {
       datanodeInfos.remove(datanodeInfo);
+      dfsInputStream.removeFromLocalDeadNodes(datanodeInfo);
       if (datanodeInfos.isEmpty()) {
-        dfsInputStreamNodes.remove(dfsInputStream);
+        suspectAndDeadNodes.remove(dfsInputStream);
       }
     }
   }
 
   /**
-   * Remove dead node from dfsInputStreamNodes#dfsInputStream and deadNodes.
+   * Remove suspect and dead node from suspectAndDeadNodes#dfsInputStream and
+   * local deadNodes.
    */
-  public synchronized void removeNodeFromDeadNode(DatanodeInfo datanodeInfo) {
+  private synchronized void removeNodeFromDeadNodeDetector(
+      DatanodeInfo datanodeInfo) {
     for (Map.Entry<DFSInputStream, HashSet<DatanodeInfo>> entry :
-            dfsInputStreamNodes.entrySet()) {
+            suspectAndDeadNodes.entrySet()) {
       Set<DatanodeInfo> datanodeInfos = entry.getValue();
       if (datanodeInfos.remove(datanodeInfo)) {
         DFSInputStream dfsInputStream = entry.getKey();
         dfsInputStream.removeFromLocalDeadNodes(datanodeInfo);
+        if (datanodeInfos.isEmpty()) {
+          suspectAndDeadNodes.remove(dfsInputStream);
+        }
       }
     }
+  }
 
+  /**
+   * Remove suspect and dead node from suspectAndDeadNodes#dfsInputStream and
+   * deadNodes.
+   */
+  private void removeDeadNode(DatanodeInfo datanodeInfo) {
+    removeNodeFromDeadNodeDetector(datanodeInfo);
     removeFromDead(datanodeInfo);
   }
 
@@ -476,7 +579,11 @@ public class DeadNodeDetector implements Runnable {
     public void run() {
       while (true) {
         deadNodeDetector.scheduleProbe(type);
-        probeSleep(deadNodeDetector.deadNodeDetectInterval);
+        if (type == ProbeType.CHECK_SUSPECT) {
+          probeSleep(deadNodeDetector.suspectNodeDetectInterval);
+        } else {
+          probeSleep(deadNodeDetector.deadNodeDetectInterval);
+        }
       }
     }
   }
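
In short, the change above makes dead node detection a two-stage pipeline: a
failed read first lands the DataNode in the suspect queue; the suspect
scheduler probes it on a short interval (300 ms by default) and promotes it to
the dead list only when the probe fails; the existing dead node scheduler keeps
re-probing dead nodes (60 s by default) so recovered nodes get removed again.
A simplified, self-contained sketch of that flow, not the real class: plain
node ids stand in for DatanodeInfo, and probe() stands in for the
ClientDatanodeProtocol RPC:

    import java.util.Queue;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class TwoStageProbeSketch {
      private final Queue<String> suspectQueue = new ArrayBlockingQueue<>(1000);
      private final Queue<String> deadQueue = new ArrayBlockingQueue<>(100);
      private final ExecutorService probePool = Executors.newFixedThreadPool(10);

      /** A read failure enqueues the node as a suspect; offer() drops on overflow. */
      public void reportFailure(String node) {
        suspectQueue.offer(node);
      }

      /** Called by the suspect scheduler thread on its short interval. */
      public void scheduleSuspectProbes() {
        String node;
        while ((node = suspectQueue.poll()) != null) {
          final String n = node;
          probePool.execute(() -> {
            if (!probe(n)) {
              deadQueue.offer(n); // probe failed: promote suspect to dead
            }
          });
        }
      }

      private boolean probe(String node) {
        return true; // stand-in for the ClientDatanodeProtocol RPC check
      }
    }
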
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 4218cc0..fec3958 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -160,6 +160,10 @@ public interface HdfsClientConfigKeys {
       "dfs.client.deadnode.detection.deadnode.queue.max";
   int DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT = 100;
 
+  String DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY =
+      "dfs.client.deadnode.detection.suspectnode.queue.max";
+  int DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT = 1000;
+
   String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY =
       "dfs.client.deadnode.detection.probe.connection.timeout.ms";
   long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT =
@@ -169,6 +173,10 @@ public interface HdfsClientConfigKeys {
       "dfs.client.deadnode.detection.probe.deadnode.threads";
   int DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT = 10;
 
+  String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY =
+      "dfs.client.deadnode.detection.probe.suspectnode.threads";
+  int DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_DEFAULT = 10;
+
   String DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY =
       "dfs.client.deadnode.detection.rpc.threads";
   int DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT = 20;
@@ -178,6 +186,11 @@ public interface HdfsClientConfigKeys {
   long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT =
       60 * 1000; // 60s
 
+  String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY =
+      "dfs.client.deadnode.detection.probe.suspectnode.interval.ms";
+  long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT =
+      300; // 300ms
+
   String  DFS_DATANODE_KERBEROS_PRINCIPAL_KEY =
       "dfs.datanode.kerberos.principal";
   String  DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
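
As the DeadNodeDetector constructor hunk earlier in this commit shows, each new
key is read with its *_DEFAULT constant as fallback. The same lookup isolated
as a sketch, using exactly the constants added above:

    import org.apache.hadoop.conf.Configuration;

    import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT;
    import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY;
    import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT;
    import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;

    public class SuspectSettingsLookup {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Unset keys fall back to the constants: 300 ms interval, 1000-entry queue.
        long intervalMs = conf.getLong(
            DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY,
            DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT);
        int queueMax = conf.getInt(
            DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY,
            DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT);
        System.out.println("interval=" + intervalMs + "ms, queueMax=" + queueMax);
      }
    }
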
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 684b617..a7e91f1 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3005,6 +3005,14 @@
   </property>
 
   <property>
+    <name>dfs.client.deadnode.detection.suspectnode.queue.max</name>
+    <value>1000</value>
+    <description>
+      The maximum queue size for probing suspect nodes.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.client.deadnode.detection.probe.deadnode.threads</name>
     <value>10</value>
     <description>
@@ -3013,6 +3021,14 @@
   </property>
 
   <property>
+    <name>dfs.client.deadnode.detection.probe.suspectnode.threads</name>
+    <value>10</value>
+    <description>
+      The maximum number of threads to use for probing suspect nodes.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.client.deadnode.detection.rpc.threads</name>
     <value>20</value>
     <description>
@@ -3029,6 +3045,14 @@
   </property>
 
   <property>
+    <name>dfs.client.deadnode.detection.probe.suspectnode.interval.ms</name>
+    <value>300</value>
+    <description>
+      Interval in milliseconds between probes of suspect nodes.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.client.deadnode.detection.probe.connection.timeout.ms</name>
     <value>20000</value>
     <description>
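
These XML defaults are intended to mirror the Java constants. A quick sanity
sketch, assuming hdfs-default.xml is on the classpath (it ships inside the
hadoop-hdfs jar, so the full HDFS jars are needed, not just hadoop-hdfs-client):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class SuspectDefaultsCheck {
      public static void main(String[] args) {
        // HdfsConfiguration registers hdfs-default.xml and hdfs-site.xml resources.
        Configuration conf = new HdfsConfiguration();
        System.out.println(conf.getInt(
            "dfs.client.deadnode.detection.suspectnode.queue.max", -1));         // 1000
        System.out.println(conf.getLong(
            "dfs.client.deadnode.detection.probe.suspectnode.interval.ms", -1)); // 300
      }
    }
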
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
index da800f7..58f6d5d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
@@ -35,6 +35,8 @@ import java.io.IOException;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -59,10 +61,15 @@ public class TestDeadNodeDetection {
   }
 
   @Test
-  public void testDeadNodeDetectionInBackground() throws IOException {
+  public void testDeadNodeDetectionInBackground() throws Exception {
+    conf = new HdfsConfiguration();
     conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true);
+    conf.setLong(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
+        1000);
+    conf.setLong(
+        DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY, 100);
     // We'll be using a 512 bytes block size just for tests
-    // so making sure the checksum bytes too match it.
+    // so making sure the checksum bytes match it too.
     conf.setInt("io.bytes.per.checksum", 512);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     cluster.waitActive();
@@ -91,22 +98,21 @@ public class TestDeadNodeDetection {
     cluster.stopDataNode(0);
 
     FSDataInputStream in = fs.open(filePath);
-    DFSInputStream din = null;
-    DFSClient dfsClient = null;
+    DFSInputStream din = (DFSInputStream) in.getWrappedStream();
+    DFSClient dfsClient = din.getDFSClient();
     try {
       try {
         in.read();
       } catch (BlockMissingException e) {
       }
 
-      din = (DFSInputStream) in.getWrappedStream();
-      dfsClient = din.getDFSClient();
+      waitForDeadNode(dfsClient, 3);
       assertEquals(3, dfsClient.getDeadNodes(din).size());
       assertEquals(3, dfsClient.getClientContext().getDeadNodeDetector()
           .clearAndGetDetectedDeadNodes().size());
     } finally {
       in.close();
-      fs.delete(new Path("/testDetectDeadNodeInBackground"), true);
+      fs.delete(filePath, true);
       // check the dead node again here, the dead node is expected be removed
       assertEquals(0, dfsClient.getDeadNodes(din).size());
       assertEquals(0, dfsClient.getClientContext().getDeadNodeDetector()
@@ -119,7 +125,7 @@ public class TestDeadNodeDetection {
       throws IOException {
     conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true);
     // We'll be using a 512 bytes block size just for tests
-    // so making sure the checksum bytes too match it.
+    // so making sure the checksum bytes match it too.
     conf.setInt("io.bytes.per.checksum", 512);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
     cluster.waitActive();
@@ -178,7 +184,7 @@ public class TestDeadNodeDetection {
     conf.setLong(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
         1000);
     // We'll be using a 512 bytes block size just for tests
-    // so making sure the checksum bytes too match it.
+    // so making sure the checksum bytes match it too.
     conf.setInt("io.bytes.per.checksum", 512);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     cluster.waitActive();
@@ -201,12 +207,13 @@ public class TestDeadNodeDetection {
       } catch (BlockMissingException e) {
       }
 
+      waitForDeadNode(dfsClient, 3);
       assertEquals(3, dfsClient.getDeadNodes(din).size());
       assertEquals(3, dfsClient.getClientContext().getDeadNodeDetector()
           .clearAndGetDetectedDeadNodes().size());
 
       cluster.restartDataNode(one, true);
-      waitForDeadNodeRecovery(din);
+      waitForDeadNode(dfsClient, 2);
       assertEquals(2, dfsClient.getDeadNodes(din).size());
       assertEquals(2, dfsClient.getClientContext().getDeadNodeDetector()
           .clearAndGetDetectedDeadNodes().size());
@@ -225,15 +232,14 @@ public class TestDeadNodeDetection {
     conf.setLong(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
         1000);
     conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY, 1);
-
     // We'll be using a 512 bytes block size just for tests
-    // so making sure the checksum bytes too match it.
+    // so making sure the checksum bytes match it too.
     conf.setInt("io.bytes.per.checksum", 512);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     cluster.waitActive();
 
     FileSystem fs = cluster.getFileSystem();
-    Path filePath = new Path("testDeadNodeDetectionMaxDeadNodesProbeQueue");
+    Path filePath = new Path("/testDeadNodeDetectionMaxDeadNodesProbeQueue");
     createFile(fs, filePath);
 
     // Remove three DNs,
@@ -260,6 +266,55 @@ public class TestDeadNodeDetection {
     }
   }
 
+  @Test
+  public void testDeadNodeDetectionSuspectNode() throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true);
+    conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY, 1);
+    // We'll be using a 512 bytes block size just for tests
+    // so making sure the checksum bytes match it too.
+    conf.setInt("io.bytes.per.checksum", 512);
+    DeadNodeDetector.disabledProbeThreadForTest();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+
+    FileSystem fs = cluster.getFileSystem();
+    Path filePath = new Path("/testDeadNodeDetectionSuspectNode");
+    createFile(fs, filePath);
+
+    MiniDFSCluster.DataNodeProperties one = cluster.stopDataNode(0);
+
+    FSDataInputStream in = fs.open(filePath);
+    DFSInputStream din = (DFSInputStream) in.getWrappedStream();
+    DFSClient dfsClient = din.getDFSClient();
+    DeadNodeDetector deadNodeDetector =
+        dfsClient.getClientContext().getDeadNodeDetector();
+    try {
+      try {
+        in.read();
+      } catch (BlockMissingException e) {
+      }
+      waitForSuspectNode(din.getDFSClient());
+      cluster.restartDataNode(one, true);
+      Assert.assertEquals(1,
+          deadNodeDetector.getSuspectNodesProbeQueue().size());
+      Assert.assertEquals(0,
+          deadNodeDetector.clearAndGetDetectedDeadNodes().size());
+      deadNodeDetector.startProbeScheduler();
+      Thread.sleep(1000);
+      Assert.assertEquals(0,
+          deadNodeDetector.getSuspectNodesProbeQueue().size());
+      Assert.assertEquals(0,
+          deadNodeDetector.clearAndGetDetectedDeadNodes().size());
+    } finally {
+      in.close();
+      deleteFile(fs, filePath);
+      assertEquals(0, dfsClient.getDeadNodes(din).size());
+      assertEquals(0, dfsClient.getClientContext().getDeadNodeDetector()
+          .clearAndGetDetectedDeadNodes().size());
+    }
+  }
+
   private void createFile(FileSystem fs, Path filePath) throws IOException {
     FSDataOutputStream out = null;
     try {
@@ -286,12 +341,31 @@ public class TestDeadNodeDetection {
     fs.delete(filePath, true);
   }
 
-  private void waitForDeadNodeRecovery(DFSInputStream din) throws Exception {
+  private void waitForDeadNode(DFSClient dfsClient, int size) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          if (dfsClient.getClientContext().getDeadNodeDetector()
+              .clearAndGetDetectedDeadNodes().size() == size) {
+            return true;
+          }
+        } catch (Exception e) {
+          // Ignore the exception
+        }
+
+        return false;
+      }
+    }, 5000, 100000);
+  }
+
+  private void waitForSuspectNode(DFSClient dfsClient) throws Exception {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
         try {
-          if (din.getDFSClient().getDeadNodes(din).size() == 2) {
+          if (dfsClient.getClientContext().getDeadNodeDetector()
+              .getSuspectNodesProbeQueue().size() > 0) {
             return true;
           }
         } catch (Exception e) {

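The new waitForDeadNode/waitForSuspectNode helpers poll via
GenericTestUtils.waitFor with an anonymous Supplier. Since Supplier<Boolean>
has a single method, the same check can be written as a lambda; an equivalent
sketch of the suspect-node variant, dropped into the same test class:

    import org.apache.hadoop.test.GenericTestUtils;

    // Lambda form of the waitForSuspectNode helper shown above: wait until the
    // suspect probe queue is non-empty, checking every 5 s for up to 100 s.
    private void waitForSuspectNode(DFSClient dfsClient) throws Exception {
      GenericTestUtils.waitFor(
          () -> dfsClient.getClientContext().getDeadNodeDetector()
              .getSuspectNodesProbeQueue().size() > 0,
          5000, 100000);
    }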
