You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2015/05/28 20:53:06 UTC

hadoop git commit: HDFS-8429. Avoid stuck threads if there is an error in DomainSocketWatcher that stops the thread. (zhouyingchao via cmccabe)

Repository: hadoop
Updated Branches:
  refs/heads/trunk d2d95bfe8 -> 246cefa08


HDFS-8429. Avoid stuck threads if there is an error in DomainSocketWatcher that stops the thread.  (zhouyingchao via cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/246cefa0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/246cefa0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/246cefa0

Branch: refs/heads/trunk
Commit: 246cefa089156a50bf086b8b1e4d4324d66dc58c
Parents: d2d95bf
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Thu May 28 11:52:28 2015 -0700
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Thu May 28 11:52:28 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  3 +
 .../hadoop/net/unix/DomainSocketWatcher.java    | 21 +++++-
 .../hadoop/net/unix/DomainSocketWatcher.c       |  2 +-
 .../net/unix/TestDomainSocketWatcher.java       | 75 ++++++++++++++++++++
 4 files changed, 99 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/246cefa0/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 69f5aa7..51eff78 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -782,6 +782,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12035. shellcheck plugin displays a wrong version potentially
     (Kengo Seki via aw)
 
+    HDFS-8429. Avoid stuck threads if there is an error in DomainSocketWatcher
+    that stops the thread.  (zhouyingchao via cmccabe)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/246cefa0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
index 5648ae1..ad2fbfb 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
@@ -470,6 +470,7 @@ public final class DomainSocketWatcher implements Closeable {
               // Handle pending additions (before pending removes).
               for (Iterator<Entry> iter = toAdd.iterator(); iter.hasNext(); ) {
                 Entry entry = iter.next();
+                iter.remove();
                 DomainSocket sock = entry.getDomainSocket();
                 Entry prevEntry = entries.put(sock.fd, entry);
                 Preconditions.checkState(prevEntry == null,
@@ -479,7 +480,6 @@ public final class DomainSocketWatcher implements Closeable {
                   LOG.trace(this + ": adding fd " + sock.fd);
                 }
                 fdSet.add(sock.fd);
-                iter.remove();
               }
               // Handle pending removals
               while (true) {
@@ -525,6 +525,25 @@ public final class DomainSocketWatcher implements Closeable {
           }
           entries.clear();
           fdSet.close();
+          closed = true;
+          if (!(toAdd.isEmpty() && toRemove.isEmpty())) {
+            // Items in toAdd might not be added to entries, handle it here
+            for (Iterator<Entry> iter = toAdd.iterator(); iter.hasNext();) {
+              Entry entry = iter.next();
+              entry.getDomainSocket().refCount.unreference();
+              entry.getHandler().handle(entry.getDomainSocket());
+              IOUtils.cleanup(LOG, entry.getDomainSocket());
+              iter.remove();
+            }
+            // Items in toRemove might not be really removed, handle it here
+            while (true) {
+              Map.Entry<Integer, DomainSocket> entry = toRemove.firstEntry();
+              if (entry == null)
+                break;
+              sendCallback("close", entries, fdSet, entry.getValue().fd);
+            }
+          }
+          processedCond.signalAll();
         } finally {
           lock.unlock();
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/246cefa0/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
index 596601b..82e6af5 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
@@ -212,7 +212,7 @@ done:
   free(carr);
   if (jthr) {
     (*env)->DeleteLocalRef(env, jarr);
-    jarr = NULL;
+    (*env)->Throw(env, jthr);
   }
   return jarr;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/246cefa0/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java
index 4b0e2a8..4cc86a7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
 import java.util.ArrayList;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -181,6 +182,80 @@ public class TestDomainSocketWatcher {
     watcher.close();
   }
 
+  @Test(timeout = 300000) // HDFS-8429: interrupting the watcher must not strand any thread
+  public void testStressInterruption() throws Exception {
+    final int SOCKET_NUM = 250; // sockets added; remover loops until this many handled
+    final ReentrantLock lock = new ReentrantLock();
+    final DomainSocketWatcher watcher = newDomainSocketWatcher(10);
+    final ArrayList<DomainSocket[]> pairs = new ArrayList<DomainSocket[]>(); // guarded by lock
+    final AtomicInteger handled = new AtomicInteger(0); // handler invocations so far
+    // Adder: registers pair[1] of each new socketpair, then publishes the pair.
+    final Thread adderThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          for (int i = 0; i < SOCKET_NUM; i++) {
+            DomainSocket pair[] = DomainSocket.socketpair();
+            watcher.add(pair[1], new DomainSocketWatcher.Handler() {
+              @Override
+              public boolean handle(DomainSocket sock) {
+                handled.incrementAndGet();
+                return true;
+              }
+            });
+            lock.lock();
+            try {
+              pairs.add(pair);
+            } finally {
+              lock.unlock();
+            }
+            TimeUnit.MILLISECONDS.sleep(1); // pace adds; the interrupt below likely lands mid-run
+          }
+        } catch (Throwable e) {
+          LOG.error(e);
+          throw new RuntimeException(e); // NOTE(review): only ends this thread; test body never checks
+        }
+      }
+    });
+    // Remover: randomly closes pair[0] or unregisters pair[1] until all are handled.
+    final Thread removerThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        final Random random = new Random();
+        try {
+          while (handled.get() != SOCKET_NUM) {
+            lock.lock();
+            try {
+              if (!pairs.isEmpty()) {
+                int idx = random.nextInt(pairs.size());
+                DomainSocket pair[] = pairs.remove(idx);
+                if (random.nextBoolean()) {
+                  pair[0].close();
+                } else {
+                  watcher.remove(pair[1]);
+                }
+                TimeUnit.MILLISECONDS.sleep(1); // NOTE(review): sleeps while holding lock
+              }
+            } finally {
+              lock.unlock();
+            }
+          }
+        } catch (Throwable e) {
+          LOG.error(e);
+          throw new RuntimeException(e); // NOTE(review): only ends this thread; test body never checks
+        }
+      }
+    });
+    // Let some adds/removes run, stop the watcher thread, then require every thread to exit.
+    adderThread.start();
+    removerThread.start();
+    TimeUnit.MILLISECONDS.sleep(100);
+    watcher.watcherThread.interrupt();
+    Uninterruptibles.joinUninterruptibly(adderThread); // a hang here fails via the @Test timeout
+    Uninterruptibles.joinUninterruptibly(removerThread);
+    Uninterruptibles.joinUninterruptibly(watcher.watcherThread);
+  }
+
   /**
    * Creates a new DomainSocketWatcher and tracks its thread for termination due
    * to an unexpected exception.  At the end of each test, if there was an