You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2019/11/12 22:27:48 UTC
[hadoop] branch branch-3.1 updated: HADOOP-16677. Recalculate the
remaining timeout millis correctly while throwing an InterruptedException
in SocketIOWithTimeout. (#1687)
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new b4f39c2 HADOOP-16677. Recalculate the remaining timeout millis correctly while throwing an InterruptedException in SocketIOWithTimeout. (#1687)
b4f39c2 is described below
commit b4f39c2a8acbb3620fece0e141ae1a9ad1f4319a
Author: Xudong Cao <ia...@gmail.com>
AuthorDate: Wed Nov 13 06:19:39 2019 +0800
HADOOP-16677. Recalculate the remaining timeout millis correctly while throwing an InterruptedException in SocketIOWithTimeout. (#1687)
(cherry picked from commit df6b3162c11987ba5299c69cb251332228dacf36)
(cherry picked from commit 975f669141add885d631685d693e81c44c190751)
---
.../org/apache/hadoop/net/SocketIOWithTimeout.java | 24 +++++++-------
.../apache/hadoop/net/TestSocketIOWithTimeout.java | 38 ++++++++++++++++++++++
2 files changed, 51 insertions(+), 11 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java
index f489581..312a481 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java
@@ -326,34 +326,36 @@ abstract class SocketIOWithTimeout {
SelectionKey key = null;
int ret = 0;
+ long timeoutLeft = timeout;
try {
while (true) {
long start = (timeout == 0) ? 0 : Time.now();
key = channel.register(info.selector, ops);
- ret = info.selector.select(timeout);
+ ret = info.selector.select(timeoutLeft);
if (ret != 0) {
return ret;
}
- if (Thread.currentThread().isInterrupted()) {
- throw new InterruptedIOException("Interrupted while waiting for "
- + "IO on channel " + channel + ". " + timeout
- + " millis timeout left.");
- }
-
/* Sometimes select() returns 0 much before timeout for
* unknown reasons. So select again if required.
*/
if (timeout > 0) {
- timeout -= Time.now() - start;
- if (timeout <= 0) {
- return 0;
- }
+ timeoutLeft -= Time.now() - start;
+ timeoutLeft = Math.max(0, timeoutLeft);
}
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedIOException("Interrupted while waiting for "
+ + "IO on channel " + channel + ". Total timeout mills is "
+ + timeout + ", " + timeoutLeft + " millis timeout left.");
+ }
+
+ if (timeoutLeft == 0) {
+ return 0;
+ }
}
} finally {
if (key != null) {
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java
index f1c03cf..272eae7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java
@@ -185,4 +185,42 @@ public class TestSocketIOWithTimeout {
}
}
}
+
+ @Test
+ public void testSocketIOWithTimeoutInterrupted() throws Exception {
+ Pipe pipe = Pipe.open();
+ final int timeout = TIMEOUT * 10;
+
+ try (Pipe.SourceChannel source = pipe.source();
+ InputStream in = new SocketInputStream(source, timeout)) {
+
+ TestingThread thread = new TestingThread(ctx) {
+ @Override
+ public void doWork() throws Exception {
+ try {
+ in.read();
+ fail("Did not fail with interrupt");
+ } catch (InterruptedIOException ste) {
+ String detail = ste.getMessage();
+ String totalString = "Total timeout mills is " + timeout;
+ String leftString = "millis timeout left";
+
+ assertTrue(detail.contains(totalString));
+ assertTrue(detail.contains(leftString));
+ }
+ }
+ };
+
+ ctx.addThread(thread);
+ ctx.startThreads();
+ // If the thread is interrupted before it calls read(), the
+ // interruptible channel is closed on entry to the I/O call and
+ // read() fails with ClosedByInterruptException instead. Sleeping
+ // first lets the thread reach select(), so the interrupt yields
+ // the expected InterruptedIOException.
+ Thread.sleep(1000);
+ thread.interrupt();
+ ctx.stop();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org