You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2022/08/02 17:32:59 UTC

[brooklyn-server] 05/06: even more checks for sshj task interruption

This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit b0282af9699d732b2eae28a29c06d2b488f7efe2
Author: Alex Heneveld <al...@cloudsoft.io>
AuthorDate: Tue Aug 2 16:51:08 2022 +0100

    even more checks for sshj task interruption
---
 .../util/core/internal/ssh/sshj/SshjTool.java      | 10 +++++
 .../internal/ssh/sshj/SshjToolIntegrationTest.java | 45 +++++++++++++++++++++-
 2 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjTool.java b/core/src/main/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjTool.java
index 06a9579df8..98d73bedac 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjTool.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjTool.java
@@ -38,11 +38,13 @@ import net.schmizz.sshj.transport.TransportException;
 import net.schmizz.sshj.xfer.FileSystemFile;
 import net.schmizz.sshj.xfer.InMemorySourceFile;
 import net.schmizz.sshj.xfer.LocalDestFile;
+import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.core.BrooklynFeatureEnablement;
 import org.apache.brooklyn.util.core.internal.ssh.BackoffLimitedRetryHandler;
 import org.apache.brooklyn.util.core.internal.ssh.ShellTool;
 import org.apache.brooklyn.util.core.internal.ssh.SshAbstractTool;
 import org.apache.brooklyn.util.core.internal.ssh.SshTool;
+import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.exceptions.RuntimeTimeoutException;
 import org.apache.brooklyn.util.repeat.Repeater;
@@ -1069,6 +1071,8 @@ public class SshjTool extends SshAbstractTool implements SshTool {
     }
 
     protected boolean checkInterrupted(Throwable t) {
+        // see https://github.com/hierynomus/sshj/issues/800 -- we are trying to improve SSHJ so that the thread should keep its interrupted state
+
         if (Thread.currentThread().isInterrupted()) return true;
         if (t!=null && Exceptions.isRootCauseIsInterruption(t)) {
             // sshj has an ugly habit of catching & clearing thread interrupts, and returning wrapped in ConnectionExceptions
@@ -1076,6 +1080,12 @@ public class SshjTool extends SshAbstractTool implements SshTool {
             Thread.currentThread().interrupt();
             return true;
         }
+        Task<?> task = Tasks.current();
+        if (task!=null && task.isCancelled()) {
+            //  interrupt flag and exception might not tell the whole story; if we have a context task let's use it also
+            Thread.currentThread().interrupt();
+            return true;
+        }
         return false;
     }
 
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjToolIntegrationTest.java b/core/src/test/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjToolIntegrationTest.java
index e5309ab3d4..2ffeb1ddc7 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjToolIntegrationTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjToolIntegrationTest.java
@@ -21,7 +21,14 @@ package org.apache.brooklyn.util.core.internal.ssh.sshj;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import net.schmizz.concurrent.ExceptionChainer;
+import net.schmizz.sshj.common.SSHException;
+import net.schmizz.sshj.common.StreamCopier;
+import net.schmizz.sshj.connection.ConnectionException;
 import net.schmizz.sshj.connection.channel.direct.Session;
+import net.schmizz.sshj.sftp.SFTPException;
+import net.schmizz.sshj.transport.TransportException;
+import net.schmizz.sshj.userauth.UserAuthException;
 import org.apache.brooklyn.core.BrooklynFeatureEnablement;
 import org.apache.brooklyn.test.Asserts;
 import org.apache.brooklyn.util.core.internal.ssh.ShellTool;
@@ -39,6 +46,8 @@ import org.testng.annotations.Test;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
 import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -337,7 +346,41 @@ public class SshjToolIntegrationTest extends SshToolAbstractIntegrationTest {
         return outstr;
     }
 
-    @Test(groups = {"Integration"}, invocationCount = 10)
+    // useful if we want to understand why SSHJ is swallowing the exceptions;
+    // shouldn't be needed once we have a solution to https://github.com/hierynomus/sshj/issues/800
+//    static void hackChainerLogging() {
+//        Arrays.asList(SSHException.class, ConnectionException.class, TransportException.class, SFTPException.class, UserAuthException.class
+//                // , StreamCopier.class   // not a public field
+//        ).forEach(clazz -> {
+//            try {
+//                Field f = clazz.getField("chainer");
+//                f.setAccessible(true);
+//
+//                Field modifiersField = Field.class.getDeclaredField("modifiers");
+//                modifiersField.setAccessible(true);
+//                modifiersField.setInt(f, f.getModifiers() & ~Modifier.FINAL);
+//
+//                ExceptionChainer oldValue = (ExceptionChainer) f.get(null);
+//                f.set(null, new ExceptionChainer() {
+//                    @Override
+//                    public Throwable chain(Throwable t) {
+//                        if (Exceptions.isRootCauseIsInterruption(t)) {
+//                            log.warn("Caught interruption (thread interrupted? "+Thread.currentThread().isInterrupted()+")", t);
+//                            log.warn("... caught at", new Throwable("source of catching, in " + clazz));
+//                        }
+//                        return oldValue.chain(t);
+//                    }
+//                });
+//            } catch (Exception e) {
+//                throw Exceptions.propagate(e);
+//            }
+//        });
+//    }
+//    static {
+//        hackChainerLogging();
+//    }
+
+    @Test(groups = {"Integration"})
     public void testSshIsInterrupted() {
         log.info("STARTING");
         final SshTool localTool = new SshjTool(ImmutableMap.of(