You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/09/01 03:15:03 UTC

hadoop git commit: HADOOP-11295. RPC Server Reader thread can't shutdown if RPCCallQueue is full. Contributed by Ming Ma.

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6.1 95edb6e64 -> 4ec7b6174


HADOOP-11295. RPC Server Reader thread can't shutdown if RPCCallQueue is full. Contributed by Ming Ma.

(cherry picked from commit 685af8a3d0504724fe588daf3722519fedc45b01)
(cherry picked from commit 6c01e586198a3c3ebaa7561778c124ae62553246)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4ec7b617
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4ec7b617
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4ec7b617

Branch: refs/heads/branch-2.6.1
Commit: 4ec7b6174df4db30eb0d7354cd5ad0f40ab874dd
Parents: 95edb6e
Author: Kihwal Lee <ki...@apache.org>
Authored: Tue Feb 17 17:14:58 2015 -0600
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Mon Aug 31 18:10:00 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  3 +
 .../main/java/org/apache/hadoop/ipc/Server.java |  3 +-
 .../java/org/apache/hadoop/ipc/TestRPC.java     | 68 ++++++++++++++++++++
 3 files changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ec7b617/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 6e56370..5891feb 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -39,6 +39,9 @@ Release 2.6.1 - UNRELEASED
     HADOOP-11482. Use correct UGI when KMSClientProvider is called by a proxy
     user. Contributed by Arun Suresh.
 
+    HADOOP-11295. RPC Server Reader thread can't shutdown if RPCCallQueue is
+    full. (Ming Ma via kihwal)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ec7b617/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 127f22b..43d76a1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -662,7 +662,8 @@ public abstract class Server {
         assert !running;
         readSelector.wakeup();
         try {
-          join();
+          super.interrupt();
+          super.join();
         } catch (InterruptedException ie) {
           Thread.currentThread().interrupt();
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ec7b617/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index f1855f62..8a4dcb6 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -38,9 +38,16 @@ import java.lang.reflect.Proxy;
 import java.net.ConnectException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -1009,6 +1016,67 @@ public class TestRPC {
     }
   }
 
+  /**
+   * Verify the RPC server can shut down properly when the callQueue is full.
+   */
+  @Test (timeout=30000)
+  public void testRPCServerShutdown() throws Exception {
+    final int numClients = 3;
+    final List<Future<Void>> res = new ArrayList<Future<Void>>();
+    final ExecutorService executorService =
+        Executors.newFixedThreadPool(numClients);
+    final Configuration conf = new Configuration();
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    final Server server = new RPC.Builder(conf)
+        .setProtocol(TestProtocol.class).setInstance(new TestImpl())
+        .setBindAddress(ADDRESS).setPort(0)
+        .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true)
+        .build();
+    server.start();
+
+    final TestProtocol proxy =
+        RPC.getProxy(TestProtocol.class, TestProtocol.versionID,
+            NetUtils.getConnectAddress(server), conf);
+    try {
+      // Three sleep calls: one takes the only handler, one fills the
+      // callQueue, one blocks a reader on the full queue. NOTE(review): '&&'
+      // ends the wait below once ANY condition holds - confirm intent.
+      for (int i = 0; i < numClients; i++) {
+        res.add(executorService.submit(
+            new Callable<Void>() {
+              @Override
+              public Void call() throws IOException, InterruptedException {
+                proxy.sleep(100000);
+                return null;
+              }
+            }));
+      }
+      while (server.getCallQueueLen() != 1
+          && countThreads(CallQueueManager.class.getName()) != 1
+          && countThreads(TestProtocol.class.getName()) != 1) {
+        Thread.sleep(100);
+      }
+    } finally {
+      try {
+        server.stop(); // the behavior under test: this must not hang
+        assertEquals("Not enough clients", numClients, res.size());
+        for (Future<Void> f : res) {
+          try {
+            f.get(); // expected to throw, not return
+            fail("Future get should not return");
+          } catch (ExecutionException e) {
+            assertTrue("Unexpected exception: " + e,
+                e.getCause() instanceof IOException);
+            LOG.info("Expected exception", e.getCause());
+          }
+        }
+      } finally {
+        RPC.stopProxy(proxy);
+        executorService.shutdown();
+      }
+    }
+  }
+
   public static void main(String[] args) throws IOException {
     new TestRPC().testCallsInternal(conf);