You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2022/04/26 17:24:55 UTC
[accumulo] branch main updated: Handle thread interrupt in server client execute loop (#2622)
This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new ef0f52b16a Handle thread interrupt in server client execute loop (#2622)
ef0f52b16a is described below
commit ef0f52b16ae148bceb74d9d3ff2a091a4d497aeb
Author: nikita <72...@users.noreply.github.com>
AuthorDate: Tue Apr 26 06:35:50 2022 -0700
Handle thread interrupt in server client execute loop (#2622)
Re-throw ClosedByInterruptException as UncheckedIOException to
exit out of interrupted retry loops, improving the client-side user
experience when interrupting the process with `Ctrl+C`.
This fixes #2621
Co-authored-by: Nikita Sirohi <ni...@gh.st>
---
.../java/org/apache/accumulo/core/rpc/TTimeoutTransport.java | 10 ++++++++++
.../src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java | 6 ++++++
.../main/java/org/apache/accumulo/server/rpc/TServerUtils.java | 6 ++++++
3 files changed, 22 insertions(+)
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
index d7068c718d..b69d081e28 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
@@ -23,9 +23,11 @@ import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
+import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.spi.SelectorProvider;
import org.apache.accumulo.core.util.HostAndPort;
@@ -87,6 +89,10 @@ public class TTimeoutTransport {
socket = openSocket(addr, (int) timeoutMillis);
} catch (IOException e) {
// openSocket handles closing the Socket on error
+ if (e instanceof ClosedByInterruptException) {
+ Thread.currentThread().interrupt();
+ throw new UncheckedIOException(e);
+ }
throw new TTransportException(e);
}
@@ -100,6 +106,10 @@ public class TTimeoutTransport {
return new TIOStreamTransport(input, output);
} catch (IOException e) {
closeSocket(socket, e);
+ if (e instanceof ClosedByInterruptException) {
+ Thread.currentThread().interrupt();
+ throw new UncheckedIOException(e);
+ }
throw new TTransportException(e);
} catch (TTransportException e) {
closeSocket(socket, e);
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
index 3e63ac1cbf..30c6d8a50b 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
@@ -20,7 +20,9 @@ package org.apache.accumulo.core.rpc;
import java.io.FileInputStream;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.net.InetAddress;
+import java.nio.channels.ClosedByInterruptException;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.util.HashMap;
@@ -328,6 +330,10 @@ public class ThriftUtil {
throw e;
} catch (IOException e) {
log.warn("Failed to open SASL transport", e);
+ if (e instanceof ClosedByInterruptException) {
+ Thread.currentThread().interrupt();
+ throw new UncheckedIOException(e);
+ }
throw new TTransportException(e);
}
} else {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index ba8ba2d8f8..0910f5332e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -21,10 +21,12 @@ package org.apache.accumulo.server.rpc;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.UnknownHostException;
+import java.nio.channels.ClosedByInterruptException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashSet;
@@ -527,6 +529,10 @@ public class TServerUtils {
serverUser = UserGroupInformation.getLoginUser();
} catch (IOException e) {
transport.close();
+ if (e instanceof ClosedByInterruptException) {
+ Thread.currentThread().interrupt();
+ throw new UncheckedIOException(e);
+ }
throw new TTransportException(e);
}