You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2022/04/26 17:24:55 UTC

[accumulo] branch main updated: Handle thread interrupt in server client execute loop (#2622)

This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new ef0f52b16a Handle thread interrupt in server client execute loop (#2622)
ef0f52b16a is described below

commit ef0f52b16ae148bceb74d9d3ff2a091a4d497aeb
Author: nikita <72...@users.noreply.github.com>
AuthorDate: Tue Apr 26 06:35:50 2022 -0700

    Handle thread interrupt in server client execute loop (#2622)
    
    Re-throw ClosedByInterruptException as UncheckedIOException to
    exit out of interrupted retry loops to improve the client side user
    experience when interrupting the process with `Ctrl+C`
    
    This fixes #2621
    
    Co-authored-by: Nikita Sirohi <ni...@gh.st>
---
 .../java/org/apache/accumulo/core/rpc/TTimeoutTransport.java   | 10 ++++++++++
 .../src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java |  6 ++++++
 .../main/java/org/apache/accumulo/server/rpc/TServerUtils.java |  6 ++++++
 3 files changed, 22 insertions(+)

diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
index d7068c718d..b69d081e28 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
@@ -23,9 +23,11 @@ import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.UncheckedIOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketAddress;
+import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.spi.SelectorProvider;
 
 import org.apache.accumulo.core.util.HostAndPort;
@@ -87,6 +89,10 @@ public class TTimeoutTransport {
       socket = openSocket(addr, (int) timeoutMillis);
     } catch (IOException e) {
       // openSocket handles closing the Socket on error
+      if (e instanceof ClosedByInterruptException) {
+        Thread.currentThread().interrupt();
+        throw new UncheckedIOException(e);
+      }
       throw new TTransportException(e);
     }
 
@@ -100,6 +106,10 @@ public class TTimeoutTransport {
       return new TIOStreamTransport(input, output);
     } catch (IOException e) {
       closeSocket(socket, e);
+      if (e instanceof ClosedByInterruptException) {
+        Thread.currentThread().interrupt();
+        throw new UncheckedIOException(e);
+      }
       throw new TTransportException(e);
     } catch (TTransportException e) {
       closeSocket(socket, e);
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
index 3e63ac1cbf..30c6d8a50b 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
@@ -20,7 +20,9 @@ package org.apache.accumulo.core.rpc;
 
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.net.InetAddress;
+import java.nio.channels.ClosedByInterruptException;
 import java.security.KeyStore;
 import java.security.SecureRandom;
 import java.util.HashMap;
@@ -328,6 +330,10 @@ public class ThriftUtil {
           throw e;
         } catch (IOException e) {
           log.warn("Failed to open SASL transport", e);
+          if (e instanceof ClosedByInterruptException) {
+            Thread.currentThread().interrupt();
+            throw new UncheckedIOException(e);
+          }
           throw new TTransportException(e);
         }
       } else {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index ba8ba2d8f8..0910f5332e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -21,10 +21,12 @@ package org.apache.accumulo.server.rpc;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.UnknownHostException;
+import java.nio.channels.ClosedByInterruptException;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashSet;
@@ -527,6 +529,10 @@ public class TServerUtils {
       serverUser = UserGroupInformation.getLoginUser();
     } catch (IOException e) {
       transport.close();
+      if (e instanceof ClosedByInterruptException) {
+        Thread.currentThread().interrupt();
+        throw new UncheckedIOException(e);
+      }
       throw new TTransportException(e);
     }