You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/01/03 18:02:28 UTC

[airavata] 01/01: Fixing airavata-3287: Evicting invalidated ssh clients from the pool

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch airavata-3287
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 82139606a0ad46cdd740da5e92bc30196565b027
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Fri Jan 3 13:02:11 2020 -0500

    Fixing airavata-3287: Evicting invalidated ssh clients from the pool
---
 .../airavata/helix/adaptor/PoolingSSHJClient.java  |  75 +++++----
 .../airavata/helix/adaptor/SSHJAgentAdaptor.java   | 169 +++++++++++++++++++--
 .../adaptor/wrapper/SCPFileTransferWrapper.java    |  12 +-
 .../helix/adaptor/wrapper/SFTPClientWrapper.java   |  12 +-
 .../helix/adaptor/wrapper/SSHClientWrapper.java    |  25 +++
 .../helix/adaptor/wrapper/SessionWrapper.java      |  12 +-
 .../helix/impl/workflow/PreWorkflowManager.java    |  33 ++--
 7 files changed, 284 insertions(+), 54 deletions(-)

diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
index 7b8eb94..05f37ac 100644
--- a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
@@ -22,7 +22,7 @@ package org.apache.airavata.helix.adaptor;
 import net.schmizz.sshj.Config;
 import net.schmizz.sshj.SSHClient;
 import net.schmizz.sshj.common.DisconnectReason;
-import net.schmizz.sshj.connection.channel.direct.Session;
+import net.schmizz.sshj.connection.ConnectionException;
 import net.schmizz.sshj.sftp.RemoteFile;
 import net.schmizz.sshj.sftp.SFTPClient;
 import net.schmizz.sshj.transport.DisconnectListener;
@@ -32,6 +32,7 @@ import net.schmizz.sshj.userauth.UserAuthException;
 import net.schmizz.sshj.userauth.method.AuthMethod;
 import org.apache.airavata.helix.adaptor.wrapper.SCPFileTransferWrapper;
 import org.apache.airavata.helix.adaptor.wrapper.SFTPClientWrapper;
