You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/01/03 18:02:28 UTC
[airavata] 01/01: Fixing airavata-3287: Evicting invalidated ssh
clients from the pool
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch airavata-3287
in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 82139606a0ad46cdd740da5e92bc30196565b027
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Fri Jan 3 13:02:11 2020 -0500
Fixing airavata-3287: Evicting invalidated ssh clients from the pool
---
.../airavata/helix/adaptor/PoolingSSHJClient.java | 75 +++++----
.../airavata/helix/adaptor/SSHJAgentAdaptor.java | 169 +++++++++++++++++++--
.../adaptor/wrapper/SCPFileTransferWrapper.java | 12 +-
.../helix/adaptor/wrapper/SFTPClientWrapper.java | 12 +-
.../helix/adaptor/wrapper/SSHClientWrapper.java | 25 +++
.../helix/adaptor/wrapper/SessionWrapper.java | 12 +-
.../helix/impl/workflow/PreWorkflowManager.java | 33 ++--
7 files changed, 284 insertions(+), 54 deletions(-)
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
index 7b8eb94..05f37ac 100644
--- a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
@@ -22,7 +22,7 @@ package org.apache.airavata.helix.adaptor;
import net.schmizz.sshj.Config;
import net.schmizz.sshj.SSHClient;
import net.schmizz.sshj.common.DisconnectReason;
-import net.schmizz.sshj.connection.channel.direct.Session;
+import net.schmizz.sshj.connection.ConnectionException;
import net.schmizz.sshj.sftp.RemoteFile;
import net.schmizz.sshj.sftp.SFTPClient;
import net.schmizz.sshj.transport.DisconnectListener;
@@ -32,6 +32,7 @@ import net.schmizz.sshj.userauth.UserAuthException;
import net.schmizz.sshj.userauth.method.AuthMethod;
import org.apache.airavata.helix.adaptor.wrapper.SCPFileTransferWrapper;
import org.apache.airavata.helix.adaptor.wrapper.SFTPClientWrapper;
+import org.apache.airavata.helix.adaptor.wrapper.SSHClientWrapper;
import org.apache.airavata.helix.adaptor.wrapper.SessionWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,7 +55,7 @@ public class PoolingSSHJClient extends SSHClient {
private final static Logger logger = LoggerFactory.getLogger(PoolingSSHJClient.class);
private final ReadWriteLock lock = new ReentrantReadWriteLock();
- private final Map<SSHClient, SSHClientInfo> clientInfoMap = new HashMap<>();
+ private final Map<SSHClientWrapper, SSHClientInfo> clientInfoMap = new HashMap<>();
private HostKeyVerifier hostKeyVerifier;
private String username;
@@ -96,8 +97,8 @@ public class PoolingSSHJClient extends SSHClient {
////////////////// client specific operations ///////
- private SSHClient newClientWithSessionValidation() throws IOException {
- SSHClient newClient = createNewSSHClient();
+ private SSHClientWrapper newClientWithSessionValidation() throws IOException {
+ SSHClientWrapper newClient = createNewSSHClient();
SSHClientInfo info = new SSHClientInfo(1, System.currentTimeMillis(), clientInfoMap.size());
clientInfoMap.put(newClient, info);
@@ -132,7 +133,7 @@ public class PoolingSSHJClient extends SSHClient {
return newClient;
}
- private SSHClient leaseSSHClient() throws Exception {
+ private SSHClientWrapper leaseSSHClient() throws Exception {
lock.writeLock().lock();
try {
@@ -141,9 +142,9 @@ public class PoolingSSHJClient extends SSHClient {
} else {
- Optional<Map.Entry<SSHClient, SSHClientInfo>> minEntryOp = clientInfoMap.entrySet().stream().min(Comparator.comparing(entry -> entry.getValue().sessionCount));
+ Optional<Map.Entry<SSHClientWrapper, SSHClientInfo>> minEntryOp = clientInfoMap.entrySet().stream().min(Comparator.comparing(entry -> entry.getValue().sessionCount));
if (minEntryOp.isPresent()) {
- Map.Entry<SSHClient, SSHClientInfo> minEntry = minEntryOp.get();
+ Map.Entry<SSHClientWrapper, SSHClientInfo> minEntry = minEntryOp.get();
// use the connection with least amount of sessions created.
logger.debug("Session count for selected connection {} is {}. Threshold {} for host {}",
@@ -160,13 +161,12 @@ public class PoolingSSHJClient extends SSHClient {
minEntry.getValue().setSessionCount(minEntry.getValue().getSessionCount() + 1);
minEntry.getValue().setLastAccessedTime(System.currentTimeMillis());
- SSHClient sshClient = minEntry.getKey();
+ SSHClientWrapper sshClient = minEntry.getKey();
- if (!sshClient.isConnected() || !sshClient.isAuthenticated()) {
+ if (!sshClient.isConnected() || !sshClient.isAuthenticated() || !sshClient.isErrored()) {
logger.warn("Client for host {} is not connected or not authenticated. Creating a new client", host);
removeDisconnectedClients(sshClient);
return newClientWithSessionValidation();
-
} else {
return sshClient;
}
@@ -181,10 +181,17 @@ public class PoolingSSHJClient extends SSHClient {
}
}
- private void removeDisconnectedClients(SSHClient client) {
+ private void removeDisconnectedClients(SSHClientWrapper client) {
lock.writeLock().lock();
try {
+ client.disconnect();
+ } catch (Exception e) {
+ log.warn("Errored while disconnecting the client " + e.getMessage());
+ // Ignore
+ }
+
+ try {
if (clientInfoMap.containsKey(client)) {
logger.debug("Removing the disconnected connection {} for host {}", clientInfoMap.get(client).getClientId(), host);
clientInfoMap.remove(client);
@@ -195,7 +202,7 @@ public class PoolingSSHJClient extends SSHClient {
}
}
- private void untrackClosedSessions(SSHClient client, int sessionId) {
+ private void untrackClosedSessions(SSHClientWrapper client, int sessionId) {
lock.writeLock().lock();
try {
@@ -211,7 +218,7 @@ public class PoolingSSHJClient extends SSHClient {
}
private void removeStaleConnections() {
- List<Map.Entry<SSHClient, SSHClientInfo>> entriesTobeRemoved;
+ List<Map.Entry<SSHClientWrapper, SSHClientInfo>> entriesTobeRemoved;
lock.writeLock().lock();
logger.info("Current active connections for {} @ {} : {} are {}", username, host, port, clientInfoMap.size());
try {
@@ -235,13 +242,13 @@ public class PoolingSSHJClient extends SSHClient {
});
}
- private SSHClient createNewSSHClient() throws IOException {
+ private SSHClientWrapper createNewSSHClient() throws IOException {
- SSHClient sshClient;
+ SSHClientWrapper sshClient;
if (config != null) {
- sshClient = new SSHClient(config);
+ sshClient = new SSHClientWrapper(config);
} else {
- sshClient = new SSHClient();
+ sshClient = new SSHClientWrapper();
}
sshClient.getConnection().getTransport().setDisconnectListener(new DisconnectListener() {
@@ -267,15 +274,20 @@ public class PoolingSSHJClient extends SSHClient {
return sshClient;
}
- public Session startSessionWrapper() throws Exception {
+ public SessionWrapper startSessionWrapper() throws Exception {
- final SSHClient sshClient = leaseSSHClient();
+ final SSHClientWrapper sshClient = leaseSSHClient();
try {
- return new SessionWrapper(sshClient.startSession(), (id) -> untrackClosedSessions(sshClient, id));
+ return new SessionWrapper(sshClient.startSession(), (id) -> untrackClosedSessions(sshClient, id), sshClient);
} catch (Exception e) {
if (sshClient != null) {
+ // If it is a ConnectionExceptions, explicitly invalidate the client
+ if (e instanceof ConnectionException) {
+ sshClient.setErrored(true);
+ }
+
untrackClosedSessions(sshClient, -1);
}
throw e;
@@ -284,28 +296,39 @@ public class PoolingSSHJClient extends SSHClient {
public SCPFileTransferWrapper newSCPFileTransferWrapper() throws Exception {
- final SSHClient sshClient = leaseSSHClient();
+ final SSHClientWrapper sshClient = leaseSSHClient();
try {
- return new SCPFileTransferWrapper(sshClient.newSCPFileTransfer(), (id) -> untrackClosedSessions(sshClient, id));
+ return new SCPFileTransferWrapper(sshClient.newSCPFileTransfer(), (id) -> untrackClosedSessions(sshClient, id), sshClient);
} catch (Exception e) {
+
if (sshClient != null) {
+ // If it is a ConnectionExceptions, explicitly invalidate the client
+ if (e instanceof ConnectionException) {
+ sshClient.setErrored(true);
+ }
+
untrackClosedSessions(sshClient, -1);
}
throw e;
}
}
- public SFTPClient newSFTPClientWrapper() throws Exception {
+ public SFTPClientWrapper newSFTPClientWrapper() throws Exception {
- final SSHClient sshClient = leaseSSHClient();
+ final SSHClientWrapper sshClient = leaseSSHClient();
try {
- return new SFTPClientWrapper(sshClient.newSFTPClient(), (id) -> untrackClosedSessions(sshClient, id));
+ return new SFTPClientWrapper(sshClient.newSFTPClient(), (id) -> untrackClosedSessions(sshClient, id), sshClient);
} catch (Exception e) {
if (sshClient != null) {
+ // If it is a ConnectionExceptions, explicitly invalidate the client
+ if (e instanceof ConnectionException) {
+ sshClient.setErrored(true);
+ }
+
untrackClosedSessions(sshClient, -1);
}
throw e;
@@ -405,7 +428,7 @@ public class PoolingSSHJClient extends SSHClient {
return this;
}
- public Map<SSHClient, SSHClientInfo> getClientInfoMap() {
+ public Map<SSHClientWrapper, SSHClientInfo> getClientInfoMap() {
return clientInfoMap;
}
}
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
index 05ce146..5eae723 100644
--- a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
@@ -22,6 +22,7 @@ package org.apache.airavata.helix.adaptor;
import com.google.common.collect.Lists;
import net.schmizz.keepalive.KeepAliveProvider;
import net.schmizz.sshj.DefaultConfig;
+import net.schmizz.sshj.connection.ConnectionException;
import net.schmizz.sshj.connection.channel.direct.Session;
import net.schmizz.sshj.sftp.*;
import net.schmizz.sshj.userauth.keyprovider.KeyProvider;
@@ -38,6 +39,8 @@ import net.schmizz.sshj.xfer.LocalFileFilter;
import net.schmizz.sshj.xfer.LocalSourceFile;
import org.apache.airavata.agents.api.*;
import org.apache.airavata.helix.adaptor.wrapper.SCPFileTransferWrapper;
+import org.apache.airavata.helix.adaptor.wrapper.SFTPClientWrapper;
+import org.apache.airavata.helix.adaptor.wrapper.SessionWrapper;
import org.apache.airavata.helix.agent.ssh.StandardOutReader;
import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
@@ -165,8 +168,9 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
@Override
public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException {
- try (Session session = sshjClient.startSessionWrapper()) {
-
+ SessionWrapper session = null;
+ try {
+ session = sshjClient.startSessionWrapper();
Session.Command exec = session.exec((workingDirectory != null ? "cd " + workingDirectory + "; " : "") + command);
StandardOutReader standardOutReader = new StandardOutReader();
@@ -178,8 +182,21 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
standardOutReader.setExitCode(Optional.ofNullable(exec.getExitStatus()).orElseThrow(() -> new Exception("Exit status received as null")));
}
return standardOutReader;
+
} catch (Exception e) {
+ if (e instanceof ConnectionException) {
+ Optional.ofNullable(session).ifPresent(ft -> ft.setErrored(true));
+ }
throw new AgentException(e);
+
+ } finally {
+ Optional.ofNullable(session).ifPresent(ss -> {
+ try {
+ ss.close();
+ } catch (IOException e) {
+ //Ignore
+ }
+ });
}
}
@@ -190,29 +207,61 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
@Override
public void createDirectory(String path, boolean recursive) throws AgentException {
- try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+ SFTPClientWrapper sftpClient = null;
+ try {
+ sftpClient = sshjClient.newSFTPClientWrapper();
if (recursive) {
sftpClient.mkdirs(path);
} else {
sftpClient.mkdir(path);
}
} catch (Exception e) {
+ if (e instanceof ConnectionException) {
+ Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true));
+ }
throw new AgentException(e);
+
+ } finally {
+ Optional.ofNullable(sftpClient).ifPresent(client -> {
+ try {
+ client.close();
+ } catch (IOException e) {
+ //Ignore
+ }
+ });
}
}
@Override
public void uploadFile(String localFile, String remoteFile) throws AgentException {
- try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) {
+ SCPFileTransferWrapper fileTransfer = null;
+ try {
+ fileTransfer = sshjClient.newSCPFileTransferWrapper();
fileTransfer.upload(localFile, remoteFile);
+
} catch (Exception e) {
+ if (e instanceof ConnectionException) {
+ Optional.ofNullable(fileTransfer).ifPresent(ft -> ft.setErrored(true));
+ }
throw new AgentException(e);
+
+ } finally {
+ Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper -> {
+ try {
+ scpFileTransferWrapper.close();
+ } catch (IOException e) {
+ //Ignore
+ }
+ });
}
}
@Override
public void uploadFile(InputStream localInStream, FileMetadata metadata, String remoteFile) throws AgentException {
- try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) {
+ SCPFileTransferWrapper fileTransfer = null;
+
+ try {
+ fileTransfer = sshjClient.newSCPFileTransferWrapper();
fileTransfer.upload(new LocalSourceFile() {
@Override
public String getName() {
@@ -265,22 +314,51 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
}
}, remoteFile);
} catch (Exception e) {
+ if (e instanceof ConnectionException) {
+ Optional.ofNullable(fileTransfer).ifPresent(ft -> ft.setErrored(true));
+ }
throw new AgentException(e);
+
+ } finally {
+ Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper -> {
+ try {
+ scpFileTransferWrapper.close();
+ } catch (IOException e) {
+ //Ignore
+ }
+ });
}
}
@Override
public void downloadFile(String remoteFile, String localFile) throws AgentException {
- try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) {
+ SCPFileTransferWrapper fileTransfer = null;
+ try {
+ fileTransfer = sshjClient.newSCPFileTransferWrapper();
fileTransfer.download(remoteFile, localFile);
+
} catch (Exception e) {
+ if (e instanceof ConnectionException) {
+ Optional.ofNullable(fileTransfer).ifPresent(ft -> ft.setErrored(true));
+ }
throw new AgentException(e);
+
+ } finally {
+ Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper -> {
+ try {
+ scpFileTransferWrapper.close();
+ } catch (IOException e) {
+ //Ignore
+ }
+ });
}
}
@Override
public void downloadFile(String remoteFile, OutputStream localOutStream, FileMetadata metadata) throws AgentException {
- try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) {
+ SCPFileTransferWrapper fileTransfer = null;
+ try {
+ fileTransfer = sshjClient.newSCPFileTransferWrapper();
fileTransfer.download(remoteFile, new LocalDestFile() {
@Override
public OutputStream getOutputStream() throws IOException {
@@ -318,43 +396,98 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
}
});
} catch (Exception e) {
+ if (e instanceof ConnectionException) {
+ Optional.ofNullable(fileTransfer).ifPresent(ft -> ft.setErrored(true));
+ }
throw new AgentException(e);
+
+ } finally {
+ Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper -> {
+ try {
+ scpFileTransferWrapper.close();
+ } catch (IOException e) {
+ //Ignore
+ }
+ });
}
}
@Override
public List<String> listDirectory(String path) throws AgentException {
- try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+ SFTPClientWrapper sftpClient = null;
+ try {
+ sftpClient = sshjClient.newSFTPClientWrapper();
List<RemoteResourceInfo> ls = sftpClient.ls(path);
return ls.stream().map(RemoteResourceInfo::getName).collect(Collectors.toList());
} catch (Exception e) {
+ if (e instanceof ConnectionException) {
+ Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true));
+ }
throw new AgentException(e);
+
+ } finally {
+ Optional.ofNullable(sftpClient).ifPresent(client -> {
+ try {
+ client.close();
+ } catch (IOException e) {
+ //Ignore
+ }
+ });
}
}
@Override
public Boolean doesFileExist(String filePath) throws AgentException {
- try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+ SFTPClientWrapper sftpClient = null;
+ try {
+ sftpClient = sshjClient.newSFTPClientWrapper();
return sftpClient.statExistence(filePath) != null;
} catch (Exception e) {
+ if (e instanceof ConnectionException) {
+ Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true));
+ }
throw new AgentException(e);
+
+ } finally {
+ Optional.ofNullable(sftpClient).ifPresent(client -> {
+ try {
+ client.close();
+ } catch (IOException e) {
+ //Ignore
+ }
+ });
}
}
@Override
public List<String> getFileNameFromExtension(String fileName, String parentPath) throws AgentException {
-
- try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+ SFTPClientWrapper sftpClient = null;
+ try {
+ sftpClient = sshjClient.newSFTPClientWrapper();
List<RemoteResourceInfo> ls = sftpClient.ls(parentPath, resource -> isMatch(resource.getName(), fileName));
return ls.stream().map(RemoteResourceInfo::getPath).collect(Collectors.toList());
} catch (Exception e) {
+ if (e instanceof ConnectionException) {
+ Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true));
+ }
throw new AgentException(e);
+
+ } finally {
+ Optional.ofNullable(sftpClient).ifPresent(scpFileTransferWrapper -> {
+ try {
+ scpFileTransferWrapper.close();
+ } catch (IOException e) {
+ //Ignore
+ }
+ });
}
}
@Override
public FileMetadata getFileMetadata(String remoteFile) throws AgentException {
- try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+ SFTPClientWrapper sftpClient = null;
+ try {
+ sftpClient = sshjClient.newSFTPClientWrapper();
FileAttributes stat = sftpClient.stat(remoteFile);
FileMetadata metadata = new FileMetadata();
metadata.setName(new File(remoteFile).getName());
@@ -362,7 +495,19 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
metadata.setPermissions(FilePermission.toMask(stat.getPermissions()));
return metadata;
} catch (Exception e) {
+ if (e instanceof ConnectionException) {
+ Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true));
+ }
throw new AgentException(e);
+
+ } finally {
+ Optional.ofNullable(sftpClient).ifPresent(scpFileTransferWrapper -> {
+ try {
+ scpFileTransferWrapper.close();
+ } catch (IOException e) {
+ //Ignore
+ }
+ });
}
}
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java
index ff49e5b..2aaad74 100644
--- a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java
@@ -33,10 +33,12 @@ public class SCPFileTransferWrapper implements FileTransfer, Closeable {
private SCPFileTransfer scpFileTransfer;
private Consumer<Integer> onCloseFunction;
+ private SSHClientWrapper originalSSHClient;
- public SCPFileTransferWrapper(SCPFileTransfer scpFileTransfer, Consumer<Integer> onCloseFunction) {
+ public SCPFileTransferWrapper(SCPFileTransfer scpFileTransfer, Consumer<Integer> onCloseFunction, SSHClientWrapper originalSSHClient) {
this.scpFileTransfer = scpFileTransfer;
this.onCloseFunction = onCloseFunction;
+ this.originalSSHClient = originalSSHClient;
}
@Override
@@ -73,4 +75,12 @@ public class SCPFileTransferWrapper implements FileTransfer, Closeable {
public void close() throws IOException {
onCloseFunction.accept(-1);
}
+
+ public boolean isErrored() {
+ return originalSSHClient.isErrored();
+ }
+
+ public void setErrored(boolean errored) {
+ this.originalSSHClient.setErrored(errored);
+ }
}
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java
index ed21510..635ddfd 100644
--- a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java
@@ -27,10 +27,12 @@ import java.util.function.Consumer;
public class SFTPClientWrapper extends SFTPClient {
private SFTPClient sftpClient;
private Consumer<Integer> onCloseFunction;
+ private SSHClientWrapper originalSSHClient;
- public SFTPClientWrapper(SFTPClient sftpClient, Consumer<Integer> onCloseFunction) {
+ public SFTPClientWrapper(SFTPClient sftpClient, Consumer<Integer> onCloseFunction, SSHClientWrapper originalSSHClient) {
super(sftpClient.getSFTPEngine());
this.onCloseFunction = onCloseFunction;
+ this.originalSSHClient = originalSSHClient;
}
@Override
@@ -38,4 +40,12 @@ public class SFTPClientWrapper extends SFTPClient {
onCloseFunction.accept(-1);
super.close();
}
+
+ public boolean isErrored() {
+ return originalSSHClient.isErrored();
+ }
+
+ public void setErrored(boolean errored) {
+ this.originalSSHClient.setErrored(errored);
+ }
}
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SSHClientWrapper.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SSHClientWrapper.java
new file mode 100644
index 0000000..ff66ca1
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SSHClientWrapper.java
@@ -0,0 +1,25 @@
+package org.apache.airavata.helix.adaptor.wrapper;
+
+import net.schmizz.sshj.Config;
+import net.schmizz.sshj.SSHClient;
+
+public class SSHClientWrapper extends SSHClient {
+
+ public SSHClientWrapper() {
+ super();
+ }
+
+ public SSHClientWrapper(Config config) {
+ super(config);
+ }
+
+ private boolean errored = false;
+
+ public boolean isErrored() {
+ return errored;
+ }
+
+ public void setErrored(boolean errored) {
+ this.errored = errored;
+ }
+}
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java
index a1899b1..6713d5f 100644
--- a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java
@@ -39,10 +39,12 @@ public class SessionWrapper implements Session {
private Session session;
private Consumer<Integer> onCloseFunction;
+ private SSHClientWrapper originalSSHClient;
- public SessionWrapper(Session session, Consumer<Integer> onCloseFunction) {
+ public SessionWrapper(Session session, Consumer<Integer> onCloseFunction, SSHClientWrapper originalSSHClient) {
this.session = session;
this.onCloseFunction = onCloseFunction;
+ this.originalSSHClient = originalSSHClient;
}
@Override
@@ -180,4 +182,12 @@ public class SessionWrapper implements Session {
public void handle(Message msg, SSHPacket buf) throws SSHException {
session.handle(msg, buf);
}
+
+ public boolean isErrored() {
+ return originalSSHClient.isErrored();
+ }
+
+ public void setErrored(boolean errored) {
+ this.originalSSHClient.setErrored(errored);
+ }
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index a7de7ad..49b516b 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -160,22 +160,28 @@ public class PreWorkflowManager extends WorkflowManager {
}
String experimentId = processModel.getExperimentId();
-
- List<String> workflows = processModel.getProcessWorkflows().stream().map(ProcessWorkflow::getWorkflowId).collect(Collectors.toList());
final List<AbstractTask> allTasks = new ArrayList<>();
- if (workflows != null && workflows.size() > 0) {
- for (String wf : workflows) {
- logger.info("Creating cancellation task for workflow " + wf + " of process " + processId);
- WorkflowCancellationTask wfct = new WorkflowCancellationTask();
- wfct.setTaskId(UUID.randomUUID().toString());
- wfct.setCancellingWorkflowName(wf);
-
- if (allTasks.size() > 0) {
- allTasks.get(allTasks.size() -1).setNextTask(new OutPort(wfct.getTaskId(), wfct));
+
+ Optional<List<String>> workflowsOpt = Optional.ofNullable(processModel.getProcessWorkflows()).map(wfs -> wfs.stream().map(ProcessWorkflow::getWorkflowId).collect(Collectors.toList()));
+
+ if (workflowsOpt.isPresent()) {
+ List<String> workflows = workflowsOpt.get();
+ if (workflows.size() > 0) {
+ for (String wf : workflows) {
+ logger.info("Creating cancellation task for workflow " + wf + " of process " + processId);
+ WorkflowCancellationTask wfct = new WorkflowCancellationTask();
+ wfct.setTaskId(UUID.randomUUID().toString());
+ wfct.setCancellingWorkflowName(wf);
+
+ if (allTasks.size() > 0) {
+ allTasks.get(allTasks.size() - 1).setNextTask(new OutPort(wfct.getTaskId(), wfct));
+ }
+ allTasks.add(wfct);
}
- allTasks.add(wfct);
- }
+ } else {
+ logger.warn("No workflow registered with process " + processId + " to cancel");
+ }
} else {
logger.warn("No workflow registered with process " + processId + " to cancel");
}
@@ -212,6 +218,7 @@ public class PreWorkflowManager extends WorkflowManager {
public static void main(String[] args) throws Exception {
PreWorkflowManager preWorkflowManager = new PreWorkflowManager();
preWorkflowManager.startServer();
+ preWorkflowManager.createAndLaunchCancelWorkflow("PROCESS_7e19c779-e326-43d2-b025-aaa3a4b44c95","ultrascan");
}
private class ProcessLaunchMessageHandler implements MessageHandler {