You are viewing a plain-text version of this content. The canonical link to the original page was a hyperlink that did not survive the plain-text conversion.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2015/05/28 20:53:06 UTC
hadoop git commit: HDFS-8429. Avoid stuck threads if there is an error in DomainSocketWatcher that stops the thread. (zhouyingchao via cmccabe)
Repository: hadoop
Updated Branches:
refs/heads/trunk d2d95bfe8 -> 246cefa08
HDFS-8429. Avoid stuck threads if there is an error in DomainSocketWatcher that stops the thread. (zhouyingchao via cmccabe)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/246cefa0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/246cefa0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/246cefa0
Branch: refs/heads/trunk
Commit: 246cefa089156a50bf086b8b1e4d4324d66dc58c
Parents: d2d95bf
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Thu May 28 11:52:28 2015 -0700
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Thu May 28 11:52:28 2015 -0700
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 +
.../hadoop/net/unix/DomainSocketWatcher.java | 21 +++++-
.../hadoop/net/unix/DomainSocketWatcher.c | 2 +-
.../net/unix/TestDomainSocketWatcher.java | 75 ++++++++++++++++++++
4 files changed, 99 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/246cefa0/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 69f5aa7..51eff78 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -782,6 +782,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-12035. shellcheck plugin displays a wrong version potentially
(Kengo Seki via aw)
+ HDFS-8429. Avoid stuck threads if there is an error in DomainSocketWatcher
+ that stops the thread. (zhouyingchao via cmccabe)
+
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/246cefa0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
index 5648ae1..ad2fbfb 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
@@ -470,6 +470,7 @@ public final class DomainSocketWatcher implements Closeable {
// Handle pending additions (before pending removes).
for (Iterator<Entry> iter = toAdd.iterator(); iter.hasNext(); ) {
Entry entry = iter.next();
+ iter.remove();
DomainSocket sock = entry.getDomainSocket();
Entry prevEntry = entries.put(sock.fd, entry);
Preconditions.checkState(prevEntry == null,
@@ -479,7 +480,6 @@ public final class DomainSocketWatcher implements Closeable {
LOG.trace(this + ": adding fd " + sock.fd);
}
fdSet.add(sock.fd);
- iter.remove();
}
// Handle pending removals
while (true) {
@@ -525,6 +525,25 @@ public final class DomainSocketWatcher implements Closeable {
}
entries.clear();
fdSet.close();
+ closed = true;
+ if (!(toAdd.isEmpty() && toRemove.isEmpty())) {
+ // Items in toAdd might not be added to entries, handle it here
+ for (Iterator<Entry> iter = toAdd.iterator(); iter.hasNext();) {
+ Entry entry = iter.next();
+ entry.getDomainSocket().refCount.unreference();
+ entry.getHandler().handle(entry.getDomainSocket());
+ IOUtils.cleanup(LOG, entry.getDomainSocket());
+ iter.remove();
+ }
+ // Items in toRemove might not be really removed, handle it here
+ while (true) {
+ Map.Entry<Integer, DomainSocket> entry = toRemove.firstEntry();
+ if (entry == null)
+ break;
+ sendCallback("close", entries, fdSet, entry.getValue().fd);
+ }
+ }
+ processedCond.signalAll();
} finally {
lock.unlock();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/246cefa0/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
index 596601b..82e6af5 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
@@ -212,7 +212,7 @@ done:
free(carr);
if (jthr) {
(*env)->DeleteLocalRef(env, jarr);
- jarr = NULL;
+ (*env)->Throw(env, jthr);
}
return jarr;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/246cefa0/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java
index 4b0e2a8..4cc86a7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocketWatcher.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
@@ -181,6 +182,80 @@ public class TestDomainSocketWatcher {
watcher.close();
}
+ @Test(timeout = 300000)
+ public void testStressInterruption() throws Exception {
+ final int SOCKET_NUM = 250;
+ final ReentrantLock lock = new ReentrantLock();
+ final DomainSocketWatcher watcher = newDomainSocketWatcher(10);
+ final ArrayList<DomainSocket[]> pairs = new ArrayList<DomainSocket[]>();
+ final AtomicInteger handled = new AtomicInteger(0);
+
+ final Thread adderThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < SOCKET_NUM; i++) {
+ DomainSocket pair[] = DomainSocket.socketpair();
+ watcher.add(pair[1], new DomainSocketWatcher.Handler() {
+ @Override
+ public boolean handle(DomainSocket sock) {
+ handled.incrementAndGet();
+ return true;
+ }
+ });
+ lock.lock();
+ try {
+ pairs.add(pair);
+ } finally {
+ lock.unlock();
+ }
+ TimeUnit.MILLISECONDS.sleep(1);
+ }
+ } catch (Throwable e) {
+ LOG.error(e);
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ final Thread removerThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ final Random random = new Random();
+ try {
+ while (handled.get() != SOCKET_NUM) {
+ lock.lock();
+ try {
+ if (!pairs.isEmpty()) {
+ int idx = random.nextInt(pairs.size());
+ DomainSocket pair[] = pairs.remove(idx);
+ if (random.nextBoolean()) {
+ pair[0].close();
+ } else {
+ watcher.remove(pair[1]);
+ }
+ TimeUnit.MILLISECONDS.sleep(1);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ } catch (Throwable e) {
+ LOG.error(e);
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ adderThread.start();
+ removerThread.start();
+ TimeUnit.MILLISECONDS.sleep(100);
+ watcher.watcherThread.interrupt();
+ Uninterruptibles.joinUninterruptibly(adderThread);
+ Uninterruptibles.joinUninterruptibly(removerThread);
+ Uninterruptibles.joinUninterruptibly(watcher.watcherThread);
+ }
+
/**
* Creates a new DomainSocketWatcher and tracks its thread for termination due
* to an unexpected exception. At the end of each test, if there was an