You are viewing a plain-text version of this content. (The canonical link that appeared here was lost in the plain-text conversion.)
Posted to common-commits@hadoop.apache.org by he...@apache.org on 2021/06/21 16:33:46 UTC

[hadoop] branch trunk updated: HDFS-14575. LeaseRenewer#daemon threads leak in DFSClient. Contributed by Renukaprasad C.

This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 10b79a2  HDFS-14575. LeaseRenewer#daemon threads leak in DFSClient. Contributed by Renukaprasad C.
10b79a2 is described below

commit 10b79a26fe0677b266acf237e8458e93743424a6
Author: He Xiaoqiao <he...@apache.org>
AuthorDate: Tue Jun 22 00:32:55 2021 +0800

    HDFS-14575. LeaseRenewer#daemon threads leak in DFSClient. Contributed by Renukaprasad C.
    
    Co-authored-by: Tao Yang <ta...@apache.org>
    Reviewed-by: He Xiaoqiao <he...@apache.org>
    Reviewed-by: Wei-Chiu Chuang <we...@apache.org>
---
 .../java/org/apache/hadoop/hdfs/DFSClient.java     | 10 ++-
 .../hadoop/hdfs/client/impl/LeaseRenewer.java      | 34 +++++++--
 .../hadoop/hdfs/client/impl/TestLeaseRenewer.java  | 87 ++++++++++++++++++++++
 3 files changed, 125 insertions(+), 6 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 3fa4dd0..56adc5c 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -505,7 +505,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       throws IOException {
     synchronized (filesBeingWritten) {
       putFileBeingWritten(inodeId, out);
-      getLeaseRenewer().put(this);
+      LeaseRenewer renewer = getLeaseRenewer();
+      boolean result = renewer.put(this);
+      if (!result) {
+        // Existing LeaseRenewer cannot add another Daemon, so remove existing
+        // and add new one.
+        LeaseRenewer.remove(renewer);
+        renewer = getLeaseRenewer();
+        renewer.put(this);
+      }
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
index d108af9..6b4c899 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -79,6 +80,8 @@ public class LeaseRenewer {
   private static long leaseRenewerGraceDefault = 60*1000L;
   static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
 
+  private AtomicBoolean isLSRunning = new AtomicBoolean(false);
+
   /** Get a {@link LeaseRenewer} instance */
   public static LeaseRenewer getInstance(final String authority,
       final UserGroupInformation ugi, final DFSClient dfsc) {
@@ -88,6 +91,15 @@ public class LeaseRenewer {
   }
 
   /**
+   * Remove the given renewer from the Factory (if it is still the stored
+   * instance) and expire it; a later getInstance returns a new renewer.
+   * @param renewer Instance to be cleared from the Factory
+   */
+  public static void remove(LeaseRenewer renewer) {
+    Factory.INSTANCE.remove(renewer);
+  }
+
+  /**
    * A factory for sharing {@link LeaseRenewer} objects
    * among {@link DFSClient} instances
    * so that there is only one renewer per authority per user.
@@ -156,6 +168,9 @@ public class LeaseRenewer {
       final LeaseRenewer stored = renewers.get(r.factorykey);
       //Since a renewer may expire, the stored renewer can be different.
       if (r == stored) {
+        // Expire LeaseRenewer daemon thread as soon as possible.
+        r.clearClients();
+        r.setEmptyTime(0);
         renewers.remove(r.factorykey);
       }
     }
@@ -241,6 +256,10 @@ public class LeaseRenewer {
     }
   }
 
+  private synchronized void clearClients() {
+    dfsclients.clear(); // invoked before removal from the renewers map
+  }
+
   private synchronized boolean clientsRunning() {
     for(Iterator<DFSClient> i = dfsclients.iterator(); i.hasNext(); ) {
       if (!i.next().isClientRunning()) {
@@ -292,11 +311,18 @@ public class LeaseRenewer {
         && Time.monotonicNow() - emptyTime > gracePeriod;
   }
 
-  public synchronized void put(final DFSClient dfsc) {
+  public synchronized boolean put(final DFSClient dfsc) {
     if (dfsc.isClientRunning()) {
       if (!isRunning() || isRenewerExpired()) {
-        //start a new deamon with a new id.
+        // Start a new daemon with a new id.
         final int id = ++currentId;
+        if (isLSRunning.get()) {
+          // Not allowed to add multiple daemons into LeaseRenewer, let client
+          // create new LR and continue to acquire lease.
+          return false;
+        }
+        isLSRunning.getAndSet(true);
+
         daemon = new Daemon(new Runnable() {
           @Override
           public void run() {
@@ -328,6 +354,7 @@ public class LeaseRenewer {
       }
       emptyTime = Long.MAX_VALUE;
     }
+    return true;
   }
 
   @VisibleForTesting
@@ -426,9 +453,6 @@ public class LeaseRenewer {
           synchronized (this) {
             DFSClientFaultInjector.get().delayWhenRenewLeaseTimeout();
             dfsclientsCopy = new ArrayList<>(dfsclients);
-            dfsclients.clear();
-            //Expire the current LeaseRenewer thread.
-            emptyTime = 0;
             Factory.INSTANCE.remove(LeaseRenewer.this);
           }
           for (DFSClient dfsClient : dfsclientsCopy) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java
index 1ffec85..f1a11ed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java
@@ -31,7 +31,11 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 
 import static org.junit.Assert.assertSame;
 
@@ -168,6 +172,11 @@ public class TestLeaseRenewer {
 
     renewer.closeClient(mockClient1);
     renewer.closeClient(mockClient2);
+    renewer.closeClient(MOCK_DFSCLIENT);
+
+    // Make sure renewer is not running due to expiration.
+    Thread.sleep(FAST_GRACE_PERIOD * 2);
+    Assert.assertTrue(!renewer.isRunning());
   }
 
   @Test
@@ -197,4 +206,82 @@ public class TestLeaseRenewer {
     Assert.assertFalse(renewer.isRunning());
   }
 
+  /**
+   * Test for HDFS-14575. With this fix, the LeaseRenewer clears all clients
+   * and expires immediately, by setting its empty time to 0, before it is
+   * removed from the factory. Previously, LeaseRenewer#daemon threads leaked.
+   */
+  @Test
+  public void testDaemonThreadLeak() throws Exception {
+    Assert.assertFalse("Renewer not initially running", renewer.isRunning());
+
+    // Pretend to create a file#1, daemon#1 starts
+    renewer.put(MOCK_DFSCLIENT);
+    Assert.assertTrue("Renewer should have started running",
+        renewer.isRunning());
+    Pattern daemonThreadNamePattern = Pattern.compile("LeaseRenewer:\\S+");
+    Assert.assertEquals(1, countThreadMatching(daemonThreadNamePattern));
+
+    // Pretend to create file#2, daemon#2 starts due to expiration
+    LeaseRenewer lastRenewer = renewer;
+    renewer =
+        LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
+    Assert.assertEquals(lastRenewer, renewer);
+
+    // Pretend to close file#1
+    renewer.closeClient(MOCK_DFSCLIENT);
+    Assert.assertEquals(1, countThreadMatching(daemonThreadNamePattern));
+
+    // Pretend to be expired
+    renewer.setEmptyTime(0);
+
+    renewer =
+        LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
+    renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD);
+    boolean success = renewer.put(MOCK_DFSCLIENT);
+    if (!success) {
+      LeaseRenewer.remove(renewer);
+      renewer =
+          LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
+      renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD);
+      renewer.put(MOCK_DFSCLIENT);
+    }
+
+    int threadCount = countThreadMatching(daemonThreadNamePattern);
+    // Rarely, the old LR#Daemon has already exited, so the count is 1.
+    Assert.assertTrue(1 == threadCount || 2 == threadCount);
+
+    // After grace period, both daemon#1 and renewer#1 will be removed due to
+    // expiration, then daemon#2 will leak before HDFS-14575.
+    Thread.sleep(FAST_GRACE_PERIOD * 2);
+
+    // Pretend to close file#2, renewer#2 will be created
+    lastRenewer = renewer;
+    renewer =
+        LeaseRenewer.getInstance(FAKE_AUTHORITY, FAKE_UGI_A, MOCK_DFSCLIENT);
+    Assert.assertEquals(lastRenewer, renewer);
+    renewer.setGraceSleepPeriod(FAST_GRACE_PERIOD);
+    renewer.closeClient(MOCK_DFSCLIENT);
+    renewer.setEmptyTime(0);
+    // Make sure LeaseRenewer#daemon threads will terminate after grace period
+    Thread.sleep(FAST_GRACE_PERIOD * 2);
+    Assert.assertEquals("LeaseRenewer#daemon thread leaks", 0,
+        countThreadMatching(daemonThreadNamePattern));
+  }
+
+  private static int countThreadMatching(Pattern pattern) {
+    ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+    ThreadInfo[] infos =
+        threadBean.getThreadInfo(threadBean.getAllThreadIds(), 1);
+    int count = 0;
+    for (ThreadInfo info : infos) {
+      if (info == null) { // thread exited since getAllThreadIds()
+        continue;
+      }
+      if (pattern.matcher(info.getThreadName()).matches()) { // whole name
+        count++;
+      }
+    }
+    return count;
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org