Posted to common-commits@hadoop.apache.org by su...@apache.org on 2021/01/28 02:13:28 UTC

[hadoop] branch trunk updated: HDFS-15661. The DeadNodeDetector should not be shared by different DFSClients. Contributed by Jinglun.

This is an automated email from the ASF dual-hosted git repository.

sunlisheng pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f8769e0  HDFS-15661. The DeadNodeDetector should not be shared by different DFSClients. Contributed by Jinglun.
f8769e0 is described below

commit f8769e0f4b917d9fda8ff7a9fddb4d755d246a1e
Author: sunlisheng <su...@apache.org>
AuthorDate: Thu Jan 28 10:10:39 2021 +0800

    HDFS-15661. The DeadNodeDetector should not be shared by different DFSClients. Contributed by Jinglun.
---
 .../java/org/apache/hadoop/hdfs/ClientContext.java | 45 +++++++++++++++-------
 .../java/org/apache/hadoop/hdfs/DFSClient.java     |  9 ++++-
 .../org/apache/hadoop/hdfs/DeadNodeDetector.java   |  2 +-
 .../apache/hadoop/hdfs/TestDeadNodeDetection.java  | 37 ++++++++++++++++++
 4 files changed, 77 insertions(+), 16 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
index b34420d..47e985b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
@@ -40,10 +40,10 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
-import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -119,8 +119,6 @@ public class ClientContext {
   private NodeBase clientNode;
   private boolean topologyResolutionEnabled;
 
-  private Daemon deadNodeDetectorThr = null;
-
   /**
    * The switch to DeadNodeDetector.
    */
@@ -130,12 +128,18 @@ public class ClientContext {
    * Detect the dead datanodes in advance, and share this information among all
    * the DFSInputStreams in the same client.
    */
-  private DeadNodeDetector deadNodeDetector = null;
+  private volatile DeadNodeDetector deadNodeDetector = null;
+
+  /**
+   * Count the reference of ClientContext.
+   */
+  private int counter = 0;
 
   /**
    * ShortCircuitCache array size.
    */
   private final int clientShortCircuitNum;
+  private Configuration configuration;
 
   private ClientContext(String name, DfsClientConf conf,
       Configuration config) {
@@ -149,6 +153,7 @@ public class ClientContext {
       this.shortCircuitCache[i] = ShortCircuitCache.fromConf(scConf);
     }
 
+    this.configuration = config;
     this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(),
         scConf.getSocketCacheExpiry());
     this.keyProviderCache = new KeyProviderCache(
@@ -159,11 +164,6 @@ public class ClientContext {
     this.byteArrayManager = ByteArrayManager.newInstance(
         conf.getWriteByteArrayManagerConf());
     this.deadNodeDetectionEnabled = conf.isDeadNodeDetectionEnabled();
-    if (deadNodeDetectionEnabled && deadNodeDetector == null) {
-      deadNodeDetector = new DeadNodeDetector(name, config);
-      deadNodeDetectorThr = new Daemon(deadNodeDetector);
-      deadNodeDetectorThr.start();
-    }
     initTopologyResolution(config);
   }
 
@@ -201,6 +201,7 @@ public class ClientContext {
         context.printConfWarningIfNeeded(conf);
       }
     }
+    context.reference();
     return context;
   }
 
@@ -301,17 +302,33 @@ public class ClientContext {
   }
 
   /**
-   * Close dead node detector thread.
+   * Increment the counter. Start the dead node detector thread if there is no
+   * reference.
+   */
+  synchronized void reference() {
+    counter++;
+    if (deadNodeDetectionEnabled && deadNodeDetector == null) {
+      deadNodeDetector = new DeadNodeDetector(name, configuration);
+      deadNodeDetector.start();
+    }
+  }
+
+  /**
+   * Decrement the counter. Close the dead node detector thread if there is no
+   * reference.
    */
-  public void stopDeadNodeDetectorThread() {
-    if (deadNodeDetectorThr != null) {
-      deadNodeDetectorThr.interrupt();
+  synchronized void unreference() {
+    Preconditions.checkState(counter > 0);
+    counter--;
+    if (counter == 0 && deadNodeDetectionEnabled && deadNodeDetector != null) {
+      deadNodeDetector.interrupt();
       try {
-        deadNodeDetectorThr.join();
+        deadNodeDetector.join();
       } catch (InterruptedException e) {
         LOG.warn("Encountered exception while waiting to join on dead " +
             "node detector thread.", e);
       }
+      deadNodeDetector = null;
     }
   }
 }
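
The ClientContext hunks above replace the construct-time thread start with reference counting: ClientContext#get() now calls reference() for every new DFSClient, and DFSClient#close() calls unreference(), so the detector thread is started on the first reference and joined and discarded on the last. Below is a minimal standalone sketch of that lifecycle pattern; RefCountedContext and its anonymous worker are illustrative stand-ins for ClientContext and DeadNodeDetector, not code from this patch.

    // Minimal sketch of the reference-counted lifecycle used above.
    public class RefCountedContext {
      private int counter = 0;            // live client references
      private volatile Thread worker = null;

      // get() -> reference(): the first caller starts the worker.
      synchronized void reference() {
        counter++;
        if (worker == null) {
          worker = new Thread(() -> {
            try {
              while (true) {
                Thread.sleep(1000L);      // stand-in for periodic probing
              }
            } catch (InterruptedException e) {
              // interrupt() from unreference() lands here; exit the loop
            }
          });
          worker.setDaemon(true);
          worker.start();
        }
      }

      // close() -> unreference(): the last caller stops the worker.
      synchronized void unreference() {
        if (counter <= 0) {
          throw new IllegalStateException("unbalanced unreference()");
        }
        counter--;
        if (counter == 0 && worker != null) {
          worker.interrupt();
          try {
            worker.join();                // wait for a clean exit
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          worker = null;                  // next reference() starts a fresh one
        }
      }
    }

Nulling the field after join() is what lets a later reference() observe a fresh, distinct detector, which the test added below asserts with assertNotSame.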
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 861b6a9..fc3a16d 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -648,7 +648,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       clientRunning = false;
       // close dead node detector thread
       if (!disabledStopDeadNodeDetectorThreadForTest) {
-        clientContext.stopDeadNodeDetectorThread();
+        clientContext.unreference();
       }
 
       // close connections to the namenode
@@ -3441,4 +3441,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   private boolean isDeadNodeDetectionEnabled() {
     return clientContext.isDeadNodeDetectionEnabled();
   }
+
+  /**
+   * Obtain DeadNodeDetector of the current client.
+   */
+  public DeadNodeDetector getDeadNodeDetector() {
+    return clientContext.getDeadNodeDetector();
+  }
 }
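
The new DFSClient#getDeadNodeDetector() accessor delegates to the context's getter, so it returns the per-context instance shared by all clients of that context, or null when detection is disabled or the last reference has been dropped. A hedged usage sketch follows, assuming a reachable NameNode at the illustrative localhost address and that the dfs.client.deadnode.detection.enabled switch is on:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.DeadNodeDetector;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class DeadNodeDetectorProbe {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        // The detector is only created when this client-side switch is on.
        conf.setBoolean("dfs.client.deadnode.detection.enabled", true);
        // Illustrative NameNode address; adjust to a reachable cluster.
        DistributedFileSystem dfs = (DistributedFileSystem) FileSystem
            .newInstance(new URI("hdfs://localhost:8020/"), conf);
        try {
          DeadNodeDetector detector = dfs.getClient().getDeadNodeDetector();
          // Null when detection is disabled; otherwise the instance shared
          // by every DFSClient created from the same ClientContext.
          System.out.println("detector=" + detector + " alive="
              + (detector != null && detector.isAlive()));
        } finally {
          dfs.close();  // drops the ClientContext reference taken at creation
        }
      }
    }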
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
index fd8263f..112bc04 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
@@ -62,7 +62,7 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCK
  * Detect the dead nodes in advance, and share this information among all the
  * DFSInputStreams in the same client.
  */
-public class DeadNodeDetector implements Runnable {
+public class DeadNodeDetector extends Daemon {
   public static final Logger LOG =
       LoggerFactory.getLogger(DeadNodeDetector.class);
 
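Changing DeadNodeDetector from implements Runnable to extends Daemon is what removes the wrapper deadNodeDetectorThr thread above: ClientContext can now call start(), interrupt(), join() and isAlive() on the detector itself. A minimal sketch of the same pattern, assuming only that org.apache.hadoop.util.Daemon extends Thread and marks itself a daemon thread; PeriodicTask is illustrative, not part of the patch:

    import org.apache.hadoop.util.Daemon;

    public class PeriodicTask extends Daemon {
      @Override
      public void run() {
        try {
          while (true) {
            // one round of background work would go here
            Thread.sleep(5000L);
          }
        } catch (InterruptedException e) {
          // interrupt() from the owner lands here; return to end the thread
        }
      }
    }

    // The owner drives the lifecycle directly, as ClientContext now does:
    //   PeriodicTask task = new PeriodicTask();
    //   task.start();
    //   ...
    //   task.interrupt();
    //   task.join();
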
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
index 09e6702..9c52fcd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.net.URI;
 import java.util.function.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -43,6 +44,11 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for dead node detection in DFSClient.
@@ -320,6 +326,37 @@ public class TestDeadNodeDetection {
     }
   }
 
+  @Test
+  public void testCloseDeadNodeDetector() throws Exception {
+    DistributedFileSystem dfs0 = (DistributedFileSystem) FileSystem
+        .newInstance(new URI("hdfs://127.0.0.1:2001/"), conf);
+    DistributedFileSystem dfs1 = (DistributedFileSystem) FileSystem
+        .newInstance(new URI("hdfs://127.0.0.1:2001/"), conf);
+    // The DeadNodeDetector is shared by different DFSClients.
+    DeadNodeDetector detector = dfs0.getClient().getDeadNodeDetector();
+    assertNotNull(detector);
+    assertSame(detector, dfs1.getClient().getDeadNodeDetector());
+    // Close one client. The dead node detector should be alive.
+    dfs0.close();
+    detector = dfs0.getClient().getDeadNodeDetector();
+    assertNotNull(detector);
+    assertSame(detector, dfs1.getClient().getDeadNodeDetector());
+    assertTrue(detector.isAlive());
+    // Close all clients. The dead node detector should be closed.
+    dfs1.close();
+    detector = dfs0.getClient().getDeadNodeDetector();
+    assertNull(detector);
+    assertSame(detector, dfs1.getClient().getDeadNodeDetector());
+    // Create a new client. The dead node detector should be alive.
+    dfs1 = (DistributedFileSystem) FileSystem
+        .newInstance(new URI("hdfs://127.0.0.1:2001/"), conf);
+    DeadNodeDetector newDetector = dfs0.getClient().getDeadNodeDetector();
+    assertNotNull(newDetector);
+    assertTrue(newDetector.isAlive());
+    assertNotSame(detector, newDetector);
+    dfs1.close();
+  }
+
   private void createFile(FileSystem fs, Path filePath) throws IOException {
     FSDataOutputStream out = null;
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org