You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2011/02/15 08:18:45 UTC

svn commit: r1070791 - in /hadoop/common/branches/branch-0.22: CHANGES.txt src/java/org/apache/hadoop/ipc/Server.java src/test/core/org/apache/hadoop/ipc/TestRPC.java

Author: todd
Date: Tue Feb 15 07:18:44 2011
New Revision: 1070791

URL: http://svn.apache.org/viewvc?rev=1070791&view=rev
Log:
HADOOP-7140. IPC Reader threads do not stop when server stops. Contributed by Todd Lipcon.

Modified:
    hadoop/common/branches/branch-0.22/CHANGES.txt
    hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/branches/branch-0.22/src/test/core/org/apache/hadoop/ipc/TestRPC.java

Modified: hadoop/common/branches/branch-0.22/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/CHANGES.txt?rev=1070791&r1=1070790&r2=1070791&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.22/CHANGES.txt Tue Feb 15 07:18:44 2011
@@ -416,6 +416,8 @@ Release 0.22.0 - Unreleased
     HADOOP-6642. Fix javac, javadoc, findbugs warnings related to security work. 
     (Chris Douglas, Po Cheung via shv)
 
+    HADOOP-7140. IPC Reader threads do not stop when server stops (todd)
+
 Release 0.21.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/ipc/Server.java?rev=1070791&r1=1070790&r2=1070791&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/ipc/Server.java Tue Feb 15 07:18:44 2011
@@ -299,7 +299,6 @@ public abstract class Server {
     private long cleanupInterval = 10000; //the minimum interval between 
                                           //two cleanup runs
     private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
-    private ExecutorService readPool;
     
     public Listener() throws IOException {
       address = new InetSocketAddress(bindAddress, port);
@@ -313,12 +312,12 @@ public abstract class Server {
       // create a selector;
       selector= Selector.open();
       readers = new Reader[readThreads];
-      readPool = Executors.newFixedThreadPool(readThreads);
       for (int i = 0; i < readThreads; i++) {
         Selector readSelector = Selector.open();
-        Reader reader = new Reader(readSelector);
+        Reader reader = new Reader("Socket Reader #" + (i + 1) + " for port " + port,
+                                   readSelector);
         readers[i] = reader;
-        readPool.execute(reader);
+        reader.start();
       }
 
       // Register accepts on the server socket with the selector.
@@ -327,15 +326,16 @@ public abstract class Server {
       this.setDaemon(true);
     }
     
-    private class Reader implements Runnable {
+    private class Reader extends Thread {
       private volatile boolean adding = false;
       private Selector readSelector = null;
 
-      Reader(Selector readSelector) {
+      Reader(String name, Selector readSelector) {
+        super(name);
         this.readSelector = readSelector;
       }
       public void run() {
-        LOG.info("Starting SocketReader");
+        LOG.info("Starting " + getName());
         synchronized (this) {
           while (running) {
             SelectionKey key = null;
@@ -389,6 +389,16 @@ public abstract class Server {
         adding = false;
         this.notify();        
       }
+
+      void shutdown() {
+        assert !running;
+        readSelector.wakeup();
+        try {
+          join();
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+        }
+      }
     }
     /** cleanup connections from connectionList. Choose a random range
      * to scan and also have a limit on the number of the connections
@@ -577,7 +587,9 @@ public abstract class Server {
           LOG.info(getName() + ":Exception in closing listener socket. " + e);
         }
       }
-      readPool.shutdown();
+      for (Reader r : readers) {
+        r.shutdown();
+      }
     }
     
     synchronized Selector getSelector() { return selector; }

Modified: hadoop/common/branches/branch-0.22/src/test/core/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/src/test/core/org/apache/hadoop/ipc/TestRPC.java?rev=1070791&r1=1070790&r2=1070791&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/src/test/core/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/branches/branch-0.22/src/test/core/org/apache/hadoop/ipc/TestRPC.java Tue Feb 15 07:18:44 2011
@@ -21,6 +21,9 @@ package org.apache.hadoop.ipc;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
 import java.lang.reflect.Method;
 
 import junit.framework.TestCase;
@@ -506,6 +509,50 @@ public class TestRPC extends TestCase {
     }
     assertTrue(succeeded);
   }
+
+  /**
+   * Count the number of threads that have a stack frame containing
+   * the given string
+   */
+  private static int countThreads(String search) {
+    ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+
+    int count = 0;
+    ThreadInfo[] infos = threadBean.getThreadInfo(threadBean.getAllThreadIds(), 20);
+    for (ThreadInfo info : infos) {
+      if (info == null) continue;
+      for (StackTraceElement elem : info.getStackTrace()) {
+        if (elem.getClassName().contains(search)) {
+          count++;
+          break;
+        }
+      }
+    }
+    return count;
+  }
+
+
+  /**
+   * Test that server.stop() properly stops all threads
+   */
+  public void testStopsAllThreads() throws Exception {
+    int threadsBefore = countThreads("Server$Listener$Reader");
+    assertEquals("Expect no Reader threads running before test",
+      0, threadsBefore);
+
+    final Server server = RPC.getServer(TestProtocol.class,
+        new TestImpl(), ADDRESS, 0, 5, true, conf, null);
+    server.start();
+    try {
+      int threadsRunning = countThreads("Server$Listener$Reader");
+      assertTrue(threadsRunning > 0);
+    } finally {
+      server.stop();
+    }
+    int threadsAfter = countThreads("Server$Listener$Reader");
+    assertEquals("Expect no Reader threads left running after test",
+      0, threadsAfter);
+  }
   
   public static void main(String[] args) throws Exception {