You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2022/08/02 17:32:59 UTC
[brooklyn-server] 05/06: even more checks for sshj task interruption
This is an automated email from the ASF dual-hosted git repository.
heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit b0282af9699d732b2eae28a29c06d2b488f7efe2
Author: Alex Heneveld <al...@cloudsoft.io>
AuthorDate: Tue Aug 2 16:51:08 2022 +0100
even more checks for sshj task interruption
---
.../util/core/internal/ssh/sshj/SshjTool.java | 10 +++++
.../internal/ssh/sshj/SshjToolIntegrationTest.java | 45 +++++++++++++++++++++-
2 files changed, 54 insertions(+), 1 deletion(-)
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjTool.java b/core/src/main/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjTool.java
index 06a9579df8..98d73bedac 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjTool.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjTool.java
@@ -38,11 +38,13 @@ import net.schmizz.sshj.transport.TransportException;
import net.schmizz.sshj.xfer.FileSystemFile;
import net.schmizz.sshj.xfer.InMemorySourceFile;
import net.schmizz.sshj.xfer.LocalDestFile;
+import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.core.BrooklynFeatureEnablement;
import org.apache.brooklyn.util.core.internal.ssh.BackoffLimitedRetryHandler;
import org.apache.brooklyn.util.core.internal.ssh.ShellTool;
import org.apache.brooklyn.util.core.internal.ssh.SshAbstractTool;
import org.apache.brooklyn.util.core.internal.ssh.SshTool;
+import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.exceptions.RuntimeTimeoutException;
import org.apache.brooklyn.util.repeat.Repeater;
@@ -1069,6 +1071,8 @@ public class SshjTool extends SshAbstractTool implements SshTool {
}
protected boolean checkInterrupted(Throwable t) {
+ // see https://github.com/hierynomus/sshj/issues/800 -- we are trying to get SSHJ improved so that the thread keeps its interrupted state
+
if (Thread.currentThread().isInterrupted()) return true;
if (t!=null && Exceptions.isRootCauseIsInterruption(t)) {
// sshj has an ugly habit of catching & clearing thread interrupts, and returning them wrapped in ConnectionExceptions
@@ -1076,6 +1080,12 @@ public class SshjTool extends SshAbstractTool implements SshTool {
Thread.currentThread().interrupt();
return true;
}
+ Task<?> task = Tasks.current();
+ if (task!=null && task.isCancelled()) {
+ // interrupt flag and exception might not tell the whole story; if we have a context task let's use it also
+ Thread.currentThread().interrupt();
+ return true;
+ }
return false;
}
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjToolIntegrationTest.java b/core/src/test/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjToolIntegrationTest.java
index e5309ab3d4..2ffeb1ddc7 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjToolIntegrationTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjToolIntegrationTest.java
@@ -21,7 +21,14 @@ package org.apache.brooklyn.util.core.internal.ssh.sshj;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import net.schmizz.concurrent.ExceptionChainer;
+import net.schmizz.sshj.common.SSHException;
+import net.schmizz.sshj.common.StreamCopier;
+import net.schmizz.sshj.connection.ConnectionException;
import net.schmizz.sshj.connection.channel.direct.Session;
+import net.schmizz.sshj.sftp.SFTPException;
+import net.schmizz.sshj.transport.TransportException;
+import net.schmizz.sshj.userauth.UserAuthException;
import org.apache.brooklyn.core.BrooklynFeatureEnablement;
import org.apache.brooklyn.test.Asserts;
import org.apache.brooklyn.util.core.internal.ssh.ShellTool;
@@ -39,6 +46,8 @@ import org.testng.annotations.Test;
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
@@ -337,7 +346,41 @@ public class SshjToolIntegrationTest extends SshToolAbstractIntegrationTest {
return outstr;
}
- @Test(groups = {"Integration"}, invocationCount = 10)
+ // useful if we want to understand why SSHJ is swallowing the exceptions;
+ // shouldn't be needed once we have a solution to https://github.com/hierynomus/sshj/issues/800
+// static void hackChainerLogging() {
+// Arrays.asList(SSHException.class, ConnectionException.class, TransportException.class, SFTPException.class, UserAuthException.class
+// // , StreamCopier.class // not a public field
+// ).forEach(clazz -> {
+// try {
+// Field f = clazz.getField("chainer");
+// f.setAccessible(true);
+//
+// Field modifiersField = Field.class.getDeclaredField("modifiers");
+// modifiersField.setAccessible(true);
+// modifiersField.setInt(f, f.getModifiers() & ~Modifier.FINAL);
+//
+// ExceptionChainer oldValue = (ExceptionChainer) f.get(null);
+// f.set(null, new ExceptionChainer() {
+// @Override
+// public Throwable chain(Throwable t) {
+// if (Exceptions.isRootCauseIsInterruption(t)) {
+// log.warn("Caught interruption (thread interrupted? "+Thread.currentThread().isInterrupted()+")", t);
+// log.warn("... caught at", new Throwable("source of catching, in " + clazz));
+// }
+// return oldValue.chain(t);
+// }
+// });
+// } catch (Exception e) {
+// throw Exceptions.propagate(e);
+// }
+// });
+// }
+// static {
+// hackChainerLogging();
+// }
+
+ @Test(groups = {"Integration"})
public void testSshIsInterrupted() {
log.info("STARTING");
final SshTool localTool = new SshjTool(ImmutableMap.of(