You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by iv...@apache.org on 2013/06/21 07:03:11 UTC
svn commit: r1495293 - in /hadoop/common/branches/branch-1: CHANGES.txt
src/core/org/apache/hadoop/ipc/Server.java
src/test/org/apache/hadoop/ipc/TestRPC.java
Author: ivanmi
Date: Fri Jun 21 05:03:10 2013
New Revision: 1495293
URL: http://svn.apache.org/r1495293
Log:
HADOOP-7140. IPC Reader threads do not stop when server stops (Backported by Ivan Mitic).
Modified:
hadoop/common/branches/branch-1/CHANGES.txt
hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Server.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/ipc/TestRPC.java
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1495293&r1=1495292&r2=1495293&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Fri Jun 21 05:03:10 2013
@@ -62,6 +62,9 @@ Release 1.3.0 - unreleased
HADOOP-9624. TestFSMainOperationsLocalFileSystem failed when the Hadoop test
root path has "X" in its name. (Xi Fang via cnauroth)
+ HADOOP-7140. IPC Reader threads do not stop when server stops
+ (Todd Lipcon, backported by ivanmi)
+
Release 1.2.1 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Server.java?rev=1495293&r1=1495292&r2=1495293&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Server.java Fri Jun 21 05:03:10 2013
@@ -329,7 +329,6 @@ public abstract class Server {
private long cleanupInterval = 10000; //the minimum interval between
//two cleanup runs
private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
- private ExecutorService readPool;
public Listener() throws IOException {
address = new InetSocketAddress(bindAddress, port);
@@ -343,12 +342,12 @@ public abstract class Server {
// create a selector;
selector= Selector.open();
readers = new Reader[readThreads];
- readPool = Executors.newFixedThreadPool(readThreads);
for (int i = 0; i < readThreads; i++) {
Selector readSelector = Selector.open();
- Reader reader = new Reader(readSelector);
+ Reader reader = new Reader("Socket Reader #" + (i + 1) + " for port " + port,
+ readSelector);
readers[i] = reader;
- readPool.execute(reader);
+ reader.start();
}
// Register accepts on the server socket with the selector.
@@ -357,15 +356,16 @@ public abstract class Server {
this.setDaemon(true);
}
- private class Reader implements Runnable {
+ private class Reader extends Thread {
private volatile boolean adding = false;
private Selector readSelector = null;
- Reader(Selector readSelector) {
+ Reader(String name, Selector readSelector) {
+ super(name);
this.readSelector = readSelector;
}
public void run() {
- LOG.info("Starting SocketReader");
+ LOG.info("Starting " + getName());
synchronized (this) {
while (running) {
SelectionKey key = null;
@@ -419,6 +419,16 @@ public abstract class Server {
adding = false;
this.notify();
}
+
+ void shutdown() {
+ assert !running;
+ readSelector.wakeup();
+ try {
+ join();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
}
/** cleanup connections from connectionList. Choose a random range
@@ -607,7 +617,9 @@ public abstract class Server {
LOG.info(getName() + ":Exception in closing listener socket. " + e);
}
}
- readPool.shutdown();
+ for (Reader r : readers) {
+ r.shutdown();
+ }
}
// The method that will return the next reader to work with
Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/ipc/TestRPC.java?rev=1495293&r1=1495292&r2=1495293&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/ipc/TestRPC.java Fri Jun 21 05:03:10 2013
@@ -23,6 +23,9 @@ import static org.apache.hadoop.test.Met
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.net.InetSocketAddress;
@@ -410,6 +413,50 @@ public class TestRPC extends TestCase {
assertCounter("rpcAuthenticationSuccesses", 0, rb);
}
}
+
+ /**
+ * Count the number of threads that have a stack frame containing
+ * the given string
+ */
+ private static int countThreads(String search) {
+ ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+
+ int count = 0;
+ ThreadInfo[] infos = threadBean.getThreadInfo(threadBean.getAllThreadIds(), 20);
+ for (ThreadInfo info : infos) {
+ if (info == null) continue;
+ for (StackTraceElement elem : info.getStackTrace()) {
+ if (elem.getClassName().contains(search)) {
+ count++;
+ break;
+ }
+ }
+ }
+ return count;
+ }
+
+
+ /**
+ * Test that server.stop() properly stops all threads
+ */
+ public void testStopsAllThreads() throws Exception {
+ int threadsBefore = countThreads("Server$Listener$Reader");
+ assertEquals("Expect no Reader threads running before test",
+ 0, threadsBefore);
+
+ final Server server = RPC.getServer(new TestImpl(), ADDRESS,
+ 0, 5, true, conf);
+ server.start();
+ try {
+ int threadsRunning = countThreads("Server$Listener$Reader");
+ assertTrue(threadsRunning > 0);
+ } finally {
+ server.stop();
+ }
+ int threadsAfter = countThreads("Server$Listener$Reader");
+ assertEquals("Expect no Reader threads left running after test",
+ 0, threadsAfter);
+ }
public void testAuthorization() throws Exception {
Configuration conf = new Configuration();