You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2021/03/15 03:36:30 UTC
[hadoop] branch trunk updated: HDFS-15809. DeadNodeDetector does
not remove live nodes from dead node set. Contributed by Jinglun.
This is an automated email from the ASF dual-hosted git repository.
sunlisheng pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7025f399 HDFS-15809. DeadNodeDetector does not remove live nodes from dead node set. Contributed by Jinglun.
7025f399 is described below
commit 7025f39944e628345109b43cba2cd4d49ca8cc6b
Author: sunlisheng <su...@apache.org>
AuthorDate: Mon Mar 15 11:34:13 2021 +0800
HDFS-15809. DeadNodeDetector does not remove live nodes from dead node set. Contributed by Jinglun.
---
.../org/apache/hadoop/hdfs/DeadNodeDetector.java | 85 +++++++++++++---------
.../hadoop/hdfs/client/HdfsClientConfigKeys.java | 10 +--
.../src/main/resources/hdfs-default.xml | 20 ++---
.../apache/hadoop/hdfs/TestDeadNodeDetection.java | 71 ++++++++++++------
4 files changed, 107 insertions(+), 79 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
index e17f261..cd46551 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
@@ -29,9 +29,9 @@ import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Map;
-import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.Deque;
+import java.util.LinkedList;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -40,8 +40,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT;
@@ -54,9 +52,9 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_DEFAULT;
/**
* Detect the dead nodes in advance, and share this information among all the
@@ -74,7 +72,7 @@ public class DeadNodeDetector extends Daemon {
/**
* Waiting time when DeadNodeDetector's state is idle.
*/
- private static final long IDLE_SLEEP_MS = 10000;
+ private final long idleSleepMs;
/**
* Client context name.
@@ -114,16 +112,6 @@ public class DeadNodeDetector extends Daemon {
private long suspectNodeDetectInterval = 0;
/**
- * The max queue size of probing dead node.
- */
- private int maxDeadNodesProbeQueueLen = 0;
-
- /**
- * The max queue size of probing suspect node.
- */
- private int maxSuspectNodesProbeQueueLen;
-
- /**
* Connection timeout for probing dead node in milliseconds.
*/
private long probeConnectionTimeoutMs;
@@ -131,12 +119,12 @@ public class DeadNodeDetector extends Daemon {
/**
* The dead node probe queue.
*/
- private Queue<DatanodeInfo> deadNodesProbeQueue;
+ private UniqueQueue<DatanodeInfo> deadNodesProbeQueue;
/**
* The suspect node probe queue.
*/
- private Queue<DatanodeInfo> suspectNodesProbeQueue;
+ private UniqueQueue<DatanodeInfo> suspectNodesProbeQueue;
/**
* The thread pool of probing dead node.
@@ -182,6 +170,32 @@ public class DeadNodeDetector extends Daemon {
}
/**
+ * A thread-safe FIFO queue that rejects elements already present in it.
+ */
+ static class UniqueQueue<T> {
+ private Deque<T> queue = new LinkedList<>();
+ private Set<T> set = new HashSet<>();
+
+ synchronized boolean offer(T dn) {
+ if (set.add(dn)) {
+ queue.addLast(dn);
+ return true;
+ }
+ return false;
+ }
+
+ synchronized T poll() {
+ T dn = queue.pollFirst();
+ set.remove(dn);
+ return dn;
+ }
+
+ synchronized int size() {
+ return set.size();
+ }
+ }
+
+ /**
* Disabled start probe suspect/dead thread for the testing.
*/
private static volatile boolean disabledProbeThreadForTest = false;
@@ -203,20 +217,14 @@ public class DeadNodeDetector extends Daemon {
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT);
socketTimeout =
conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsConstants.READ_TIMEOUT);
- maxDeadNodesProbeQueueLen =
- conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY,
- DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT);
- maxSuspectNodesProbeQueueLen =
- conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY,
- DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT);
probeConnectionTimeoutMs = conf.getLong(
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY,
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT);
+ this.deadNodesProbeQueue = new UniqueQueue<>();
+ this.suspectNodesProbeQueue = new UniqueQueue<>();
- this.deadNodesProbeQueue =
- new ArrayBlockingQueue<DatanodeInfo>(maxDeadNodesProbeQueueLen);
- this.suspectNodesProbeQueue =
- new ArrayBlockingQueue<DatanodeInfo>(maxSuspectNodesProbeQueueLen);
+ idleSleepMs = conf.getLong(DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY,
+ DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_DEFAULT);
int deadNodeDetectDeadThreads =
conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY,
@@ -447,8 +455,7 @@ public class DeadNodeDetector extends Daemon {
for (DatanodeInfo datanodeInfo : datanodeInfos) {
if (!deadNodesProbeQueue.offer(datanodeInfo)) {
LOG.debug("Skip to add dead node {} to check " +
- "since the probe queue is full.", datanodeInfo);
- break;
+ "since the node is already in the probe queue.", datanodeInfo);
} else {
LOG.debug("Add dead node to check: {}.", datanodeInfo);
}
@@ -458,7 +465,7 @@ public class DeadNodeDetector extends Daemon {
private void idle() {
try {
- Thread.sleep(IDLE_SLEEP_MS);
+ Thread.sleep(idleSleepMs);
} catch (InterruptedException e) {
LOG.debug("Got interrupted while DeadNodeDetector is idle.", e);
Thread.currentThread().interrupt();
@@ -483,14 +490,24 @@ public class DeadNodeDetector extends Daemon {
deadNodes.remove(datanodeInfo.getDatanodeUuid());
}
- public Queue<DatanodeInfo> getDeadNodesProbeQueue() {
+ public UniqueQueue<DatanodeInfo> getDeadNodesProbeQueue() {
return deadNodesProbeQueue;
}
- public Queue<DatanodeInfo> getSuspectNodesProbeQueue() {
+ public UniqueQueue<DatanodeInfo> getSuspectNodesProbeQueue() {
return suspectNodesProbeQueue;
}
+ @VisibleForTesting
+ void setSuspectQueue(UniqueQueue<DatanodeInfo> queue) {
+ this.suspectNodesProbeQueue = queue;
+ }
+
+ @VisibleForTesting
+ void setDeadQueue(UniqueQueue<DatanodeInfo> queue) {
+ this.deadNodesProbeQueue = queue;
+ }
+
/**
* Add datanode to suspectNodes and suspectAndDeadNodes.
*/
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index c17ad0e..2a6a7a5 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -164,13 +164,9 @@ public interface HdfsClientConfigKeys {
"dfs.client.deadnode.detection.enabled";
boolean DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT = false;
- String DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY =
- "dfs.client.deadnode.detection.deadnode.queue.max";
- int DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT = 100;
-
- String DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY =
- "dfs.client.deadnode.detection.suspectnode.queue.max";
- int DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT = 1000;
+ String DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY =
+ "dfs.client.deadnode.detection.idle.sleep.ms";
+ long DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_DEFAULT = 10000;
String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY =
"dfs.client.deadnode.detection.probe.connection.timeout.ms";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 94ff3ec..c308058 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3191,26 +3191,18 @@
</property>
<property>
- <name>dfs.client.deadnode.detection.deadnode.queue.max</name>
- <value>100</value>
- <description>
- The max queue size of probing dead node.
- </description>
- </property>
-
- <property>
- <name>dfs.client.deadnode.detection.suspectnode.queue.max</name>
- <value>1000</value>
+ <name>dfs.client.deadnode.detection.probe.deadnode.threads</name>
+ <value>10</value>
<description>
- The max queue size of probing suspect node.
+ The maximum number of threads to use for probing dead node.
</description>
</property>
<property>
- <name>dfs.client.deadnode.detection.probe.deadnode.threads</name>
- <value>10</value>
+ <name>dfs.client.deadnode.detection.idle.sleep.ms</name>
+ <value>10000</value>
<description>
- The maximum number of threads to use for probing dead node.
+ The time in milliseconds that DeadNodeDetector sleeps when its state is idle.
</description>
</property>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
index 9134f36..e8da918 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
@@ -30,19 +30,20 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import java.io.IOException;
+import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertNotSame;
@@ -73,6 +74,7 @@ public class TestDeadNodeDetection {
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY,
1000);
conf.setInt(DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, 0);
+ conf.setLong(DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY, 100);
}
@After
@@ -247,42 +249,63 @@ public class TestDeadNodeDetection {
}
@Test
- public void testDeadNodeDetectionMaxDeadNodesProbeQueue() throws Exception {
- conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY, 1);
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
- cluster.waitActive();
-
- FileSystem fs = cluster.getFileSystem();
- Path filePath = new Path("/testDeadNodeDetectionMaxDeadNodesProbeQueue");
- createFile(fs, filePath);
-
- // Remove three DNs,
- cluster.stopDataNode(0);
- cluster.stopDataNode(0);
- cluster.stopDataNode(0);
-
- FSDataInputStream in = fs.open(filePath);
- DFSInputStream din = (DFSInputStream) in.getWrappedStream();
- DFSClient dfsClient = din.getDFSClient();
+ public void testDeadNodeDetectionDeadNodeProbe() throws Exception {
+ FileSystem fs = null;
+ FSDataInputStream in = null;
+ Path filePath = new Path("/" + GenericTestUtils.getMethodName());
try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+ cluster.waitActive();
+
+ fs = cluster.getFileSystem();
+ createFile(fs, filePath);
+
+ // Stop all three DNs.
+ cluster.stopDataNode(0);
+ cluster.stopDataNode(0);
+ cluster.stopDataNode(0);
+
+ in = fs.open(filePath);
+ DFSInputStream din = (DFSInputStream) in.getWrappedStream();
+ DFSClient dfsClient = din.getDFSClient();
+ DeadNodeDetector deadNodeDetector =
+ dfsClient.getClientContext().getDeadNodeDetector();
+ // Spy on the suspect queue and the dead queue.
+ DeadNodeDetector.UniqueQueue<DatanodeInfo> queue =
+ deadNodeDetector.getSuspectNodesProbeQueue();
+ DeadNodeDetector.UniqueQueue<DatanodeInfo> suspectSpy =
+ Mockito.spy(queue);
+ deadNodeDetector.setSuspectQueue(suspectSpy);
+ queue = deadNodeDetector.getDeadNodesProbeQueue();
+ DeadNodeDetector.UniqueQueue<DatanodeInfo> deadSpy = Mockito.spy(queue);
+ deadNodeDetector.setDeadQueue(deadSpy);
+ // Trigger dead node detection.
try {
in.read();
} catch (BlockMissingException e) {
}
Thread.sleep(1500);
- Assert.assertTrue((dfsClient.getClientContext().getDeadNodeDetector()
- .getDeadNodesProbeQueue().size()
- + dfsClient.getDeadNodes(din).size()) <= 4);
+ Collection<DatanodeInfo> deadNodes =
+ dfsClient.getDeadNodeDetector().clearAndGetDetectedDeadNodes();
+ assertEquals(3, deadNodes.size());
+ for (DatanodeInfo dead : deadNodes) {
+ // Each node is suspected once then marked as dead.
+ Mockito.verify(suspectSpy, Mockito.times(1)).offer(dead);
+ // All the dead nodes should be scheduled and probed at least once.
+ Mockito.verify(deadSpy, Mockito.atLeastOnce()).offer(dead);
+ Mockito.verify(deadSpy, Mockito.atLeastOnce()).poll();
+ }
} finally {
- in.close();
+ if (in != null) {
+ in.close();
+ }
deleteFile(fs, filePath);
}
}
@Test
public void testDeadNodeDetectionSuspectNode() throws Exception {
- conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY, 1);
DeadNodeDetector.setDisabledProbeThreadForTest(true);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org