You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2015/01/26 23:05:18 UTC
[2/2] hadoop git commit: HADOOP-6221 RPC Client operations cannot be
interrupted (stevel)
HADOOP-6221 RPC Client operations cannot be interrupted (stevel)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1f2b6956
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1f2b6956
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1f2b6956
Branch: refs/heads/trunk
Commit: 1f2b6956c2012a7d6ea7e7ba5116d3ad71c23d7e
Parents: 21d5599
Author: Steve Loughran <st...@apache.org>
Authored: Mon Jan 26 22:04:45 2015 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Mon Jan 26 22:04:56 2015 +0000
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 2 +
.../main/java/org/apache/hadoop/ipc/Client.java | 6 +
.../main/java/org/apache/hadoop/ipc/RPC.java | 9 +-
.../apache/hadoop/net/SocketIOWithTimeout.java | 12 +-
.../apache/hadoop/ipc/TestRPCWaitForProxy.java | 130 +++++++++++++++++++
5 files changed, 152 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f2b6956/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index e0da851..2806ee2 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -763,6 +763,8 @@ Release 2.7.0 - UNRELEASED
HADOOP-11499. Check of executorThreadsStarted in
ValueQueue#submitRefillTask() evades lock acquisition (Ted Yu via jlowe)
+ HADOOP-6221 RPC Client operations cannot be interrupted. (stevel)
+
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f2b6956/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 45a4660..dfde136 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -849,6 +849,12 @@ public class Client {
throw ioe;
}
+ // Throw the exception if the thread is interrupted
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.warn("Interrupted while trying for connection");
+ throw ioe;
+ }
+
try {
Thread.sleep(action.delayMillis);
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f2b6956/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
index 40f6515..8ada0ff 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
@@ -412,11 +412,18 @@ public class RPC {
throw ioe;
}
+ if (Thread.currentThread().isInterrupted()) {
+ // interrupted during some IO; this may not have been caught
+ throw new InterruptedIOException("Interrupted waiting for the proxy");
+ }
+
// wait for retry
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
- // IGNORE
+ Thread.currentThread().interrupt();
+ throw (IOException) new InterruptedIOException(
+ "Interrupted waiting for the proxy").initCause(ioe);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f2b6956/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java
index ed12b3c..b50f7e9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java
@@ -338,6 +338,12 @@ abstract class SocketIOWithTimeout {
return ret;
}
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedIOException("Interrupted while waiting for "
+ + "IO on channel " + channel + ". " + timeout
+ + " millis timeout left.");
+ }
+
/* Sometimes select() returns 0 much before timeout for
* unknown reasons. So select again if required.
*/
@@ -348,12 +354,6 @@ abstract class SocketIOWithTimeout {
}
}
- if (Thread.currentThread().isInterrupted()) {
- throw new InterruptedIOException("Interruped while waiting for " +
- "IO on channel " + channel +
- ". " + timeout +
- " millis timeout left.");
- }
}
} finally {
if (key != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f2b6956/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java
new file mode 100644
index 0000000..5807998
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCWaitForProxy.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
+import org.apache.hadoop.ipc.TestRPC.TestProtocol;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InterruptedIOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.nio.channels.ClosedByInterruptException;
+
+/**
+ * Tests that a wait for an RPC proxy ends with a ConnectException or can be interrupted.
+ */
+public class TestRPCWaitForProxy extends Assert {
+ private static final String ADDRESS = "0.0.0.0";
+ private static final Logger
+ LOG = LoggerFactory.getLogger(TestRPCWaitForProxy.class);
+
+ private static final Configuration conf = new Configuration();
+
+ /**
+ * Starts a worker with zero connect retries, waits for it to finish, and
+ * checks that the throwable it recorded is a {@link ConnectException}.
+ *
+ * @throws Throwable any exception other than that which was expected
+ */
+ @Test(timeout = 10000)
+ public void testWaitForProxy() throws Throwable {
+ RpcThread worker = new RpcThread(0);
+ worker.start();
+ worker.join();
+ Throwable caught = worker.getCaught();
+ assertNotNull("No exception was raised", caught);
+ if (!(caught instanceof ConnectException)) {
+ throw caught;
+ }
+ }
+
+ /**
+ * Starts a worker with 100 connect retries, sleeps a second, interrupts it, and checks
+ * that its throwable (or cause) is InterruptedIOException or ClosedByInterruptException.
+ *
+ * @throws Throwable any exception other than that which was expected
+ */
+ @Test(timeout = 10000)
+ public void testInterruptedWaitForProxy() throws Throwable {
+ RpcThread worker = new RpcThread(100);
+ worker.start();
+ Thread.sleep(1000);
+ assertTrue("worker hasn't started", worker.waitStarted);
+ worker.interrupt();
+ worker.join();
+ Throwable caught = worker.getCaught();
+ assertNotNull("No exception was raised", caught);
+ // looking for the root cause here, which can be wrapped
+ // as part of the NetUtils work. Having this test look
+ // at the type of exception there would be brittle to improvements
+ // in exception diagnostics.
+ Throwable cause = caught.getCause();
+ if (cause == null) {
+ // no inner cause, use outer exception as root cause.
+ cause = caught;
+ }
+ if (!(cause instanceof InterruptedIOException)
+ && !(cause instanceof ClosedByInterruptException)) {
+ throw caught;
+ }
+ }
+
+ /**
+ * This thread calls RPC.waitForProxy() and then echo() on the proxy, using the
+ * given number of connect retries, and retains any throwable that was raised
+ */
+
+ private class RpcThread extends Thread {
+ private Throwable caught;
+ private int connectRetries;
+ private volatile boolean waitStarted = false;
+
+ private RpcThread(int connectRetries) {
+ this.connectRetries = connectRetries;
+ }
+ @Override
+ public void run() {
+ try {
+ Configuration config = new Configuration(conf);
+ config.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+ connectRetries);
+ config.setInt(
+ IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
+ connectRetries);
+ waitStarted = true;
+ TestProtocol proxy = RPC.waitForProxy(TestProtocol.class,
+ TestProtocol.versionID,
+ new InetSocketAddress(ADDRESS, 20),
+ config,
+ 15000L);
+ proxy.echo("");
+ } catch (Throwable throwable) {
+ caught = throwable;
+ }
+ }
+
+ public Throwable getCaught() {
+ return caught;
+ }
+ }
+}