You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by iv...@apache.org on 2013/06/21 07:03:11 UTC

svn commit: r1495293 - in /hadoop/common/branches/branch-1: CHANGES.txt src/core/org/apache/hadoop/ipc/Server.java src/test/org/apache/hadoop/ipc/TestRPC.java

Author: ivanmi
Date: Fri Jun 21 05:03:10 2013
New Revision: 1495293

URL: http://svn.apache.org/r1495293
Log:
HADOOP-7140. IPC Reader threads do not stop when server stops (Backported by Ivan Mitic).

Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Server.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/ipc/TestRPC.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1495293&r1=1495292&r2=1495293&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Fri Jun 21 05:03:10 2013
@@ -62,6 +62,9 @@ Release 1.3.0 - unreleased
     HADOOP-9624. TestFSMainOperationsLocalFileSystem failed when the Hadoop test
     root path has "X" in its name. (Xi Fang via cnauroth)
 
+    HADOOP-7140. IPC Reader threads do not stop when server stops
+    (Todd Lipcon, backported by ivanmi)
+
 Release 1.2.1 - Unreleased 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Server.java?rev=1495293&r1=1495292&r2=1495293&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Server.java Fri Jun 21 05:03:10 2013
@@ -329,7 +329,6 @@ public abstract class Server {
     private long cleanupInterval = 10000; //the minimum interval between 
                                           //two cleanup runs
     private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
-    private ExecutorService readPool; 
    
     public Listener() throws IOException {
       address = new InetSocketAddress(bindAddress, port);
@@ -343,12 +342,12 @@ public abstract class Server {
       // create a selector;
       selector= Selector.open();
       readers = new Reader[readThreads];
-      readPool = Executors.newFixedThreadPool(readThreads);
       for (int i = 0; i < readThreads; i++) {
         Selector readSelector = Selector.open();
-        Reader reader = new Reader(readSelector);
+        Reader reader = new Reader("Socket Reader #" + (i + 1) + " for port " + port,
+                                   readSelector);
         readers[i] = reader;
-        readPool.execute(reader);
+        reader.start();
       }
 
       // Register accepts on the server socket with the selector.
@@ -357,15 +356,16 @@ public abstract class Server {
       this.setDaemon(true);
     }
     
-    private class Reader implements Runnable {
+    private class Reader extends Thread {
       private volatile boolean adding = false;
       private Selector readSelector = null;
 
-      Reader(Selector readSelector) {
+      Reader(String name, Selector readSelector) {
+        super(name);
         this.readSelector = readSelector;
       }
       public void run() {
-        LOG.info("Starting SocketReader");
+        LOG.info("Starting " + getName());
         synchronized (this) {
           while (running) {
             SelectionKey key = null;
@@ -419,6 +419,16 @@ public abstract class Server {
         adding = false;
         this.notify();        
       }
+
+      void shutdown() {
+        assert !running;
+        readSelector.wakeup();
+        try {
+          join();
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+        }
+      }
     }
 
     /** cleanup connections from connectionList. Choose a random range
@@ -607,7 +617,9 @@ public abstract class Server {
           LOG.info(getName() + ":Exception in closing listener socket. " + e);
         }
       }
-      readPool.shutdown();
+      for (Reader r : readers) {
+        r.shutdown();
+      }
     }
 
     // The method that will return the next reader to work with

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/ipc/TestRPC.java?rev=1495293&r1=1495292&r2=1495293&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/ipc/TestRPC.java Fri Jun 21 05:03:10 2013
@@ -23,6 +23,9 @@ import static org.apache.hadoop.test.Met
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
 import java.lang.reflect.Method;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
@@ -410,6 +413,50 @@ public class TestRPC extends TestCase {
       assertCounter("rpcAuthenticationSuccesses", 0, rb);
     }
   }
+
+  /**
+   * Count the number of threads that have a stack frame containing
+   * the given string
+   */
+  private static int countThreads(String search) {
+    ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+
+    int count = 0;
+    ThreadInfo[] infos = threadBean.getThreadInfo(threadBean.getAllThreadIds(), 20);
+    for (ThreadInfo info : infos) {
+      if (info == null) continue;
+      for (StackTraceElement elem : info.getStackTrace()) {
+        if (elem.getClassName().contains(search)) {
+          count++;
+          break;
+        }
+      }
+    }
+    return count;
+  }
+
+
+  /**
+   * Test that server.stop() properly stops all threads
+   */
+  public void testStopsAllThreads() throws Exception {
+    int threadsBefore = countThreads("Server$Listener$Reader");
+    assertEquals("Expect no Reader threads running before test",
+      0, threadsBefore);
+
+    final Server server = RPC.getServer(new TestImpl(), ADDRESS,
+        0, 5, true, conf);
+    server.start();
+    try {
+      int threadsRunning = countThreads("Server$Listener$Reader");
+      assertTrue(threadsRunning > 0);
+    } finally {
+      server.stop();
+    }
+    int threadsAfter = countThreads("Server$Listener$Reader");
+    assertEquals("Expect no Reader threads left running after test",
+      0, threadsAfter);
+  }
   
   public void testAuthorization() throws Exception {
     Configuration conf = new Configuration();