Posted to common-commits@hadoop.apache.org by yq...@apache.org on 2019/11/27 02:57:42 UTC
[hadoop] branch trunk updated: HDFS-14649. Add suspect probe for DeadNodeDetector. Contributed by Lisheng Sun.
This is an automated email from the ASF dual-hosted git repository.
yqlin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new c8bef4d HDFS-14649. Add suspect probe for DeadNodeDetector. Contributed by Lisheng Sun.
c8bef4d is described below
commit c8bef4d6a6d7d5affd00cff6ea4a2e2ef778050e
Author: Yiqun Lin <yq...@apache.org>
AuthorDate: Wed Nov 27 10:57:20 2019 +0800
HDFS-14649. Add suspect probe for DeadNodeDetector. Contributed by Lisheng Sun.
---
.../org/apache/hadoop/hdfs/DeadNodeDetector.java | 169 +++++++++++++++++----
.../hadoop/hdfs/client/HdfsClientConfigKeys.java | 13 ++
.../src/main/resources/hdfs-default.xml | 24 +++
.../apache/hadoop/hdfs/TestDeadNodeDetection.java | 104 +++++++++++--
4 files changed, 264 insertions(+), 46 deletions(-)
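
In short: with this change, a datanode that fails a read is no longer put straight onto the client-wide dead node list. It is first recorded as a suspect and pushed onto a bounded suspect queue, which a dedicated scheduler probes on a much shorter interval (300ms by default, versus 60s for confirmed dead nodes). A successful probe clears the suspicion; a failed probe promotes the node to the dead list shared by all DFSInputStreams of the client. The minimal sketch below illustrates that lifecycle; the class and member names are illustrative stand-ins, not the HDFS types used in the patch.

    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative stand-in for the suspect -> dead lifecycle; not HDFS code.
    public class SuspectLifecycleSketch {
      // Bounded, as in the patch: offer() on a full queue drops the suspect.
      private final Queue<String> suspectQueue = new ArrayBlockingQueue<>(1000);
      private final Map<String, Boolean> deadNodes = new ConcurrentHashMap<>();

      /** A read failure reports the node as a suspect, not directly as dead. */
      void reportFailedNode(String nodeId) {
        suspectQueue.offer(nodeId);
      }

      /** One pass of the suspect probe scheduler (every 300ms by default). */
      void probeSuspects() {
        String nodeId;
        while ((nodeId = suspectQueue.poll()) != null) {
          if (probe(nodeId)) {
            deadNodes.remove(nodeId);    // probe succeeded: suspicion cleared
          } else {
            deadNodes.put(nodeId, true); // probe failed: promoted to dead
          }
        }
      }

      // Stand-in for the real RPC liveness check against the datanode.
      private boolean probe(String nodeId) {
        return Math.random() > 0.5;
      }
    }
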
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
index 2fe7cf8..ce50547 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -48,8 +49,14 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
/**
@@ -83,13 +90,13 @@ public class DeadNodeDetector implements Runnable {
private final Map<String, DatanodeInfo> deadNodes;
/**
- * Record dead nodes by one DFSInputStream. When dead node is not used by one
- * DFSInputStream, remove it from dfsInputStreamNodes#DFSInputStream. If
- * DFSInputStream does not include any dead node, remove DFSInputStream from
- * dfsInputStreamNodes.
+ * Record the suspect and dead nodes of one DFSInputStream. When a node is no
+ * longer used by a DFSInputStream, remove it from
+ * suspectAndDeadNodes#DFSInputStream. If a DFSInputStream does not include any
+ * node, remove the DFSInputStream from suspectAndDeadNodes.
*/
private final Map<DFSInputStream, HashSet<DatanodeInfo>>
- dfsInputStreamNodes;
+ suspectAndDeadNodes;
/**
* Datanodes that are being probed.
@@ -108,11 +115,21 @@ public class DeadNodeDetector implements Runnable {
private long deadNodeDetectInterval = 0;
/**
+ * Interval time in milliseconds for probing suspect node behavior.
+ */
+ private long suspectNodeDetectInterval = 0;
+
+ /**
* The max queue size of probing dead node.
*/
private int maxDeadNodesProbeQueueLen = 0;
/**
+ * The max queue size of probing suspect node.
+ */
+ private int maxSuspectNodesProbeQueueLen;
+
+ /**
* Connection timeout for probing dead node in milliseconds.
*/
private long probeConnectionTimeoutMs;
@@ -123,16 +140,31 @@ public class DeadNodeDetector implements Runnable {
private Queue<DatanodeInfo> deadNodesProbeQueue;
/**
+ * The suspect node probe queue.
+ */
+ private Queue<DatanodeInfo> suspectNodesProbeQueue;
+
+ /**
* The thread pool of probing dead node.
*/
private ExecutorService probeDeadNodesThreadPool;
/**
+ * The thread pool of probing suspect node.
+ */
+ private ExecutorService probeSuspectNodesThreadPool;
+
+ /**
* The scheduler thread of probing dead node.
*/
private Thread probeDeadNodesSchedulerThr;
/**
+ * The scheduler thread of probing suspect node.
+ */
+ private Thread probeSuspectNodesSchedulerThr;
+
+ /**
* The thread pool for probing datanodes' rpc requests. Sometimes a datanode
* can hang and not respond to the client in a short time, and such nodes will
* fill up the probe thread pool and block the probing of other, normal nodes.
@@ -145,7 +177,7 @@ public class DeadNodeDetector implements Runnable {
* The type of probe.
*/
private enum ProbeType {
- CHECK_DEAD
+ CHECK_DEAD, CHECK_SUSPECT
}
/**
@@ -155,41 +187,61 @@ public class DeadNodeDetector implements Runnable {
INIT, CHECK_DEAD, IDLE, ERROR
}
+ /**
+ * Disable starting the suspect/dead probe threads, for testing only.
+ */
+ private static volatile boolean disabledProbeThreadForTest = false;
+
private State state;
public DeadNodeDetector(String name, Configuration conf) {
this.conf = new Configuration(conf);
this.deadNodes = new ConcurrentHashMap<String, DatanodeInfo>();
- this.dfsInputStreamNodes =
+ this.suspectAndDeadNodes =
new ConcurrentHashMap<DFSInputStream, HashSet<DatanodeInfo>>();
this.name = name;
deadNodeDetectInterval = conf.getLong(
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT);
+ suspectNodeDetectInterval = conf.getLong(
+ DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY,
+ DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT);
socketTimeout =
conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsConstants.READ_TIMEOUT);
maxDeadNodesProbeQueueLen =
conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY,
DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT);
+ maxSuspectNodesProbeQueueLen =
+ conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY,
+ DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT);
probeConnectionTimeoutMs = conf.getLong(
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY,
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT);
this.deadNodesProbeQueue =
new ArrayBlockingQueue<DatanodeInfo>(maxDeadNodesProbeQueueLen);
+ this.suspectNodesProbeQueue =
+ new ArrayBlockingQueue<DatanodeInfo>(maxSuspectNodesProbeQueueLen);
int deadNodeDetectDeadThreads =
conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY,
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT);
+ int suspectNodeDetectDeadThreads = conf.getInt(
+ DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY,
+ DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_DEFAULT);
int rpcThreads = conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY,
DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT);
probeDeadNodesThreadPool = Executors.newFixedThreadPool(
deadNodeDetectDeadThreads, new Daemon.DaemonFactory());
+ probeSuspectNodesThreadPool = Executors.newFixedThreadPool(
+ suspectNodeDetectDeadThreads, new Daemon.DaemonFactory());
rpcThreadPool =
Executors.newFixedThreadPool(rpcThreads, new Daemon.DaemonFactory());
- startProbeScheduler();
+ if (!disabledProbeThreadForTest) {
+ startProbeScheduler();
+ }
LOG.info("Start dead node detector for DFSClient {}.", this.name);
state = State.INIT;
@@ -223,14 +275,25 @@ public class DeadNodeDetector implements Runnable {
}
}
+ @VisibleForTesting
+ static void disabledProbeThreadForTest() {
+ disabledProbeThreadForTest = true;
+ }
+
/**
- * Start probe dead node thread.
+ * Start the dead node and suspect node probe threads.
*/
- private void startProbeScheduler() {
+ @VisibleForTesting
+ void startProbeScheduler() {
probeDeadNodesSchedulerThr =
new Thread(new ProbeScheduler(this, ProbeType.CHECK_DEAD));
probeDeadNodesSchedulerThr.setDaemon(true);
probeDeadNodesSchedulerThr.start();
+
+ probeSuspectNodesSchedulerThr =
+ new Thread(new ProbeScheduler(this, ProbeType.CHECK_SUSPECT));
+ probeSuspectNodesSchedulerThr.setDaemon(true);
+ probeSuspectNodesSchedulerThr.start();
}
/**
@@ -250,6 +313,15 @@ public class DeadNodeDetector implements Runnable {
Probe probe = new Probe(this, datanodeInfo, ProbeType.CHECK_DEAD);
probeDeadNodesThreadPool.execute(probe);
}
+ } else if (type == ProbeType.CHECK_SUSPECT) {
+ while ((datanodeInfo = suspectNodesProbeQueue.poll()) != null) {
+ if (probeInProg.containsKey(datanodeInfo.getDatanodeUuid())) {
+ continue;
+ }
+ probeInProg.put(datanodeInfo.getDatanodeUuid(), datanodeInfo);
+ Probe probe = new Probe(this, datanodeInfo, ProbeType.CHECK_SUSPECT);
+ probeSuspectNodesThreadPool.execute(probe);
+ }
}
}
@@ -263,7 +335,7 @@ public class DeadNodeDetector implements Runnable {
private ProbeType type;
Probe(DeadNodeDetector deadNodeDetector, DatanodeInfo datanodeInfo,
- ProbeType type) {
+ ProbeType type) {
this.deadNodeDetector = deadNodeDetector;
this.datanodeInfo = datanodeInfo;
this.type = type;
@@ -323,9 +395,19 @@ public class DeadNodeDetector implements Runnable {
probeInProg.remove(probe.getDatanodeInfo().getDatanodeUuid());
if (success) {
if (probe.getType() == ProbeType.CHECK_DEAD) {
- LOG.info("Remove the node out from dead node list: {}. ",
+ LOG.info("Remove the node out from dead node list: {}.",
+ probe.getDatanodeInfo());
+ removeDeadNode(probe.getDatanodeInfo());
+ } else if (probe.getType() == ProbeType.CHECK_SUSPECT) {
+ LOG.debug("Remove the node out from suspect node list: {}.",
+ probe.getDatanodeInfo());
+ removeNodeFromDeadNodeDetector(probe.getDatanodeInfo());
+ }
+ } else {
+ if (probe.getType() == ProbeType.CHECK_SUSPECT) {
+ LOG.info("Add the node to dead node list: {}.",
probe.getDatanodeInfo());
- removeNodeFromDeadNode(probe.getDatanodeInfo());
+ addToDead(probe.getDatanodeInfo());
}
}
}
@@ -381,34 +463,43 @@ public class DeadNodeDetector implements Runnable {
return deadNodesProbeQueue;
}
+ public Queue<DatanodeInfo> getSuspectNodesProbeQueue() {
+ return suspectNodesProbeQueue;
+ }
+
/**
- * Add datanode in deadNodes and dfsInputStreamNodes. The node is considered
- * to dead node. The dead node is shared by all the DFSInputStreams in the
- * same client.
+ * Add datanode to suspectNodes and suspectAndDeadNodes.
*/
public synchronized void addNodeToDetect(DFSInputStream dfsInputStream,
DatanodeInfo datanodeInfo) {
HashSet<DatanodeInfo> datanodeInfos =
- dfsInputStreamNodes.get(dfsInputStream);
+ suspectAndDeadNodes.get(dfsInputStream);
if (datanodeInfos == null) {
datanodeInfos = new HashSet<DatanodeInfo>();
datanodeInfos.add(datanodeInfo);
- dfsInputStreamNodes.putIfAbsent(dfsInputStream, datanodeInfos);
+ suspectAndDeadNodes.putIfAbsent(dfsInputStream, datanodeInfos);
} else {
datanodeInfos.add(datanodeInfo);
}
- addToDead(datanodeInfo);
+ addSuspectNodeToDetect(datanodeInfo);
}
/**
- * Remove dead node which is not used by any DFSInputStream from deadNodes.
- * @return new dead node shared by all DFSInputStreams.
+ * Add datanode to suspectNodes.
*/
+ private boolean addSuspectNodeToDetect(DatanodeInfo datanodeInfo) {
+ return suspectNodesProbeQueue.offer(datanodeInfo);
+ }
+
+ /**
+ * Remove dead node which is not used by any DFSInputStream from deadNodes.
+ * @return new dead node shared by all DFSInputStreams.
+ */
public synchronized Set<DatanodeInfo> clearAndGetDetectedDeadNodes() {
// remove the dead nodes that don't have any inputstream first
Set<DatanodeInfo> newDeadNodes = new HashSet<DatanodeInfo>();
- for (HashSet<DatanodeInfo> datanodeInfos : dfsInputStreamNodes.values()) {
+ for (HashSet<DatanodeInfo> datanodeInfos : suspectAndDeadNodes.values()) {
newDeadNodes.addAll(datanodeInfos);
}
@@ -421,34 +512,46 @@ public class DeadNodeDetector implements Runnable {
}
/**
- * Remove dead node from dfsInputStreamNodes#dfsInputStream. If
- * dfsInputStreamNodes#dfsInputStream does not contain any dead node, remove
- * it from dfsInputStreamNodes.
+ * Remove suspect and dead node from suspectAndDeadNodes#dfsInputStream and
+ * local deadNodes.
*/
public synchronized void removeNodeFromDeadNodeDetector(
DFSInputStream dfsInputStream, DatanodeInfo datanodeInfo) {
- Set<DatanodeInfo> datanodeInfos = dfsInputStreamNodes.get(dfsInputStream);
+ Set<DatanodeInfo> datanodeInfos = suspectAndDeadNodes.get(dfsInputStream);
if (datanodeInfos != null) {
datanodeInfos.remove(datanodeInfo);
+ dfsInputStream.removeFromLocalDeadNodes(datanodeInfo);
if (datanodeInfos.isEmpty()) {
- dfsInputStreamNodes.remove(dfsInputStream);
+ suspectAndDeadNodes.remove(dfsInputStream);
}
}
}
/**
- * Remove dead node from dfsInputStreamNodes#dfsInputStream and deadNodes.
+ * Remove suspect and dead node from suspectAndDeadNodes#dfsInputStream and
+ * local deadNodes.
*/
- public synchronized void removeNodeFromDeadNode(DatanodeInfo datanodeInfo) {
+ private synchronized void removeNodeFromDeadNodeDetector(
+ DatanodeInfo datanodeInfo) {
for (Map.Entry<DFSInputStream, HashSet<DatanodeInfo>> entry :
- dfsInputStreamNodes.entrySet()) {
+ suspectAndDeadNodes.entrySet()) {
Set<DatanodeInfo> datanodeInfos = entry.getValue();
if (datanodeInfos.remove(datanodeInfo)) {
DFSInputStream dfsInputStream = entry.getKey();
dfsInputStream.removeFromLocalDeadNodes(datanodeInfo);
+ if (datanodeInfos.isEmpty()) {
+ suspectAndDeadNodes.remove(dfsInputStream);
+ }
}
}
+ }
+ /**
+ * Remove suspect and dead node from suspectAndDeadNodes#dfsInputStream and
+ * deadNodes.
+ */
+ private void removeDeadNode(DatanodeInfo datanodeInfo) {
+ removeNodeFromDeadNodeDetector(datanodeInfo);
removeFromDead(datanodeInfo);
}
@@ -476,7 +579,11 @@ public class DeadNodeDetector implements Runnable {
public void run() {
while (true) {
deadNodeDetector.scheduleProbe(type);
- probeSleep(deadNodeDetector.deadNodeDetectInterval);
+ if (type == ProbeType.CHECK_SUSPECT) {
+ probeSleep(deadNodeDetector.suspectNodeDetectInterval);
+ } else {
+ probeSleep(deadNodeDetector.deadNodeDetectInterval);
+ }
}
}
}
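
A note on the scheduling above: scheduleProbe() drains the suspect queue but skips any datanode already present in probeInProg, so a node whose probe is still running cannot be submitted twice; probeCallBack() removes the entry once the probe finishes. Below is a self-contained sketch of that dedupe pattern, with plain strings standing in for DatanodeInfo and the real probe RPC.

    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Simplified sketch of the probe scheduling/dedupe, not the Hadoop class.
    public class ProbeSchedulerSketch {
      private final Queue<String> suspectQueue = new ConcurrentLinkedQueue<>();
      private final Map<String, String> probeInProg = new ConcurrentHashMap<>();
      private final ExecutorService pool = Executors.newFixedThreadPool(10);

      void scheduleSuspectProbes() {
        String node;
        while ((node = suspectQueue.poll()) != null) {
          if (probeInProg.containsKey(node)) {
            continue; // a probe for this node is still in flight; skip it
          }
          probeInProg.put(node, node);
          final String target = node; // lambda needs an effectively final copy
          pool.execute(() -> {
            try {
              probe(target); // stand-in for the datanode liveness check
            } finally {
              probeInProg.remove(target); // as probeCallBack() does above
            }
          });
        }
      }

      private void probe(String node) {
        // liveness check elided in this sketch
      }
    }
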
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 4218cc0..fec3958 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -160,6 +160,10 @@ public interface HdfsClientConfigKeys {
"dfs.client.deadnode.detection.deadnode.queue.max";
int DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT = 100;
+ String DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY =
+ "dfs.client.deadnode.detection.suspectnode.queue.max";
+ int DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT = 1000;
+
String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY =
"dfs.client.deadnode.detection.probe.connection.timeout.ms";
long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT =
@@ -169,6 +173,10 @@ public interface HdfsClientConfigKeys {
"dfs.client.deadnode.detection.probe.deadnode.threads";
int DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT = 10;
+ String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY =
+ "dfs.client.deadnode.detection.probe.suspectnode.threads";
+ int DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_DEFAULT = 10;
+
String DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY =
"dfs.client.deadnode.detection.rpc.threads";
int DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT = 20;
@@ -178,6 +186,11 @@ public interface HdfsClientConfigKeys {
long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT =
60 * 1000; // 60s
+ String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY =
+ "dfs.client.deadnode.detection.probe.suspectnode.interval.ms";
+ long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT =
+ 300; // 300ms
+
String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY =
"dfs.datanode.kerberos.principal";
String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 684b617..a7e91f1 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3005,6 +3005,14 @@
</property>
<property>
+ <name>dfs.client.deadnode.detection.suspectnode.queue.max</name>
+ <value>1000</value>
+ <description>
+ The max queue size of probing suspect node.
+ </description>
+ </property>
+
+ <property>
<name>dfs.client.deadnode.detection.probe.deadnode.threads</name>
<value>10</value>
<description>
@@ -3013,6 +3021,14 @@
</property>
<property>
+ <name>dfs.client.deadnode.detection.probe.suspectnode.threads</name>
+ <value>10</value>
+ <description>
+ The maximum number of threads to use for probing suspect node.
+ </description>
+ </property>
+
+ <property>
<name>dfs.client.deadnode.detection.rpc.threads</name>
<value>20</value>
<description>
@@ -3029,6 +3045,14 @@
</property>
<property>
+ <name>dfs.client.deadnode.detection.probe.suspectnode.interval.ms</name>
+ <value>300</value>
+ <description>
+ Interval time in milliseconds for probing suspect node behavior.
+ </description>
+ </property>
+
+ <property>
<name>dfs.client.deadnode.detection.probe.connection.timeout.ms</name>
<value>20000</value>
<description>
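
The same tuning can also be done programmatically on the client side, as the new tests below do. The following is only a sketch built on the keys added by this patch; the values shown are the shipped defaults, so set them explicitly only when different behavior is needed.

    import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY;
    import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY;
    import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY;
    import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class DeadNodeDetectionTuningSketch {
      public static Configuration tunedConf() {
        Configuration conf = new HdfsConfiguration();
        // Enable the detector for this client, as the tests in this patch do.
        conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true);
        // Suspect nodes are re-probed far more often than confirmed dead ones.
        conf.setLong(
            DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY,
            300);
        conf.setInt(
            DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY, 10);
        conf.setInt(
            DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY, 1000);
        return conf;
      }
    }
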
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
index da800f7..58f6d5d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
@@ -35,6 +35,8 @@ import java.io.IOException;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;
import static org.junit.Assert.assertEquals;
/**
@@ -59,10 +61,15 @@ public class TestDeadNodeDetection {
}
@Test
- public void testDeadNodeDetectionInBackground() throws IOException {
+ public void testDeadNodeDetectionInBackground() throws Exception {
+ conf = new HdfsConfiguration();
conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true);
+ conf.setLong(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
+ 1000);
+ conf.setLong(
+ DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY, 100);
// We'll be using a 512 bytes block size just for tests
- // so making sure the checksum bytes too match it.
+ // so making sure the checksum bytes match it too.
conf.setInt("io.bytes.per.checksum", 512);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
@@ -91,22 +98,21 @@ public class TestDeadNodeDetection {
cluster.stopDataNode(0);
FSDataInputStream in = fs.open(filePath);
- DFSInputStream din = null;
- DFSClient dfsClient = null;
+ DFSInputStream din = (DFSInputStream) in.getWrappedStream();
+ DFSClient dfsClient = din.getDFSClient();
try {
try {
in.read();
} catch (BlockMissingException e) {
}
- din = (DFSInputStream) in.getWrappedStream();
- dfsClient = din.getDFSClient();
+ waitForDeadNode(dfsClient, 3);
assertEquals(3, dfsClient.getDeadNodes(din).size());
assertEquals(3, dfsClient.getClientContext().getDeadNodeDetector()
.clearAndGetDetectedDeadNodes().size());
} finally {
in.close();
- fs.delete(new Path("/testDetectDeadNodeInBackground"), true);
+ fs.delete(filePath, true);
// check the dead node again here, the dead node is expected be removed
assertEquals(0, dfsClient.getDeadNodes(din).size());
assertEquals(0, dfsClient.getClientContext().getDeadNodeDetector()
@@ -119,7 +125,7 @@ public class TestDeadNodeDetection {
throws IOException {
conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true);
// We'll be using a 512 bytes block size just for tests
- // so making sure the checksum bytes too match it.
+ // so making sure the checksum bytes match it too.
conf.setInt("io.bytes.per.checksum", 512);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
@@ -178,7 +184,7 @@ public class TestDeadNodeDetection {
conf.setLong(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
1000);
// We'll be using a 512 bytes block size just for tests
- // so making sure the checksum bytes too match it.
+ // so making sure the checksum bytes match it too.
conf.setInt("io.bytes.per.checksum", 512);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
@@ -201,12 +207,13 @@ public class TestDeadNodeDetection {
} catch (BlockMissingException e) {
}
+ waitForDeadNode(dfsClient, 3);
assertEquals(3, dfsClient.getDeadNodes(din).size());
assertEquals(3, dfsClient.getClientContext().getDeadNodeDetector()
.clearAndGetDetectedDeadNodes().size());
cluster.restartDataNode(one, true);
- waitForDeadNodeRecovery(din);
+ waitForDeadNode(dfsClient, 2);
assertEquals(2, dfsClient.getDeadNodes(din).size());
assertEquals(2, dfsClient.getClientContext().getDeadNodeDetector()
.clearAndGetDetectedDeadNodes().size());
@@ -225,15 +232,14 @@ public class TestDeadNodeDetection {
conf.setLong(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
1000);
conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY, 1);
-
// We'll be using a 512 bytes block size just for tests
- // so making sure the checksum bytes too match it.
+ // so making sure the checksum bytes match it too.
conf.setInt("io.bytes.per.checksum", 512);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
- Path filePath = new Path("testDeadNodeDetectionMaxDeadNodesProbeQueue");
+ Path filePath = new Path("/testDeadNodeDetectionMaxDeadNodesProbeQueue");
createFile(fs, filePath);
// Remove three DNs,
@@ -260,6 +266,55 @@ public class TestDeadNodeDetection {
}
}
+ @Test
+ public void testDeadNodeDetectionSuspectNode() throws Exception {
+ conf = new HdfsConfiguration();
+ conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true);
+ conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY, 1);
+ // We'll be using a 512 bytes block size just for tests
+ // so making sure the checksum bytes match it too.
+ conf.setInt("io.bytes.per.checksum", 512);
+ DeadNodeDetector.disabledProbeThreadForTest();
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+
+ FileSystem fs = cluster.getFileSystem();
+ Path filePath = new Path("/testDeadNodeDetectionSuspectNode");
+ createFile(fs, filePath);
+
+ MiniDFSCluster.DataNodeProperties one = cluster.stopDataNode(0);
+
+ FSDataInputStream in = fs.open(filePath);
+ DFSInputStream din = (DFSInputStream) in.getWrappedStream();
+ DFSClient dfsClient = din.getDFSClient();
+ DeadNodeDetector deadNodeDetector =
+ dfsClient.getClientContext().getDeadNodeDetector();
+ try {
+ try {
+ in.read();
+ } catch (BlockMissingException e) {
+ }
+ waitForSuspectNode(din.getDFSClient());
+ cluster.restartDataNode(one, true);
+ Assert.assertEquals(1,
+ deadNodeDetector.getSuspectNodesProbeQueue().size());
+ Assert.assertEquals(0,
+ deadNodeDetector.clearAndGetDetectedDeadNodes().size());
+ deadNodeDetector.startProbeScheduler();
+ Thread.sleep(1000);
+ Assert.assertEquals(0,
+ deadNodeDetector.getSuspectNodesProbeQueue().size());
+ Assert.assertEquals(0,
+ deadNodeDetector.clearAndGetDetectedDeadNodes().size());
+ } finally {
+ in.close();
+ deleteFile(fs, filePath);
+ assertEquals(0, dfsClient.getDeadNodes(din).size());
+ assertEquals(0, dfsClient.getClientContext().getDeadNodeDetector()
+ .clearAndGetDetectedDeadNodes().size());
+ }
+ }
+
private void createFile(FileSystem fs, Path filePath) throws IOException {
FSDataOutputStream out = null;
try {
@@ -286,12 +341,31 @@ public class TestDeadNodeDetection {
fs.delete(filePath, true);
}
- private void waitForDeadNodeRecovery(DFSInputStream din) throws Exception {
+ private void waitForDeadNode(DFSClient dfsClient, int size) throws Exception {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ try {
+ if (dfsClient.getClientContext().getDeadNodeDetector()
+ .clearAndGetDetectedDeadNodes().size() == size) {
+ return true;
+ }
+ } catch (Exception e) {
+ // Ignore the exception
+ }
+
+ return false;
+ }
+ }, 5000, 100000);
+ }
+
+ private void waitForSuspectNode(DFSClient dfsClient) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
- if (din.getDFSClient().getDeadNodes(din).size() == 2) {
+ if (dfsClient.getClientContext().getDeadNodeDetector()
+ .getSuspectNodesProbeQueue().size() > 0) {
return true;
}
} catch (Exception e) {