+import org.apache.airavata.helix.adaptor.wrapper.SSHClientWrapper;
 import org.apache.airavata.helix.adaptor.wrapper.SessionWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,7 +55,7 @@ public class PoolingSSHJClient extends SSHClient {
     private final static Logger logger = LoggerFactory.getLogger(PoolingSSHJClient.class);
 
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
-    private final Map<SSHClient, SSHClientInfo> clientInfoMap = new HashMap<>();
+    private final Map<SSHClientWrapper, SSHClientInfo> clientInfoMap = new HashMap<>();
 
     private HostKeyVerifier hostKeyVerifier;
     private String username;
@@ -96,8 +97,8 @@ public class PoolingSSHJClient extends SSHClient {
 
     ////////////////// client specific operations ///////
 
-    private SSHClient newClientWithSessionValidation() throws IOException {
-        SSHClient newClient = createNewSSHClient();
+    private SSHClientWrapper newClientWithSessionValidation() throws IOException {
+        SSHClientWrapper newClient = createNewSSHClient();
         SSHClientInfo info = new SSHClientInfo(1, System.currentTimeMillis(), clientInfoMap.size());
         clientInfoMap.put(newClient, info);
 
@@ -132,7 +133,7 @@ public class PoolingSSHJClient extends SSHClient {
         return newClient;
     }
 
-    private SSHClient leaseSSHClient() throws Exception {
+    private SSHClientWrapper leaseSSHClient() throws Exception {
         lock.writeLock().lock();
 
         try {
@@ -141,9 +142,9 @@ public class PoolingSSHJClient extends SSHClient {
 
             } else {
 
-                Optional<Map.Entry<SSHClient, SSHClientInfo>> minEntryOp = clientInfoMap.entrySet().stream().min(Comparator.comparing(entry -> entry.getValue().sessionCount));
+                Optional<Map.Entry<SSHClientWrapper, SSHClientInfo>> minEntryOp = clientInfoMap.entrySet().stream().min(Comparator.comparing(entry -> entry.getValue().sessionCount));
                 if (minEntryOp.isPresent()) {
-                    Map.Entry<SSHClient, SSHClientInfo> minEntry = minEntryOp.get();
+                    Map.Entry<SSHClientWrapper, SSHClientInfo> minEntry = minEntryOp.get();
                     // use the connection with least amount of sessions created.
 
                     logger.debug("Session count for selected connection {} is {}. Threshold {} for host {}",
@@ -160,13 +161,12 @@ public class PoolingSSHJClient extends SSHClient {
                         minEntry.getValue().setSessionCount(minEntry.getValue().getSessionCount() + 1);
                         minEntry.getValue().setLastAccessedTime(System.currentTimeMillis());
 
-                        SSHClient sshClient = minEntry.getKey();
+                        SSHClientWrapper sshClient = minEntry.getKey();
 
-                        if (!sshClient.isConnected() || !sshClient.isAuthenticated()) {
+                        if (!sshClient.isConnected() || !sshClient.isAuthenticated() || sshClient.isErrored()) { // also evict clients flagged as errored
                             logger.warn("Client for host {} is not connected or not authenticated. Creating a new client", host);
                             removeDisconnectedClients(sshClient);
                             return newClientWithSessionValidation();
-
                         } else {
                             return sshClient;
                         }
@@ -181,10 +181,17 @@ public class PoolingSSHJClient extends SSHClient {
         }
     }
 
-    private void removeDisconnectedClients(SSHClient client) {
+    private void removeDisconnectedClients(SSHClientWrapper client) {
         lock.writeLock().lock();
 
         try {
+            client.disconnect();
+        } catch (Exception e) {
+            log.warn("Errored while disconnecting the client " + e.getMessage());
+            // Ignore
+        }
+
+        try {
             if (clientInfoMap.containsKey(client)) {
                 logger.debug("Removing the disconnected connection {} for host {}", clientInfoMap.get(client).getClientId(), host);
                 clientInfoMap.remove(client);
@@ -195,7 +202,7 @@ public class PoolingSSHJClient extends SSHClient {
         }
     }
 
-    private void untrackClosedSessions(SSHClient client, int sessionId) {
+    private void untrackClosedSessions(SSHClientWrapper client, int sessionId) {
         lock.writeLock().lock();
 
         try {
@@ -211,7 +218,7 @@ public class PoolingSSHJClient extends SSHClient {
     }
 
     private void removeStaleConnections() {
-        List<Map.Entry<SSHClient, SSHClientInfo>> entriesTobeRemoved;
+        List<Map.Entry<SSHClientWrapper, SSHClientInfo>> entriesTobeRemoved;
         lock.writeLock().lock();
         logger.info("Current active connections for  {} @ {} : {} are {}", username, host, port, clientInfoMap.size());
         try {
@@ -235,13 +242,13 @@ public class PoolingSSHJClient extends SSHClient {
         });
     }
 
-    private SSHClient createNewSSHClient() throws IOException {
+    private SSHClientWrapper createNewSSHClient() throws IOException {
 
-        SSHClient sshClient;
+        SSHClientWrapper sshClient;
         if (config != null) {
-            sshClient = new SSHClient(config);
+            sshClient = new SSHClientWrapper(config);
         } else {
-            sshClient = new SSHClient();
+            sshClient = new SSHClientWrapper();
         }
 
         sshClient.getConnection().getTransport().setDisconnectListener(new DisconnectListener() {
@@ -267,15 +274,20 @@ public class PoolingSSHJClient extends SSHClient {
         return sshClient;
     }
 
-    public Session startSessionWrapper() throws Exception {
+    public SessionWrapper startSessionWrapper() throws Exception {
 
-        final SSHClient sshClient = leaseSSHClient();
+        final SSHClientWrapper sshClient = leaseSSHClient();
 
         try {
-            return new SessionWrapper(sshClient.startSession(), (id) -> untrackClosedSessions(sshClient, id));
+            return new SessionWrapper(sshClient.startSession(), (id) -> untrackClosedSessions(sshClient, id), sshClient);
 
         } catch (Exception e) {
             if (sshClient != null) {
+                // If it is a ConnectionException, explicitly invalidate the client
+                if (e instanceof ConnectionException) {
+                    sshClient.setErrored(true);
+                }
+
                 untrackClosedSessions(sshClient, -1);
             }
             throw e;
@@ -284,28 +296,39 @@ public class PoolingSSHJClient extends SSHClient {
 
     public SCPFileTransferWrapper newSCPFileTransferWrapper() throws Exception {
 
-        final SSHClient sshClient = leaseSSHClient();
+        final SSHClientWrapper sshClient = leaseSSHClient();
 
         try {
-            return new SCPFileTransferWrapper(sshClient.newSCPFileTransfer(), (id) -> untrackClosedSessions(sshClient, id));
+            return new SCPFileTransferWrapper(sshClient.newSCPFileTransfer(), (id) -> untrackClosedSessions(sshClient, id), sshClient);
 
         } catch (Exception e) {
+
             if (sshClient != null) {
+                // If it is a ConnectionException, explicitly invalidate the client
+                if (e instanceof ConnectionException) {
+                    sshClient.setErrored(true);
+                }
+
                 untrackClosedSessions(sshClient, -1);
             }
             throw e;
         }
     }
 
-    public SFTPClient newSFTPClientWrapper() throws Exception {
+    public SFTPClientWrapper newSFTPClientWrapper() throws Exception {
 
-        final SSHClient sshClient = leaseSSHClient();
+        final SSHClientWrapper sshClient = leaseSSHClient();
 
         try {
-            return new SFTPClientWrapper(sshClient.newSFTPClient(), (id) -> untrackClosedSessions(sshClient, id));
+            return new SFTPClientWrapper(sshClient.newSFTPClient(), (id) -> untrackClosedSessions(sshClient, id), sshClient);
         } catch (Exception e) {
 
             if (sshClient != null) {
+                // If it is a ConnectionException, explicitly invalidate the client
+                if (e instanceof ConnectionException) {
+                    sshClient.setErrored(true);
+                }
+
                 untrackClosedSessions(sshClient, -1);
             }
             throw e;
@@ -405,7 +428,7 @@ public class PoolingSSHJClient extends SSHClient {
         return this;
     }
 
-    public Map<SSHClient, SSHClientInfo> getClientInfoMap() {
+    public Map<SSHClientWrapper, SSHClientInfo> getClientInfoMap() {
         return clientInfoMap;
     }
 }
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
index 05ce146..5eae723 100644
--- a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
@@ -22,6 +22,7 @@ package org.apache.airavata.helix.adaptor;
 import com.google.common.collect.Lists;
 import net.schmizz.keepalive.KeepAliveProvider;
 import net.schmizz.sshj.DefaultConfig;
+import net.schmizz.sshj.connection.ConnectionException;
 import net.schmizz.sshj.connection.channel.direct.Session;
 import net.schmizz.sshj.sftp.*;
 import net.schmizz.sshj.userauth.keyprovider.KeyProvider;
@@ -38,6 +39,8 @@ import net.schmizz.sshj.xfer.LocalFileFilter;
 import net.schmizz.sshj.xfer.LocalSourceFile;
 import org.apache.airavata.agents.api.*;
 import org.apache.airavata.helix.adaptor.wrapper.SCPFileTransferWrapper;
+import org.apache.airavata.helix.adaptor.wrapper.SFTPClientWrapper;
+import org.apache.airavata.helix.adaptor.wrapper.SessionWrapper;
 import org.apache.airavata.helix.agent.ssh.StandardOutReader;
 import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
 import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
@@ -165,8 +168,9 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
 
     @Override
     public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException {
-        try (Session session = sshjClient.startSessionWrapper()) {
-
+        SessionWrapper session = null;
+        try {
+            session = sshjClient.startSessionWrapper();
             Session.Command exec = session.exec((workingDirectory != null ? "cd " + workingDirectory + "; " : "") + command);
             StandardOutReader standardOutReader = new StandardOutReader();
 
@@ -178,8 +182,21 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
                 standardOutReader.setExitCode(Optional.ofNullable(exec.getExitStatus()).orElseThrow(() -> new Exception("Exit status received as null")));
             }
             return standardOutReader;
+
         } catch (Exception e) {
+            if (e instanceof ConnectionException) {
+                Optional.ofNullable(session).ifPresent(ft -> ft.setErrored(true));
+            }
             throw new AgentException(e);
+
+        } finally {
+            Optional.ofNullable(session).ifPresent(ss -> {
+                try {
+                    ss.close();
+                } catch (IOException e) {
+                    //Ignore
+                }
+            });
         }
     }
 
@@ -190,29 +207,61 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
 
     @Override
     public void createDirectory(String path, boolean recursive) throws AgentException {
-        try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+        SFTPClientWrapper sftpClient = null;
+        try {
+            sftpClient = sshjClient.newSFTPClientWrapper();
             if (recursive) {
                 sftpClient.mkdirs(path);
             } else {
                 sftpClient.mkdir(path);
             }
         } catch (Exception e) {
+            if (e instanceof ConnectionException) {
+                Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true));
+            }
             throw new AgentException(e);
+
+        } finally {
+            Optional.ofNullable(sftpClient).ifPresent(client -> {
+                try {
+                    client.close();
+                } catch (IOException e) {
+                    //Ignore
+                }
+            });
         }
     }
 
     @Override
     public void uploadFile(String localFile, String remoteFile) throws AgentException {
-        try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) {
+        SCPFileTransferWrapper fileTransfer = null;
+        try {
+            fileTransfer = sshjClient.newSCPFileTransferWrapper();
             fileTransfer.upload(localFile, remoteFile);
+
         } catch (Exception e) {
+            if (e instanceof ConnectionException) {
+                Optional.ofNullable(fileTransfer).ifPresent(ft -> ft.setErrored(true));
+            }
             throw new AgentException(e);
+
+        } finally {
+            Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper -> {
+                try {
+                    scpFileTransferWrapper.close();
+                } catch (IOException e) {
+                    //Ignore
+                }
+            });
         }
     }
 
     @Override
     public void uploadFile(InputStream localInStream, FileMetadata metadata, String remoteFile) throws AgentException {
-        try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) {
+        SCPFileTransferWrapper fileTransfer = null;
+
+        try {
+            fileTransfer = sshjClient.newSCPFileTransferWrapper();
             fileTransfer.upload(new LocalSourceFile() {
                 @Override
                 public String getName() {
@@ -265,22 +314,51 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
                 }
             }, remoteFile);
         } catch (Exception e) {
+            if (e instanceof ConnectionException) {
+                Optional.ofNullable(fileTransfer).ifPresent(ft -> ft.setErrored(true));
+            }
             throw new AgentException(e);
+
+        } finally {
+            Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper -> {
+                try {
+                    scpFileTransferWrapper.close();
+                } catch (IOException e) {
+                    //Ignore
+                }
+            });
         }
     }
 
     @Override
     public void downloadFile(String remoteFile, String localFile) throws AgentException {
-        try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) {
+        SCPFileTransferWrapper fileTransfer = null;
+        try {
+            fileTransfer = sshjClient.newSCPFileTransferWrapper();
             fileTransfer.download(remoteFile, localFile);
+
         } catch (Exception e) {
+            if (e instanceof ConnectionException) {
+                Optional.ofNullable(fileTransfer).ifPresent(ft -> ft.setErrored(true));
+            }
             throw new AgentException(e);
+
+        } finally {
+            Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper -> {
+                try {
+                    scpFileTransferWrapper.close();
+                } catch (IOException e) {
+                    //Ignore
+                }
+            });
         }
     }
 
     @Override
     public void downloadFile(String remoteFile, OutputStream localOutStream, FileMetadata metadata) throws AgentException {
-        try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) {
+        SCPFileTransferWrapper fileTransfer = null;
+        try {
+            fileTransfer = sshjClient.newSCPFileTransferWrapper();
             fileTransfer.download(remoteFile, new LocalDestFile() {
                 @Override
                 public OutputStream getOutputStream() throws IOException {
@@ -318,43 +396,98 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
                 }
             });
         } catch (Exception e) {
+            if (e instanceof ConnectionException) {
+                Optional.ofNullable(fileTransfer).ifPresent(ft -> ft.setErrored(true));
+            }
             throw new AgentException(e);
+
+        } finally {
+            Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper -> {
+                try {
+                    scpFileTransferWrapper.close();
+                } catch (IOException e) {
+                    //Ignore
+                }
+            });
         }
     }
 
     @Override
     public List<String> listDirectory(String path) throws AgentException {
-        try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+        SFTPClientWrapper sftpClient = null;
+        try {
+            sftpClient = sshjClient.newSFTPClientWrapper();
             List<RemoteResourceInfo> ls = sftpClient.ls(path);
             return ls.stream().map(RemoteResourceInfo::getName).collect(Collectors.toList());
         } catch (Exception e) {
+            if (e instanceof ConnectionException) {
+                Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true));
+            }
             throw new AgentException(e);
+
+        } finally {
+            Optional.ofNullable(sftpClient).ifPresent(client -> {
+                try {
+                    client.close();
+                } catch (IOException e) {
+                    //Ignore
+                }
+            });
         }
     }
 
     @Override
     public Boolean doesFileExist(String filePath) throws AgentException {
-        try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+        SFTPClientWrapper sftpClient = null;
+        try {
+            sftpClient = sshjClient.newSFTPClientWrapper();
             return sftpClient.statExistence(filePath) != null;
         } catch (Exception e) {
+            if (e instanceof ConnectionException) {
+                Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true));
+            }
             throw new AgentException(e);
+
+        } finally {
+            Optional.ofNullable(sftpClient).ifPresent(client -> {
+                try {
+                    client.close();
+                } catch (IOException e) {
+                    //Ignore
+                }
+            });
         }
     }
 
     @Override
     public List<String> getFileNameFromExtension(String fileName, String parentPath) throws AgentException {
-
-        try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+        SFTPClientWrapper sftpClient = null;
+        try  {
+            sftpClient = sshjClient.newSFTPClientWrapper();
             List<RemoteResourceInfo> ls = sftpClient.ls(parentPath, resource -> isMatch(resource.getName(), fileName));
             return ls.stream().map(RemoteResourceInfo::getPath).collect(Collectors.toList());
         } catch (Exception e) {
+            if (e instanceof ConnectionException) {
+                Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true));
+            }
             throw new AgentException(e);
+
+        } finally {
+            Optional.ofNullable(sftpClient).ifPresent(scpFileTransferWrapper -> {
+                try {
+                    scpFileTransferWrapper.close();
+                } catch (IOException e) {
+                    //Ignore
+                }
+            });
         }
     }
 
     @Override
     public FileMetadata getFileMetadata(String remoteFile) throws AgentException {
-        try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+        SFTPClientWrapper sftpClient = null;
+        try {
+            sftpClient = sshjClient.newSFTPClientWrapper();
             FileAttributes stat = sftpClient.stat(remoteFile);
             FileMetadata metadata = new FileMetadata();
             metadata.setName(new File(remoteFile).getName());
@@ -362,7 +495,19 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
             metadata.setPermissions(FilePermission.toMask(stat.getPermissions()));
             return metadata;
         } catch (Exception e) {
+            if (e instanceof ConnectionException) {
+                Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true));
+            }
             throw new AgentException(e);
+
+        } finally {
+            Optional.ofNullable(sftpClient).ifPresent(scpFileTransferWrapper -> {
+                try {
+                    scpFileTransferWrapper.close();
+                } catch (IOException e) {
+                    //Ignore
+                }
+            });
         }
     }
 
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java
index ff49e5b..2aaad74 100644
--- a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java
@@ -33,10 +33,12 @@ public class SCPFileTransferWrapper implements FileTransfer, Closeable {
 
     private SCPFileTransfer scpFileTransfer;
     private Consumer<Integer> onCloseFunction;
+    private SSHClientWrapper originalSSHClient;
 
-    public SCPFileTransferWrapper(SCPFileTransfer scpFileTransfer, Consumer<Integer> onCloseFunction) {
+    public SCPFileTransferWrapper(SCPFileTransfer scpFileTransfer, Consumer<Integer> onCloseFunction, SSHClientWrapper originalSSHClient) {
         this.scpFileTransfer = scpFileTransfer;
         this.onCloseFunction = onCloseFunction;
+        this.originalSSHClient = originalSSHClient;
     }
 
     @Override
@@ -73,4 +75,12 @@ public class SCPFileTransferWrapper implements FileTransfer, Closeable {
     public void close() throws IOException {
         onCloseFunction.accept(-1);
     }
+
+    public boolean isErrored() {
+        return originalSSHClient.isErrored();
+    }
+
+    public void setErrored(boolean errored) {
+        this.originalSSHClient.setErrored(errored);
+    }
 }
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java
index ed21510..635ddfd 100644
--- a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java
@@ -27,10 +27,12 @@ import java.util.function.Consumer;
 public class SFTPClientWrapper extends SFTPClient {
     private SFTPClient sftpClient;
     private Consumer<Integer> onCloseFunction;
+    private SSHClientWrapper originalSSHClient;
 
-    public SFTPClientWrapper(SFTPClient sftpClient, Consumer<Integer> onCloseFunction) {
+    public SFTPClientWrapper(SFTPClient sftpClient, Consumer<Integer> onCloseFunction, SSHClientWrapper originalSSHClient) {
         super(sftpClient.getSFTPEngine());
         this.onCloseFunction = onCloseFunction;
+        this.originalSSHClient = originalSSHClient;
     }
 
     @Override
@@ -38,4 +40,12 @@ public class SFTPClientWrapper extends SFTPClient {
         onCloseFunction.accept(-1);
         super.close();
     }
+
+    public boolean isErrored() {
+        return originalSSHClient.isErrored();
+    }
+
+    public void setErrored(boolean errored) {
+        this.originalSSHClient.setErrored(errored);
+    }
 }
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SSHClientWrapper.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SSHClientWrapper.java
new file mode 100644
index 0000000..ff66ca1
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SSHClientWrapper.java
@@ -0,0 +1,25 @@
+package org.apache.airavata.helix.adaptor.wrapper;
+
+import net.schmizz.sshj.Config;
+import net.schmizz.sshj.SSHClient;
+/** SSHClient that additionally carries an "errored" flag which callers can set and query. */
+public class SSHClientWrapper extends SSHClient {
+
+    public SSHClientWrapper() {
+        super();
+    }
+
+    public SSHClientWrapper(Config config) {
+        super(config);
+    }
+    // volatile: the flag may be set by a thread other than the one that later reads it.
+    private volatile boolean errored = false;
+
+    public boolean isErrored() {
+        return errored;
+    }
+
+    public void setErrored(boolean errored) {
+        this.errored = errored;
+    }
+}
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java
index a1899b1..6713d5f 100644
--- a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java
@@ -39,10 +39,12 @@ public class SessionWrapper implements Session {
 
     private Session session;
     private Consumer<Integer> onCloseFunction;
+    private SSHClientWrapper originalSSHClient;
 
-    public SessionWrapper(Session session, Consumer<Integer> onCloseFunction) {
+    public SessionWrapper(Session session, Consumer<Integer> onCloseFunction, SSHClientWrapper originalSSHClient) {
         this.session = session;
         this.onCloseFunction = onCloseFunction;
+        this.originalSSHClient = originalSSHClient;
     }
 
     @Override
@@ -180,4 +182,12 @@ public class SessionWrapper implements Session {
     public void handle(Message msg, SSHPacket buf) throws SSHException {
         session.handle(msg, buf);
     }
+
+    public boolean isErrored() {
+        return originalSSHClient.isErrored();
+    }
+
+    public void setErrored(boolean errored) {
+        this.originalSSHClient.setErrored(errored);
+    }
 }
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index a7de7ad..49b516b 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -160,22 +160,28 @@ public class PreWorkflowManager extends WorkflowManager {
         }
 
         String experimentId = processModel.getExperimentId();
-
-        List<String> workflows = processModel.getProcessWorkflows().stream().map(ProcessWorkflow::getWorkflowId).collect(Collectors.toList());
         final List<AbstractTask> allTasks = new ArrayList<>();
-        if (workflows != null && workflows.size() > 0) {
-            for (String wf : workflows) {
-                logger.info("Creating cancellation task for workflow " + wf + " of process " + processId);
-                WorkflowCancellationTask wfct = new WorkflowCancellationTask();
-                wfct.setTaskId(UUID.randomUUID().toString());
-                wfct.setCancellingWorkflowName(wf);
-
-                if (allTasks.size() > 0) {
-                    allTasks.get(allTasks.size() -1).setNextTask(new OutPort(wfct.getTaskId(), wfct));
+
+        Optional<List<String>> workflowsOpt = Optional.ofNullable(processModel.getProcessWorkflows()).map(wfs -> wfs.stream().map(ProcessWorkflow::getWorkflowId).collect(Collectors.toList()));
+
+        if (workflowsOpt.isPresent()) {
+            List<String> workflows = workflowsOpt.get();
+            if (workflows.size() > 0) {
+                for (String wf : workflows) {
+                    logger.info("Creating cancellation task for workflow " + wf + " of process " + processId);
+                    WorkflowCancellationTask wfct = new WorkflowCancellationTask();
+                    wfct.setTaskId(UUID.randomUUID().toString());
+                    wfct.setCancellingWorkflowName(wf);
+
+                    if (allTasks.size() > 0) {
+                        allTasks.get(allTasks.size() - 1).setNextTask(new OutPort(wfct.getTaskId(), wfct));
+                    }
+                    allTasks.add(wfct);
                 }
-                allTasks.add(wfct);
-            }
 
+            } else {
+                logger.warn("No workflow registered with process " + processId + " to cancel");
+            }
         } else {
             logger.warn("No workflow registered with process " + processId + " to cancel");
         }
@@ -212,6 +218,7 @@ public class PreWorkflowManager extends WorkflowManager {
     public static void main(String[] args) throws Exception {
         PreWorkflowManager preWorkflowManager = new PreWorkflowManager();
         preWorkflowManager.startServer();
+        // Entry point only starts the server; no hard-coded cancellation is launched here.
     }
 
     private class ProcessLaunchMessageHandler implements MessageHandler {