You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vv...@apache.org on 2015/09/07 17:46:35 UTC

[33/50] [abbrv] hadoop git commit: HADOOP-12213. Interrupted exception can occur when Client#stop is called. Contributed by Kuhu Shukla.

HADOOP-12213. Interrupted exception can occur when Client#stop is called. Contributed by Kuhu Shukla.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0ebc6581
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0ebc6581
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0ebc6581

Branch: refs/heads/YARN-3926
Commit: 0ebc658105336cfe3e1a248b411de60f1d380928
Parents: 355eaaa
Author: Tsuyoshi Ozawa <oz...@apache.org>
Authored: Thu Sep 3 23:32:42 2015 +0900
Committer: Tsuyoshi Ozawa <oz...@apache.org>
Committed: Thu Sep 3 23:32:42 2015 +0900

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  3 ++
 .../main/java/org/apache/hadoop/ipc/Client.java |  9 +++--
 .../java/org/apache/hadoop/ipc/TestIPC.java     | 38 +++++++++++++++++++-
 3 files changed, 47 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ebc6581/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 20474be..512ca1b 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -1132,6 +1132,9 @@ Release 2.7.2 - UNRELEASED
     HADOOP-10365. BufferedOutputStream in FileUtil#unpackEntries() should be
     closed in finally block. (Kiran Kumar M R and Sanghyun Yun via ozawa)
 
+    HADOOP-12213. Interrupted exception can occur when Client#stop is called.
+    (Kuhu Shukla via ozawa)
+
 Release 2.7.1 - 2015-07-06 
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ebc6581/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 6996a51..9087e5c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -197,9 +197,10 @@ public class Client {
             clientExecutor.shutdownNow();
           }
         } catch (InterruptedException e) {
-          LOG.error("Interrupted while waiting for clientExecutor" +
-              "to stop", e);
+          LOG.warn("Interrupted while waiting for clientExecutor" +
+              " to stop");
           clientExecutor.shutdownNow();
+          Thread.currentThread().interrupt();
         }
         clientExecutor = null;
       }
@@ -256,6 +257,10 @@ public class Client {
     conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY, timeout);
   }
 
+  @VisibleForTesting
+  public static final ExecutorService getClientExecutor() {
+    return Client.clientExcecutorFactory.clientExecutor;
+  }
   /**
    * Increment this client's reference count
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ebc6581/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
index 08508ae..4e2e2f1 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
@@ -304,6 +304,8 @@ public class TestIPC {
       String causeText=cause.getMessage();
       assertTrue("Did not find " + causeText + " in " + message,
               message.contains(causeText));
+    } finally {
+      client.stop();
     }
   }
   
@@ -416,6 +418,7 @@ public class TestIPC {
       client.call(param, addr, null, null, 0, conf);
       
     } finally {
+      client.stop();
       server.stop();
     }
   }
@@ -531,6 +534,8 @@ public class TestIPC {
       fail("Expected an exception to have been thrown");
     } catch (IOException e) {
       assertTrue(e.getMessage().contains("Injected fault"));
+    } finally {
+      client.stop();
     }
   }
 
@@ -556,11 +561,11 @@ public class TestIPC {
     }).when(spyFactory).createSocket();
       
     Server server = new TestServer(1, true);
+    Client client = new Client(LongWritable.class, conf, spyFactory);
     server.start();
     try {
       // Call should fail due to injected exception.
       InetSocketAddress address = NetUtils.getConnectAddress(server);
-      Client client = new Client(LongWritable.class, conf, spyFactory);
       try {
         client.call(new LongWritable(RANDOM.nextLong()),
                 address, null, null, 0, conf);
@@ -577,6 +582,7 @@ public class TestIPC {
       client.call(new LongWritable(RANDOM.nextLong()),
           address, null, null, 0, conf);
     } finally {
+      client.stop();
       server.stop();
     }
   }
@@ -601,6 +607,7 @@ public class TestIPC {
     // set timeout to be bigger than 3*ping interval
     client.call(new LongWritable(RANDOM.nextLong()),
         addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf);
+    client.stop();
   }
 
   @Test(timeout=60000)
@@ -621,6 +628,7 @@ public class TestIPC {
     } catch (SocketTimeoutException e) {
       LOG.info("Get a SocketTimeoutException ", e);
     }
+    client.stop();
   }
   
   /**
@@ -851,6 +859,8 @@ public class TestIPC {
             } catch (IOException e) {
               LOG.error(e);
             } catch (InterruptedException e) {
+            } finally {
+              client.stop();
             }
           }
         });
@@ -952,6 +962,31 @@ public class TestIPC {
         endFds - startFds < 20);
   }
   
+  /**
+   * Check if Client is interrupted after handling
+   * InterruptedException during cleanup
+   */
+  @Test(timeout=30000)
+  public void testInterrupted() {
+    Client client = new Client(LongWritable.class, conf);
+    client.getClientExecutor().submit(new Runnable() {
+      public void run() {
+        while(true);
+      }
+    });
+    Thread.currentThread().interrupt();
+    client.stop();
+    try {
+      assertTrue(Thread.currentThread().isInterrupted());
+      LOG.info("Expected thread interrupt during client cleanup");
+    } catch (AssertionError e) {
+      LOG.error("The Client did not interrupt after handling an Interrupted Exception");
+      Assert.fail("The Client did not interrupt after handling an Interrupted Exception");
+    }
+    // Clear Thread interrupt
+    Thread.currentThread().interrupted();
+  }
+
   private long countOpenFileDescriptors() {
     return FD_DIR.list().length;
   }
@@ -1315,6 +1350,7 @@ public class TestIPC {
       Mockito.verify(mockFactory, Mockito.times(maxTimeoutRetries))
           .createSocket();
     }
+    client.stop();
   }
   
   private void doIpcVersionTest(