You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/01/03 18:02:27 UTC

[airavata] branch airavata-3287 created (now 8213960)

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a change to branch airavata-3287
in repository https://gitbox.apache.org/repos/asf/airavata.git.


      at 8213960  Fixing airavata-3287: Evicting invalidated ssh clients from the pool

This branch includes the following new commits:

     new 8213960  Fixing airavata-3287: Evicting invalidated ssh clients from the pool

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[airavata] 01/01: Fixing airavata-3287: Evicting invalidated ssh clients from the pool

Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch airavata-3287
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 82139606a0ad46cdd740da5e92bc30196565b027
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Fri Jan 3 13:02:11 2020 -0500

    Fixing airavata-3287: Evicting invalidated ssh clients from the pool
---
 .../airavata/helix/adaptor/PoolingSSHJClient.java  |  75 +++++----
 .../airavata/helix/adaptor/SSHJAgentAdaptor.java   | 169 +++++++++++++++++++--
 .../adaptor/wrapper/SCPFileTransferWrapper.java    |  12 +-
 .../helix/adaptor/wrapper/SFTPClientWrapper.java   |  12 +-
 .../helix/adaptor/wrapper/SSHClientWrapper.java    |  25 +++
 .../helix/adaptor/wrapper/SessionWrapper.java      |  12 +-
 .../helix/impl/workflow/PreWorkflowManager.java    |  33 ++--
 7 files changed, 284 insertions(+), 54 deletions(-)

diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
index 7b8eb94..05f37ac 100644
--- a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
@@ -22,7 +22,7 @@ package org.apache.airavata.helix.adaptor;
 import net.schmizz.sshj.Config;
 import net.schmizz.sshj.SSHClient;
 import net.schmizz.sshj.common.DisconnectReason;
-import net.schmizz.sshj.connection.channel.direct.Session;
+import net.schmizz.sshj.connection.ConnectionException;
 import net.schmizz.sshj.sftp.RemoteFile;
 import net.schmizz.sshj.sftp.SFTPClient;
 import net.schmizz.sshj.transport.DisconnectListener;
@@ -32,6 +32,7 @@ import net.schmizz.sshj.userauth.UserAuthException;
 import net.schmizz.sshj.userauth.method.AuthMethod;
 import org.apache.airavata.helix.adaptor.wrapper.SCPFileTransferWrapper;
 import org.apache.airavata.helix.adaptor.wrapper.SFTPClientWrapper;
