You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2013/01/04 20:30:01 UTC

git commit: FLUME-1820. Should not be possible for RPC client to block indefinitely on close()

Updated Branches:
  refs/heads/trunk 58173b898 -> 21c67ed59


FLUME-1820. Should not be possible for RPC client to block indefinitely on close()

(Mike Percy via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/21c67ed5
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/21c67ed5
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/21c67ed5

Branch: refs/heads/trunk
Commit: 21c67ed590ff414c7bc1d821ca91cd7a9391468b
Parents: 58173b8
Author: Hari Shreedharan <ha...@gmail.com>
Authored: Fri Jan 4 11:28:29 2013 -0800
Committer: Hari Shreedharan <ha...@gmail.com>
Committed: Fri Jan 4 11:28:29 2013 -0800

----------------------------------------------------------------------
 .../org/apache/flume/api/NettyAvroRpcClient.java   |   23 +++++++++-----
 1 files changed, 15 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/21c67ed5/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
index bd116ff..cf9724c 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
@@ -124,17 +124,24 @@ implements RpcClient {
   public void close() throws FlumeException {
     if (callTimeoutPool != null) {
       callTimeoutPool.shutdown();
-      while (!callTimeoutPool.isTerminated()) {
-        try {
-          callTimeoutPool.awaitTermination(requestTimeout,
-              TimeUnit.MILLISECONDS);
-        } catch (InterruptedException ex) {
-          logger.warn(this + ": Interrupted during close", ex);
+      try {
+        if (!callTimeoutPool.awaitTermination(requestTimeout,
+            TimeUnit.MILLISECONDS)) {
           callTimeoutPool.shutdownNow();
-          Thread.currentThread().interrupt();
-          break;
+          if (!callTimeoutPool.awaitTermination(requestTimeout,
+              TimeUnit.MILLISECONDS)) {
+            logger.warn(this + ": Unable to cleanly shut down call timeout " +
+                "pool");
+          }
         }
+      } catch (InterruptedException ex) {
+        logger.warn(this + ": Interrupted during close", ex);
+        // re-cancel if current thread also interrupted
+        callTimeoutPool.shutdownNow();
+        // preserve interrupt status
+        Thread.currentThread().interrupt();
       }
+
       callTimeoutPool = null;
     }
     try {