You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vv...@apache.org on 2015/09/07 17:46:35 UTC
[33/50] [abbrv] hadoop git commit: HADOOP-12213. Interrupted
exception can occur when Client#stop is called. Contributed by Kuhu Shukla.
HADOOP-12213. Interrupted exception can occur when Client#stop is called. Contributed by Kuhu Shukla.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0ebc6581
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0ebc6581
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0ebc6581
Branch: refs/heads/YARN-3926
Commit: 0ebc658105336cfe3e1a248b411de60f1d380928
Parents: 355eaaa
Author: Tsuyoshi Ozawa <oz...@apache.org>
Authored: Thu Sep 3 23:32:42 2015 +0900
Committer: Tsuyoshi Ozawa <oz...@apache.org>
Committed: Thu Sep 3 23:32:42 2015 +0900
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 ++
.../main/java/org/apache/hadoop/ipc/Client.java | 9 +++--
.../java/org/apache/hadoop/ipc/TestIPC.java | 38 +++++++++++++++++++-
3 files changed, 47 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ebc6581/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 20474be..512ca1b 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -1132,6 +1132,9 @@ Release 2.7.2 - UNRELEASED
HADOOP-10365. BufferedOutputStream in FileUtil#unpackEntries() should be
closed in finally block. (Kiran Kumar M R and Sanghyun Yun via ozawa)
+ HADOOP-12213. Interrupted exception can occur when Client#stop is called.
+ (Kuhu Shukla via ozawa)
+
Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ebc6581/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 6996a51..9087e5c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -197,9 +197,10 @@ public class Client {
clientExecutor.shutdownNow();
}
} catch (InterruptedException e) {
- LOG.error("Interrupted while waiting for clientExecutor" +
- "to stop", e);
+ LOG.warn("Interrupted while waiting for clientExecutor" +
+ " to stop");
clientExecutor.shutdownNow();
+ Thread.currentThread().interrupt();
}
clientExecutor = null;
}
@@ -256,6 +257,10 @@ public class Client {
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY, timeout);
}
+ @VisibleForTesting
+ public static final ExecutorService getClientExecutor() {
+ return Client.clientExcecutorFactory.clientExecutor;
+ }
/**
* Increment this client's reference count
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ebc6581/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
index 08508ae..4e2e2f1 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
@@ -304,6 +304,8 @@ public class TestIPC {
String causeText=cause.getMessage();
assertTrue("Did not find " + causeText + " in " + message,
message.contains(causeText));
+ } finally {
+ client.stop();
}
}
@@ -416,6 +418,7 @@ public class TestIPC {
client.call(param, addr, null, null, 0, conf);
} finally {
+ client.stop();
server.stop();
}
}
@@ -531,6 +534,8 @@ public class TestIPC {
fail("Expected an exception to have been thrown");
} catch (IOException e) {
assertTrue(e.getMessage().contains("Injected fault"));
+ } finally {
+ client.stop();
}
}
@@ -556,11 +561,11 @@ public class TestIPC {
}).when(spyFactory).createSocket();
Server server = new TestServer(1, true);
+ Client client = new Client(LongWritable.class, conf, spyFactory);
server.start();
try {
// Call should fail due to injected exception.
InetSocketAddress address = NetUtils.getConnectAddress(server);
- Client client = new Client(LongWritable.class, conf, spyFactory);
try {
client.call(new LongWritable(RANDOM.nextLong()),
address, null, null, 0, conf);
@@ -577,6 +582,7 @@ public class TestIPC {
client.call(new LongWritable(RANDOM.nextLong()),
address, null, null, 0, conf);
} finally {
+ client.stop();
server.stop();
}
}
@@ -601,6 +607,7 @@ public class TestIPC {
// set timeout to be bigger than 3*ping interval
client.call(new LongWritable(RANDOM.nextLong()),
addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf);
+ client.stop();
}
@Test(timeout=60000)
@@ -621,6 +628,7 @@ public class TestIPC {
} catch (SocketTimeoutException e) {
LOG.info("Get a SocketTimeoutException ", e);
}
+ client.stop();
}
/**
@@ -851,6 +859,8 @@ public class TestIPC {
} catch (IOException e) {
LOG.error(e);
} catch (InterruptedException e) {
+ } finally {
+ client.stop();
}
}
});
@@ -952,6 +962,31 @@ public class TestIPC {
endFds - startFds < 20);
}
+ /**
+ * Check if Client is interrupted after handling
+ * InterruptedException during cleanup
+ */
+ @Test(timeout=30000)
+ public void testInterrupted() {
+ Client client = new Client(LongWritable.class, conf);
+ client.getClientExecutor().submit(new Runnable() {
+ public void run() {
+ while(true);
+ }
+ });
+ Thread.currentThread().interrupt();
+ client.stop();
+ try {
+ assertTrue(Thread.currentThread().isInterrupted());
+ LOG.info("Expected thread interrupt during client cleanup");
+ } catch (AssertionError e) {
+ LOG.error("The Client did not interrupt after handling an Interrupted Exception");
+ Assert.fail("The Client did not interrupt after handling an Interrupted Exception");
+ }
+ // Clear Thread interrupt
+ Thread.currentThread().interrupted();
+ }
+
private long countOpenFileDescriptors() {
return FD_DIR.list().length;
}
@@ -1315,6 +1350,7 @@ public class TestIPC {
Mockito.verify(mockFactory, Mockito.times(maxTimeoutRetries))
.createSocket();
}
+ client.stop();
}
private void doIpcVersionTest(