+import org.apache.airavata.helix.adaptor.wrapper.SSHClientWrapper;
 import org.apache.airavata.helix.adaptor.wrapper.SessionWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,7 +55,7 @@ public class PoolingSSHJClient extends SSHClient {
     private final static Logger logger = LoggerFactory.getLogger(PoolingSSHJClient.class);
 
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
-    private final Map<SSHClient, SSHClientInfo> clientInfoMap = new HashMap<>();
+    private final Map<SSHClientWrapper, SSHClientInfo> clientInfoMap = new HashMap<>();
 
     private HostKeyVerifier hostKeyVerifier;
     private String username;
@@ -96,8 +97,8 @@ public class PoolingSSHJClient extends SSHClient {
 
     ////////////////// client specific operations ///////
 
-    private SSHClient newClientWithSessionValidation() throws IOException {
-        SSHClient newClient = createNewSSHClient();
+    private SSHClientWrapper newClientWithSessionValidation() throws IOException {
+        SSHClientWrapper newClient = createNewSSHClient();
         SSHClientInfo info = new SSHClientInfo(1, System.currentTimeMillis(), clientInfoMap.size());
         clientInfoMap.put(newClient, info);
 
@@ -132,7 +133,7 @@ public class PoolingSSHJClient extends SSHClient {
         return newClient;
     }
 
-    private SSHClient leaseSSHClient() throws Exception {
+    private SSHClientWrapper leaseSSHClient() throws Exception {
         lock.writeLock().lock();
 
         try {
@@ -141,9 +142,9 @@ public class PoolingSSHJClient extends SSHClient {
 
             } else {
 
-                Optional<Map.Entry<SSHClient, SSHClientInfo>> minEntryOp = clientInfoMap.entrySet().stream().min(Comparator.comparing(entry -> entry.getValue().sessionCount));
+                Optional<Map.Entry<SSHClientWrapper, SSHClientInfo>> minEntryOp = clientInfoMap.entrySet().stream().min(Comparator.comparing(entry -> entry.getValue().sessionCount));
                 if (minEntryOp.isPresent()) {
-                    Map.Entry<SSHClient, SSHClientInfo> minEntry = minEntryOp.get();
+                    Map.Entry<SSHClientWrapper, SSHClientInfo> minEntry = minEntryOp.get();
                     // use the connection with least amount of sessions created.
 
                     logger.debug("Session count for selected connection {} is {}. Threshold {} for host {}",
@@ -160,13 +161,12 @@ public class PoolingSSHJClient extends SSHClient {
                         minEntry.getValue().setSessionCount(minEntry.getValue().getSessionCount() + 1);
                         minEntry.getValue().setLastAccessedTime(System.currentTimeMillis());
 
-                        SSHClient sshClient = minEntry.getKey();
+                        SSHClientWrapper sshClient = minEntry.getKey();
 
-                        if (!sshClient.isConnected() || !sshClient.isAuthenticated()) {
+                        if (!sshClient.isConnected() || !sshClient.isAuthenticated() || !sshClient.isErrored()) {
                             logger.warn("Client for host {} is not connected or not authenticated. Creating a new client", host);
                             removeDisconnectedClients(sshClient);
                             return newClientWithSessionValidation();
-
                         } else {
                             return sshClient;
                         }
@@ -181,10 +181,17 @@ public class PoolingSSHJClient extends SSHClient {
         }
     }
 
-    private void removeDisconnectedClients(SSHClient client) {
+    private void removeDisconnectedClients(SSHClientWrapper client) {
         lock.writeLock().lock();
 
         try {
+            client.disconnect();
+        } catch (Exception e) {
+            log.warn("Errored while disconnecting the client " + e.getMessage());
+            // Ignore
+        }
+
+        try {
             if (clientInfoMap.containsKey(client)) {
                 logger.debug("Removing the disconnected connection {} for host {}", clientInfoMap.get(client).getClientId(), host);
                 clientInfoMap.remove(client);
@@ -195,7 +202,7 @@ public class PoolingSSHJClient extends SSHClient {
         }
     }
 
-    private void untrackClosedSessions(SSHClient client, int sessionId) {
+    private void untrackClosedSessions(SSHClientWrapper client, int sessionId) {
         lock.writeLock().lock();
 
         try {
@@ -211,7 +218,7 @@ public class PoolingSSHJClient extends SSHClient {
     }
 
     private void removeStaleConnections() {
-        List<Map.Entry<SSHClient, SSHClientInfo>> entriesTobeRemoved;
+        List<Map.Entry<SSHClientWrapper, SSHClientInfo>> entriesTobeRemoved;
         lock.writeLock().lock();
         logger.info("Current active connections for  {} @ {} : {} are {}", username, host, port, clientInfoMap.size());
         try {
@@ -235,13 +242,13 @@ public class PoolingSSHJClient extends SSHClient {
         });
     }
 
-    private SSHClient createNewSSHClient() throws IOException {
+    private SSHClientWrapper createNewSSHClient() throws IOException {
 
-        SSHClient sshClient;
+        SSHClientWrapper sshClient;
         if (config != null) {
-            sshClient = new SSHClient(config);
+            sshClient = new SSHClientWrapper(config);
         } else {
-            sshClient = new SSHClient();
+            sshClient = new SSHClientWrapper();
         }
 
         sshClient.getConnection().getTransport().setDisconnectListener(new DisconnectListener() {
@@ -267,15 +274,20 @@ public class PoolingSSHJClient extends SSHClient {
         return sshClient;
     }
 
-    public Session startSessionWrapper() throws Exception {
+    public SessionWrapper startSessionWrapper() throws Exception {
 
-        final SSHClient sshClient = leaseSSHClient();
+        final SSHClientWrapper sshClient = leaseSSHClient();
 
         try {
-            return new SessionWrapper(sshClient.startSession(), (id) -> untrackClosedSessions(sshClient, id));
+            return new SessionWrapper(sshClient.startSession(), (id) -> untrackClosedSessions(sshClient, id), sshClient);
 
         } catch (Exception e) {
             if (sshClient != null) {
+                // If it is a ConnectionExceptions, explicitly invalidate the client
+                if (e instanceof ConnectionException) {
+                    sshClient.setErrored(true);
+                }
+
                 untrackClosedSessions(sshClient, -1);
             }
             throw e;
@@ -284,28 +296,39 @@ public class PoolingSSHJClient extends SSHClient {
 
     public SCPFileTransferWrapper newSCPFileTransferWrapper() throws Exception {
 
-        final SSHClient sshClient = leaseSSHClient();
+        final SSHClientWrapper sshClient = leaseSSHClient();
 
         try {
-            return new SCPFileTransferWrapper(sshClient.newSCPFileTransfer(), (id) -> untrackClosedSessions(sshClient, id));
+            return new SCPFileTransferWrapper(sshClient.newSCPFileTransfer(), (id) -> untrackClosedSessions(sshClient, id), sshClient);
 
         } catch (Exception e) {
+
             if (sshClient != null) {
+                // If it is a ConnectionExceptions, explicitly invalidate the client
+                if (e instanceof ConnectionException) {
+                    sshClient.setErrored(true);
+                }
+
                 untrackClosedSessions(sshClient, -1);
             }
             throw e;
         }
     }
 
-    public SFTPClient newSFTPClientWrapper() throws Exception {
+    public SFTPClientWrapper newSFTPClientWrapper() throws Exception {
 
-        final SSHClient sshClient = leaseSSHClient();
+        final SSHClientWrapper sshClient = leaseSSHClient();
 
         try {
-            return new SFTPClientWrapper(sshClient.newSFTPClient(), (id) -> untrackClosedSessions(sshClient, id));
+            return new SFTPClientWrapper(sshClient.newSFTPClient(), (id) -> untrackClosedSessions(sshClient, id), sshClient);
         } catch (Exception e) {
 
             if (sshClient != null) {
+                // If it is a ConnectionExceptions, explicitly invalidate the client
+                if (e instanceof ConnectionException) {
+                    sshClient.setErrored(true);
+                }
+
                 untrackClosedSessions(sshClient, -1);
             }
             throw e;
@@ -405,7 +428,7 @@ public class PoolingSSHJClient extends SSHClient {
         return this;
     }
 
-    public Map<SSHClient, SSHClientInfo> getClientInfoMap() {
+    public Map<SSHClientWrapper, SSHClientInfo> getClientInfoMap() {
         return clientInfoMap;
     }
 }
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
index 05ce146..5eae723 100644
--- a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
@@ -22,6 +22,7 @@ package org.apache.airavata.helix.adaptor;
 import com.google.common.collect.Lists;
 import net.schmizz.keepalive.KeepAliveProvider;
 import net.schmizz.sshj.DefaultConfig;
+import net.schmizz.sshj.connection.ConnectionException;
 import net.schmizz.sshj.connection.channel.direct.Session;
 import net.schmizz.sshj.sftp.*;
 import net.schmizz.sshj.userauth.keyprovider.KeyProvider;
@@ -38,6 +39,8 @@ import net.schmizz.sshj.xfer.LocalFileFilter;
 import net.schmizz.sshj.xfer.LocalSourceFile;
 import org.apache.airavata.agents.api.*;
 import org.apache.airavata.helix.adaptor.wrapper.SCPFileTransferWrapper;
+import org.apache.airavata.helix.adaptor.wrapper.SFTPClientWrapper;
+import org.apache.airavata.helix.adaptor.wrapper.SessionWrapper;
 import org.apache.airavata.helix.agent.ssh.StandardOutReader;
 import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
 import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
@@ -165,8 +168,9 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
 
     @Override
     public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException {
-        try (Session session = sshjClient.startSessionWrapper()) {
-
+        SessionWrapper session = null;
+        try {
+            session = sshjClient.startSessionWrapper();
             Session.Command exec = session.exec((workingDirectory != null ? "cd " + workingDirectory + "; " : "") + command);
             StandardOutReader standardOutReader = new StandardOutReader();
 
@@ -178,8 +182,21 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
                 standardOutReader.setExitCode(Optional.ofNullable(exec.getExitStatus()).orElseThrow(() -> new Exception("Exit status received as null")));
             }
             return standardOutReader;
+
         } catch (Exception e) {
+            if (e instanceof ConnectionException) {
+                Optional.ofNullable(session).ifPresent(ft -> ft.setErrored(true));
+            }
             throw new AgentException(e);
+
+        } finally {
+            Optional.ofNullable(session).ifPresent(ss -> {
+                try {
+                    ss.close();
+                } catch (IOException e) {
+                    //Ignore
+                }
+            });
         }
     }
 
@@ -190,29 +207,61 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
 
     @Override
     public void createDirectory(String path, boolean recursive) throws AgentException {
-        try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+        SFTPClientWrapper sftpClient = null;
+        try {
+            sftpClient = sshjClient.newSFTPClientWrapper();
             if (recursive) {
                 sftpClient.mkdirs(path);
             } else {
                 sftpClient.mkdir(path);
             }
         } catch (Exception e) {
+            if (e instanceof ConnectionException) {
+                Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true));
+            }
             throw new AgentException(e);
+
+        } finally {
+            Optional.ofNullable(sftpClient).ifPresent(client -> {
+                try {
+                    client.close();
+                } catch (IOException e) {
+                    //Ignore
+                }
+            });
         }
     }
 
     @Override
     public void uploadFile(String localFile, String remoteFile) throws AgentException {
-        try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) {
+        SCPFileTransferWrapper fileTransfer = null;
+        try {
+            fileTransfer = sshjClient.newSCPFileTransferWrapper();
             fileTransfer.upload(localFile, remoteFile);
+
         } catch (Exception e) {
+            if (e instanceof ConnectionException) {
+                Optional.ofNullable(fileTransfer).ifPresent(ft -> ft.setErrored(true));
+            }
             throw new AgentException(e);
+
+        } finally {
+            Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper -> {
+                try {
+                    scpFileTransferWrapper.close();
+                } catch (IOException e) {
+                    //Ignore
+                }
+            });
         }
     }
 
     @Override
     public void uploadFile(InputStream localInStream, FileMetadata metadata, String remoteFile) throws AgentException {
-        try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) {
+        SCPFileTransferWrapper fileTransfer = null;
+
+        try {
+            fileTransfer = sshjClient.newSCPFileTransferWrapper();
             fileTransfer.upload(new LocalSourceFile() {
                 @Override
                 public String getName() {
@@ -265,22 +314,51 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
                 }
             }, remoteFile);
         } catch (Exception e) {
+            if (e instanceof ConnectionException) {
+                Optional.ofNullable(fileTransfer).ifPresent(ft -> ft.setErrored(true));
+            }
             throw new AgentException(e);
+
+        } finally {
+            Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper -> {
+                try {
+                    scpFileTransferWrapper.close();
+                } catch (IOException e) {
+                    //Ignore
+                }
+            });
         }
     }
 
     @Override
     public void downloadFile(String remoteFile, String localFile) throws AgentException {
-        try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) {
+        SCPFileTransferWrapper fileTransfer = null;
+        try {
+            fileTransfer = sshjClient.newSCPFileTransferWrapper();
             fileTransfer.download(remoteFile, localFile);
+
         } catch (Exception e) {
+            if (e instanceof ConnectionException) {
+                Optional.ofNullable(fileTransfer).ifPresent(ft -> ft.setErrored(true));
+            }
             throw new AgentException(e);
+
+        } finally {
+            Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper -> {
+                try {
+                    scpFileTransferWrapper.close();
+                } catch (IOException e) {
+                    //Ignore
+                }
+            });
         }
     }
 
     @Override
     public void downloadFile(String remoteFile, OutputStream localOutStream, FileMetadata metadata) throws AgentException {
-        try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) {
+        SCPFileTransferWrapper fileTransfer = null;
+        try {
+            fileTransfer = sshjClient.newSCPFileTransferWrapper();
             fileTransfer.download(remoteFile, new LocalDestFile() {
                 @Override
                 public OutputStream getOutputStream() throws IOException {
@@ -318,43 +396,98 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
                 }
             });
         } catch (Exception e) {
+            if (e instanceof ConnectionException) {
+                Optional.ofNullable(fileTransfer).ifPresent(ft -> ft.setErrored(true));
+            }
             throw new AgentException(e);
+
+        } finally {
+            Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper -> {
+                try {
+                    scpFileTransferWrapper.close();
+                } catch (IOException e) {
+                    //Ignore
+                }
+            });
         }
     }
 
     @Override
     public List<String> listDirectory(String path) throws AgentException {
-        try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+        SFTPClientWrapper sftpClient = null;
+        try {
+            sftpClient = sshjClient.newSFTPClientWrapper();
             List<RemoteResourceInfo> ls = sftpClient.ls(path);
             return ls.stream().map(RemoteResourceInfo::getName).collect(Collectors.toList());
         } catch (Exception e) {
+            if (e instanceof ConnectionException) {
+                Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true));
+            }
             throw new AgentException(e);
+
+        } finally {
+            Optional.ofNullable(sftpClient).ifPresent(client -> {
+                try {
+                    client.close();
+                } catch (IOException e) {
+                    //Ignore
+                }
+            });
         }
     }
 
     @Override
     public Boolean doesFileExist(String filePath) throws AgentException {
-        try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+        SFTPClientWrapper sftpClient = null;
+        try {
+            sftpClient = sshjClient.newSFTPClientWrapper();
             return sftpClient.statExistence(filePath) != null;
         } catch (Exception e) {
+            if (e instanceof ConnectionException) {
+                Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true));
+            }
             throw new AgentException(e);
+
+        } finally {
+            Optional.ofNullable(sftpClient).ifPresent(client -> {
+                try {
+                    client.close();
+                } catch (IOException e) {
+                    //Ignore
+                }
+            });
         }
     }
 
     @Override
     public List<String> getFileNameFromExtension(String fileName, String parentPath) throws AgentException {
-
-        try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+        SFTPClientWrapper sftpClient = null;
+        try  {
+            sftpClient = sshjClient.newSFTPClientWrapper();
             List<RemoteResourceInfo> ls = sftpClient.ls(parentPath, resource -> isMatch(resource.getName(), fileName));
             return ls.stream().map(RemoteResourceInfo::getPath).collect(Collectors.toList());
         } catch (Exception e) {
+            if (e instanceof ConnectionException) {
+                Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true));
+            }
             throw new AgentException(e);
+
+        } finally {
+            Optional.ofNullable(sftpClient).ifPresent(scpFileTransferWrapper -> {
+                try {
+                    scpFileTransferWrapper.close();
+                } catch (IOException e) {
+                    //Ignore
+                }
+            });
         }
     }
 
     @Override
     public FileMetadata getFileMetadata(String remoteFile) throws AgentException {
-        try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+        SFTPClientWrapper sftpClient = null;
+        try {
+            sftpClient = sshjClient.newSFTPClientWrapper();
             FileAttributes stat = sftpClient.stat(remoteFile);
             FileMetadata metadata = new FileMetadata();
             metadata.setName(new File(remoteFile).getName());
@@ -362,7 +495,19 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
             metadata.setPermissions(FilePermission.toMask(stat.getPermissions()));
             return metadata;
         } catch (Exception e) {
+            if (e instanceof ConnectionException) {
+                Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true));
+            }
             throw new AgentException(e);
+
+        } finally {
+            Optional.ofNullable(sftpClient).ifPresent(scpFileTransferWrapper -> {
+                try {
+                    scpFileTransferWrapper.close();
+                } catch (IOException e) {
+                    //Ignore
+                }
+            });
         }
     }
 
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java
index ff49e5b..2aaad74 100644
--- a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java
@@ -33,10 +33,12 @@ public class SCPFileTransferWrapper implements FileTransfer, Closeable {
 
     private SCPFileTransfer scpFileTransfer;
     private Consumer<Integer> onCloseFunction;
+    private SSHClientWrapper originalSSHClient;
 
-    public SCPFileTransferWrapper(SCPFileTransfer scpFileTransfer, Consumer<Integer> onCloseFunction) {
+    public SCPFileTransferWrapper(SCPFileTransfer scpFileTransfer, Consumer<Integer> onCloseFunction, SSHClientWrapper originalSSHClient) {
         this.scpFileTransfer = scpFileTransfer;
         this.onCloseFunction = onCloseFunction;
+        this.originalSSHClient = originalSSHClient;
     }
 
     @Override
@@ -73,4 +75,12 @@ public class SCPFileTransferWrapper implements FileTransfer, Closeable {
     public void close() throws IOException {
         onCloseFunction.accept(-1);
     }
+
+    public boolean isErrored() {
+        return originalSSHClient.isErrored();
+    }
+
+    public void setErrored(boolean errored) {
+        this.originalSSHClient.setErrored(errored);
+    }
 }
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java
index ed21510..635ddfd 100644
--- a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java
@@ -27,10 +27,12 @@ import java.util.function.Consumer;
 public class SFTPClientWrapper extends SFTPClient {
     private SFTPClient sftpClient;
     private Consumer<Integer> onCloseFunction;
+    private SSHClientWrapper originalSSHClient;
 
-    public SFTPClientWrapper(SFTPClient sftpClient, Consumer<Integer> onCloseFunction) {
+    public SFTPClientWrapper(SFTPClient sftpClient, Consumer<Integer> onCloseFunction, SSHClientWrapper originalSSHClient) {
         super(sftpClient.getSFTPEngine());
         this.onCloseFunction = onCloseFunction;
+        this.originalSSHClient = originalSSHClient;
     }
 
     @Override
@@ -38,4 +40,12 @@ public class SFTPClientWrapper extends SFTPClient {
         onCloseFunction.accept(-1);
         super.close();
     }
+
+    public boolean isErrored() {
+        return originalSSHClient.isErrored();
+    }
+
+    public void setErrored(boolean errored) {
+        this.originalSSHClient.setErrored(errored);
+    }
 }
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SSHClientWrapper.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SSHClientWrapper.java
new file mode 100644
index 0000000..ff66ca1
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SSHClientWrapper.java
@@ -0,0 +1,25 @@
+package org.apache.airavata.helix.adaptor.wrapper;
+
+import net.schmizz.sshj.Config;
+import net.schmizz.sshj.SSHClient;
+
+public class SSHClientWrapper extends SSHClient {
+
+    public SSHClientWrapper() {
+        super();
+    }
+
+    public SSHClientWrapper(Config config) {
+        super(config);
+    }
+
+    private boolean errored = false;
+
+    public boolean isErrored() {
+        return errored;
+    }
+
+    public void setErrored(boolean errored) {
+        this.errored = errored;
+    }
+}
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java
index a1899b1..6713d5f 100644
--- a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java
@@ -39,10 +39,12 @@ public class SessionWrapper implements Session {
 
     private Session session;
     private Consumer<Integer> onCloseFunction;
+    private SSHClientWrapper originalSSHClient;
 
-    public SessionWrapper(Session session, Consumer<Integer> onCloseFunction) {
+    public SessionWrapper(Session session, Consumer<Integer> onCloseFunction, SSHClientWrapper originalSSHClient) {
         this.session = session;
         this.onCloseFunction = onCloseFunction;
+        this.originalSSHClient = originalSSHClient;
     }
 
     @Override
@@ -180,4 +182,12 @@ public class SessionWrapper implements Session {
     public void handle(Message msg, SSHPacket buf) throws SSHException {
         session.handle(msg, buf);
     }
+
+    public boolean isErrored() {
+        return originalSSHClient.isErrored();
+    }
+
+    public void setErrored(boolean errored) {
+        this.originalSSHClient.setErrored(errored);
+    }
 }
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index a7de7ad..49b516b 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -160,22 +160,28 @@ public class PreWorkflowManager extends WorkflowManager {
         }
 
         String experimentId = processModel.getExperimentId();
-
-        List<String> workflows = processModel.getProcessWorkflows().stream().map(ProcessWorkflow::getWorkflowId).collect(Collectors.toList());
         final List<AbstractTask> allTasks = new ArrayList<>();
-        if (workflows != null && workflows.size() > 0) {
-            for (String wf : workflows) {
-                logger.info("Creating cancellation task for workflow " + wf + " of process " + processId);
-                WorkflowCancellationTask wfct = new WorkflowCancellationTask();
-                wfct.setTaskId(UUID.randomUUID().toString());
-                wfct.setCancellingWorkflowName(wf);
-
-                if (allTasks.size() > 0) {
-                    allTasks.get(allTasks.size() -1).setNextTask(new OutPort(wfct.getTaskId(), wfct));
+
+        Optional<List<String>> workflowsOpt = Optional.ofNullable(processModel.getProcessWorkflows()).map(wfs -> wfs.stream().map(ProcessWorkflow::getWorkflowId).collect(Collectors.toList()));
+
+        if (workflowsOpt.isPresent()) {
+            List<String> workflows = workflowsOpt.get();
+            if (workflows.size() > 0) {
+                for (String wf : workflows) {
+                    logger.info("Creating cancellation task for workflow " + wf + " of process " + processId);
+                    WorkflowCancellationTask wfct = new WorkflowCancellationTask();
+                    wfct.setTaskId(UUID.randomUUID().toString());
+                    wfct.setCancellingWorkflowName(wf);
+
+                    if (allTasks.size() > 0) {
+                        allTasks.get(allTasks.size() - 1).setNextTask(new OutPort(wfct.getTaskId(), wfct));
+                    }
+                    allTasks.add(wfct);
                 }
-                allTasks.add(wfct);
-            }
 
+            } else {
+                logger.warn("No workflow registered with process " + processId + " to cancel");
+            }
         } else {
             logger.warn("No workflow registered with process " + processId + " to cancel");
         }
@@ -212,6 +218,7 @@ public class PreWorkflowManager extends WorkflowManager {
     public static void main(String[] args) throws Exception {
         PreWorkflowManager preWorkflowManager = new PreWorkflowManager();
         preWorkflowManager.startServer();
+        preWorkflowManager.createAndLaunchCancelWorkflow("PROCESS_7e19c779-e326-43d2-b025-aaa3a4b44c95","ultrascan");
     }
 
     private class ProcessLaunchMessageHandler implements MessageHandler {