You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2021/01/28 02:13:28 UTC
[hadoop] branch trunk updated: HDFS-15661. The DeadNodeDetector should not be shared by different DFSClients. Contributed by Jinglun.
This is an automated email from the ASF dual-hosted git repository.
sunlisheng pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new f8769e0 HDFS-15661. The DeadNodeDetector should not be shared by different DFSClients. Contributed by Jinglun.
f8769e0 is described below
commit f8769e0f4b917d9fda8ff7a9fddb4d755d246a1e
Author: sunlisheng <su...@apache.org>
AuthorDate: Thu Jan 28 10:10:39 2021 +0800
HDFS-15661. The DeadNodeDetector should not be shared by different DFSClients. Contributed by Jinglun.
---
.../java/org/apache/hadoop/hdfs/ClientContext.java | 45 +++++++++++++++-------
.../java/org/apache/hadoop/hdfs/DFSClient.java | 9 ++++-
.../org/apache/hadoop/hdfs/DeadNodeDetector.java | 2 +-
.../apache/hadoop/hdfs/TestDeadNodeDetection.java | 37 ++++++++++++++++++
4 files changed, 77 insertions(+), 16 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
index b34420d..47e985b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
@@ -40,10 +40,10 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.ScriptBasedMapping;
-import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -119,8 +119,6 @@ public class ClientContext {
private NodeBase clientNode;
private boolean topologyResolutionEnabled;
- private Daemon deadNodeDetectorThr = null;
-
/**
* The switch to DeadNodeDetector.
*/
@@ -130,12 +128,18 @@ public class ClientContext {
* Detect the dead datanodes in advance, and share this information among all
* the DFSInputStreams in the same client.
*/
- private DeadNodeDetector deadNodeDetector = null;
+ private volatile DeadNodeDetector deadNodeDetector = null;
+
+ /**
+ * Count the reference of ClientContext.
+ */
+ private int counter = 0;
/**
* ShortCircuitCache array size.
*/
private final int clientShortCircuitNum;
+ private Configuration configuration;
private ClientContext(String name, DfsClientConf conf,
Configuration config) {
@@ -149,6 +153,7 @@ public class ClientContext {
this.shortCircuitCache[i] = ShortCircuitCache.fromConf(scConf);
}
+ this.configuration = config;
this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(),
scConf.getSocketCacheExpiry());
this.keyProviderCache = new KeyProviderCache(
@@ -159,11 +164,6 @@ public class ClientContext {
this.byteArrayManager = ByteArrayManager.newInstance(
conf.getWriteByteArrayManagerConf());
this.deadNodeDetectionEnabled = conf.isDeadNodeDetectionEnabled();
- if (deadNodeDetectionEnabled && deadNodeDetector == null) {
- deadNodeDetector = new DeadNodeDetector(name, config);
- deadNodeDetectorThr = new Daemon(deadNodeDetector);
- deadNodeDetectorThr.start();
- }
initTopologyResolution(config);
}
@@ -201,6 +201,7 @@ public class ClientContext {
context.printConfWarningIfNeeded(conf);
}
}
+ context.reference();
return context;
}
@@ -301,17 +302,33 @@ public class ClientContext {
}
/**
- * Close dead node detector thread.
+ * Increment the counter. Start the dead node detector thread if there is no
+ * reference.
+ */
+ synchronized void reference() {
+ counter++;
+ if (deadNodeDetectionEnabled && deadNodeDetector == null) {
+ deadNodeDetector = new DeadNodeDetector(name, configuration);
+ deadNodeDetector.start();
+ }
+ }
+
+ /**
+ * Decrement the counter. Close the dead node detector thread if there is no
+ * reference.
*/
- public void stopDeadNodeDetectorThread() {
- if (deadNodeDetectorThr != null) {
- deadNodeDetectorThr.interrupt();
+ synchronized void unreference() {
+ Preconditions.checkState(counter > 0);
+ counter--;
+ if (counter == 0 && deadNodeDetectionEnabled && deadNodeDetector != null) {
+ deadNodeDetector.interrupt();
try {
- deadNodeDetectorThr.join();
+ deadNodeDetector.join();
} catch (InterruptedException e) {
LOG.warn("Encountered exception while waiting to join on dead " +
"node detector thread.", e);
}
+ deadNodeDetector = null;
}
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 861b6a9..fc3a16d 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -648,7 +648,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
clientRunning = false;
// close dead node detector thread
if (!disabledStopDeadNodeDetectorThreadForTest) {
- clientContext.stopDeadNodeDetectorThread();
+ clientContext.unreference();
}
// close connections to the namenode
@@ -3441,4 +3441,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
private boolean isDeadNodeDetectionEnabled() {
return clientContext.isDeadNodeDetectionEnabled();
}
+
+ /**
+ * Obtain DeadNodeDetector of the current client.
+ */
+ public DeadNodeDetector getDeadNodeDetector() {
+ return clientContext.getDeadNodeDetector();
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
index fd8263f..112bc04 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
@@ -62,7 +62,7 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCK
* Detect the dead nodes in advance, and share this information among all the
* DFSInputStreams in the same client.
*/
-public class DeadNodeDetector implements Runnable {
+public class DeadNodeDetector extends Daemon {
public static final Logger LOG =
LoggerFactory.getLogger(DeadNodeDetector.class);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
index 09e6702..9c52fcd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs;
+import java.net.URI;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -43,6 +44,11 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
/**
* Tests for dead node detection in DFSClient.
@@ -320,6 +326,37 @@ public class TestDeadNodeDetection {
}
}
+ @Test
+ public void testCloseDeadNodeDetector() throws Exception {
+ DistributedFileSystem dfs0 = (DistributedFileSystem) FileSystem
+ .newInstance(new URI("hdfs://127.0.0.1:2001/"), conf);
+ DistributedFileSystem dfs1 = (DistributedFileSystem) FileSystem
+ .newInstance(new URI("hdfs://127.0.0.1:2001/"), conf);
+ // The DeadNodeDetector is shared by different DFSClients.
+ DeadNodeDetector detector = dfs0.getClient().getDeadNodeDetector();
+ assertNotNull(detector);
+ assertSame(detector, dfs1.getClient().getDeadNodeDetector());
+ // Close one client. The dead node detector should be alive.
+ dfs0.close();
+ detector = dfs0.getClient().getDeadNodeDetector();
+ assertNotNull(detector);
+ assertSame(detector, dfs1.getClient().getDeadNodeDetector());
+ assertTrue(detector.isAlive());
+ // Close all clients. The dead node detector should be closed.
+ dfs1.close();
+ detector = dfs0.getClient().getDeadNodeDetector();
+ assertNull(detector);
+ assertSame(detector, dfs1.getClient().getDeadNodeDetector());
+ // Create a new client. The dead node detector should be alive.
+ dfs1 = (DistributedFileSystem) FileSystem
+ .newInstance(new URI("hdfs://127.0.0.1:2001/"), conf);
+ DeadNodeDetector newDetector = dfs0.getClient().getDeadNodeDetector();
+ assertNotNull(newDetector);
+ assertTrue(newDetector.isAlive());
+ assertNotSame(detector, newDetector);
+ dfs1.close();
+ }
+
private void createFile(FileSystem fs, Path filePath) throws IOException {
FSDataOutputStream out = null;
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org