You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2018/09/04 18:59:37 UTC
[2/2] hadoop git commit: HADOOP-10219. ipc.Client.setupIOstreams()
needs to check for ClientCache.stopClient requested shutdowns. Contributed by
Kihwal Lee and Lukas Majercak.
HADOOP-10219. ipc.Client.setupIOstreams() needs to check for ClientCache.stopClient requested shutdowns.
Contributed by Kihwal Lee and Lukas Majercak.
(cherry picked from commit 9e96ac666d783376a8cdea9c3cc84098c5bdcb56)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6ed97eba
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6ed97eba
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6ed97eba
Branch: refs/heads/branch-2.9
Commit: 6ed97eba2237b16295e873beb99379e12f116a6e
Parents: 809faed
Author: Steve Loughran <st...@apache.org>
Authored: Tue Sep 4 16:46:12 2018 +0100
Committer: Inigo Goiri <in...@apache.org>
Committed: Tue Sep 4 11:59:23 2018 -0700
----------------------------------------------------------------------
.../main/java/org/apache/hadoop/ipc/Client.java | 14 ++++++
.../java/org/apache/hadoop/ipc/TestIPC.java | 45 ++++++++++++++++++++
2 files changed, 59 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ed97eba/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 533b6ca..2636adb 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -70,6 +70,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
@@ -440,6 +441,8 @@ public class Client implements AutoCloseable {
private final Object sendRpcRequestLock = new Object();
+ private AtomicReference<Thread> connectingThread = new AtomicReference<>();
+
public Connection(ConnectionId remoteId, int serviceClass) throws IOException {
this.remoteId = remoteId;
this.server = remoteId.getAddress();
@@ -777,6 +780,7 @@ public class Client implements AutoCloseable {
}
}
try {
+ connectingThread.set(Thread.currentThread());
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to "+server);
}
@@ -862,6 +866,8 @@ public class Client implements AutoCloseable {
markClosed(new IOException("Couldn't set up IO streams: " + t, t));
}
close();
+ } finally {
+ connectingThread.set(null);
}
}
@@ -1215,6 +1221,13 @@ public class Client implements AutoCloseable {
notifyAll();
}
}
+
+ private void interruptConnectingThread() {
+ Thread connThread = connectingThread.get();
+ if (connThread != null) {
+ connThread.interrupt();
+ }
+ }
/** Close the connection. */
private synchronized void close() {
@@ -1317,6 +1330,7 @@ public class Client implements AutoCloseable {
// wake up all connections
for (Connection conn : connections.values()) {
conn.interrupt();
+ conn.interruptConnectingThread();
}
// wait until all connections are closed
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ed97eba/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
index a6c57fe..95e76f7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -1398,6 +1399,50 @@ public class TestIPC {
assertEquals(Client.getTimeout(config), -1);
}
+ @Test(timeout=60000)
+ public void testSetupConnectionShouldNotBlockShutdown() throws Exception {
+ // Create a test server; only its connect address is used (it is not started here)
+ SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
+ Server server = new TestServer(1, true);
+ final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+
+ // Count every createSocket() call made while setting up the connection (first attempt included)
+ final AtomicInteger createSocketCalled = new AtomicInteger();
+
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ createSocketCalled.addAndGet(1);
+ Thread.sleep(MIN_SLEEP_TIME * 5);
+ throw new ConnectTimeoutException("fake");
+ }
+ }).when(mockFactory).createSocket();
+ final Client client = new Client(LongWritable.class, conf, mockFactory);
+
+ final AtomicBoolean callStarted = new AtomicBoolean(false);
+
+ // Issue a call with a random argument on a separate thread so that this thread can call stop()
+ new Thread(new Runnable() {
+ public void run() {
+ try {
+ callStarted.set(true);
+ call(client, RANDOM.nextLong(), addr, conf);
+ } catch (IOException ignored) {}
+ }
+ }).start();
+
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return callStarted.get() && createSocketCalled.get() == 1;
+ }
+ }, 50, 60000);
+
+ // stop() should stop the client immediately without any more retries
+ client.stop();
+ assertEquals(1, createSocketCalled.get());
+ }
+
private void assertRetriesOnSocketTimeouts(Configuration conf,
int maxTimeoutRetries) throws IOException {
SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org