You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/04/13 17:49:54 UTC

[airavata] branch develop updated: Adding sshj adaptor and replacing jsch adaptor for SSH communication

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/develop by this push:
     new a2acaac  Adding sshj adaptor and replacing jsch adaptor for SSH communication
a2acaac is described below

commit a2acaac097dfd24c85c1acbb4f041a4ee65a7d95
Author: dimuthu <di...@gmail.com>
AuthorDate: Fri Apr 13 13:49:47 2018 -0400

    Adding sshj adaptor and replacing jsch adaptor for SSH communication
---
 modules/airavata-helix/agent-api/pom.xml           |  13 +-
 .../org/apache/airavata/agents/api/AgentUtils.java |  34 ++
 modules/airavata-helix/agent-impl/pom.xml          |   1 +
 .../airavata-helix/agent-impl/ssh-agent/pom.xml    |  30 +-
 .../helix/agent/local/LocalAgentAdaptor.java       |  74 -----
 .../airavata/helix/agent/ssh/SshAgentAdaptor.java  |  36 +--
 .../agent/storage/StorageResourceAdaptorImpl.java  |   5 +-
 .../{agent-api => agent-impl/sshj-agent}/pom.xml   |  45 +--
 .../airavata/helix/adaptor/PoolingSSHJClient.java  | 345 +++++++++++++++++++++
 .../airavata/helix/adaptor/SSHJAgentAdaptor.java   | 217 +++++++++++++
 .../helix/adaptor/SSHJStorageAdaptor.java}         |  24 +-
 .../adaptor/wrapper/SCPFileTransferWrapper.java    |  76 +++++
 .../helix/adaptor/wrapper/SFTPClientWrapper.java   |  41 +++
 .../helix/adaptor/wrapper/SessionWrapper.java      | 183 +++++++++++
 modules/airavata-helix/helix-spectator/pom.xml     |  10 +
 modules/airavata-helix/task-core/pom.xml           |   2 +-
 .../helix/core/support/AdaptorSupportImpl.java     |   6 +-
 17 files changed, 964 insertions(+), 178 deletions(-)

diff --git a/modules/airavata-helix/agent-api/pom.xml b/modules/airavata-helix/agent-api/pom.xml
index f4ac36e..25e5fd6 100644
--- a/modules/airavata-helix/agent-api/pom.xml
+++ b/modules/airavata-helix/agent-api/pom.xml
@@ -41,9 +41,20 @@
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-registry-core</artifactId>
+            <artifactId>registry-api-stubs</artifactId>
             <version>0.17-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-credential-store-stubs</artifactId>
+            <version>0.17-SNAPSHOT</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 
     <!--<build>
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentUtils.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentUtils.java
new file mode 100644
index 0000000..fa6c448
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentUtils.java
@@ -0,0 +1,34 @@
+package org.apache.airavata.agents.api;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.client.CredentialStoreClientFactory;
+import org.apache.airavata.credential.store.cpi.CredentialStoreService;
+import org.apache.airavata.credential.store.exception.CredentialStoreException;
+import org.apache.airavata.registry.api.RegistryService;
+import org.apache.airavata.registry.api.client.RegistryServiceClientFactory;
+import org.apache.airavata.registry.api.exception.RegistryServiceException;
+
+public class AgentUtils {
+
+    // TODO this is inefficient. Try to use a connection pool
+    public static RegistryService.Client getRegistryServiceClient() throws AgentException {
+        try {
+            final int serverPort = Integer.parseInt(ServerSettings.getRegistryServerPort());
+            final String serverHost = ServerSettings.getRegistryServerHost();
+            return RegistryServiceClientFactory.createRegistryClient(serverHost, serverPort);
+        } catch (RegistryServiceException | ApplicationSettingsException e) {
+            throw new AgentException("Unable to create registry client...", e);
+        }
+    }
+
+    public static CredentialStoreService.Client getCredentialClient() throws AgentException {
+        try {
+            final int serverPort = Integer.parseInt(ServerSettings.getCredentialStoreServerPort());
+            final String serverHost =ServerSettings.getCredentialStoreServerHost();
+            return CredentialStoreClientFactory.createAiravataCSClient(serverHost, serverPort);
+        } catch (CredentialStoreException | ApplicationSettingsException e) {
+            throw new AgentException("Unable to create credential client...", e);
+        }
+    }
+}
diff --git a/modules/airavata-helix/agent-impl/pom.xml b/modules/airavata-helix/agent-impl/pom.xml
index f0b8474..ce6317b 100644
--- a/modules/airavata-helix/agent-impl/pom.xml
+++ b/modules/airavata-helix/agent-impl/pom.xml
@@ -35,6 +35,7 @@
     <packaging>pom</packaging>
     <modules>
         <module>ssh-agent</module>
+        <module>sshj-agent</module>
     </modules>
 
     <dependencies>
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/pom.xml b/modules/airavata-helix/agent-impl/ssh-agent/pom.xml
index 837b7f3..61f470d 100644
--- a/modules/airavata-helix/agent-impl/ssh-agent/pom.xml
+++ b/modules/airavata-helix/agent-impl/ssh-agent/pom.xml
@@ -12,7 +12,6 @@
     <name>SSH Agent</name>
     <artifactId>ssh-agent</artifactId>
 
-
     <dependencies>
         <dependency>
             <groupId>com.jcraft</groupId>
@@ -21,34 +20,13 @@
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-registry-core</artifactId>
-            <version>0.17-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
-            <artifactId>registry-api-service</artifactId>
-            <version>0.17-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-credential-store</artifactId>
-            <version>0.17-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-credential-store</artifactId>
+            <artifactId>agent-api</artifactId>
             <version>0.17-SNAPSHOT</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
-            <groupId>org.apache.airavata</groupId>
-            <artifactId>agent-api</artifactId>
-            <version>0.17-SNAPSHOT</version>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>1.4</version>
         </dependency>
     </dependencies>
 
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/local/LocalAgentAdaptor.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/local/LocalAgentAdaptor.java
deleted file mode 100644
index cb640f3..0000000
--- a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/local/LocalAgentAdaptor.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.helix.agent.local;
-
-import org.apache.airavata.agents.api.AgentAdaptor;
-import org.apache.airavata.agents.api.AgentException;
-import org.apache.airavata.agents.api.CommandOutput;
-
-import java.util.List;
-
-public class LocalAgentAdaptor implements AgentAdaptor {
-
-    public void init(Object agentPams) throws AgentException {
-        throw new AgentException("Operation not implemented");
-    }
-
-    @Override
-    public void init(String computeResource, String gatewayId, String userId, String token) throws AgentException {
-        throw new AgentException("Operation not implemented");
-    }
-
-    @Override
-    public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException {
-        throw new AgentException("Operation not implemented");
-    }
-
-    @Override
-    public void createDirectory(String path) throws AgentException {
-        throw new AgentException("Operation not implemented");
-    }
-
-    @Override
-    public void copyFileTo(String localFile, String remoteFile) throws AgentException {
-        throw new AgentException("Operation not implemented");
-    }
-
-    @Override
-    public void copyFileFrom(String remoteFile, String localFile) throws AgentException {
-        throw new AgentException("Operation not implemented");
-    }
-
-
-    @Override
-    public List<String> listDirectory(String path) throws AgentException {
-        throw new AgentException("Operation not implemented");
-    }
-
-    @Override
-    public Boolean doesFileExist(String filePath) throws AgentException {
-        throw new AgentException("Operation not implemented");
-    }
-
-    @Override
-    public List<String> getFileNameFromExtension(String fileName, String parentPath) throws AgentException {
-        throw new AgentException("Operation not implemented");
-    }
-}
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
index bf899e0..9c26f6a 100644
--- a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
+++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
@@ -21,25 +21,14 @@ package org.apache.airavata.helix.agent.ssh;
 
 import com.jcraft.jsch.*;
 import org.apache.airavata.agents.api.*;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.DBUtil;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.credential.store.client.CredentialStoreClientFactory;
-import org.apache.airavata.credential.store.cpi.CredentialStoreService;
 import org.apache.airavata.model.credential.store.SSHCredential;
-import org.apache.airavata.credential.store.exception.CredentialStoreException;
-import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
 import org.apache.airavata.model.appcatalog.computeresource.*;
-import org.apache.airavata.registry.api.RegistryService;
-import org.apache.airavata.registry.api.client.RegistryServiceClientFactory;
-import org.apache.airavata.registry.api.exception.RegistryServiceException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.*;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Optional;
 import java.util.UUID;
 
 /**
@@ -92,11 +81,11 @@ public class SshAgentAdaptor implements AgentAdaptor {
     @Override
     public void init(String computeResourceId, String gatewayId, String userId, String token) throws AgentException {
         try {
-            ComputeResourceDescription computeResourceDescription = getRegistryServiceClient().getComputeResource(computeResourceId);
+            ComputeResourceDescription computeResourceDescription = AgentUtils.getRegistryServiceClient().getComputeResource(computeResourceId);
 
             logger.info("Fetching credentials for cred store token " + token);
 
-            SSHCredential sshCredential = getCredentialClient().getSSHCredential(token, gatewayId);
+            SSHCredential sshCredential = AgentUtils.getCredentialClient().getSSHCredential(token, gatewayId);
             if (sshCredential == null) {
                 throw new AgentException("Null credential for token " + token);
             }
@@ -117,27 +106,6 @@ public class SshAgentAdaptor implements AgentAdaptor {
         }
     }
 
-    // TODO this is inefficient. Try to use a connection pool
-    public static RegistryService.Client getRegistryServiceClient() throws AgentException {
-        try {
-            final int serverPort = Integer.parseInt(ServerSettings.getRegistryServerPort());
-            final String serverHost = ServerSettings.getRegistryServerHost();
-            return RegistryServiceClientFactory.createRegistryClient(serverHost, serverPort);
-        } catch (RegistryServiceException | ApplicationSettingsException e) {
-            throw new AgentException("Unable to create registry client...", e);
-        }
-    }
-
-    public static CredentialStoreService.Client getCredentialClient() throws AgentException {
-        try {
-            final int serverPort = Integer.parseInt(ServerSettings.getCredentialStoreServerPort());
-            final String serverHost =ServerSettings.getCredentialStoreServerHost();
-            return CredentialStoreClientFactory.createAiravataCSClient(serverHost, serverPort);
-        } catch (CredentialStoreException | ApplicationSettingsException e) {
-            throw new AgentException("Unable to create credential client...", e);
-        }
-    }
-
     public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException {
         StandardOutReader commandOutput = new StandardOutReader();
         ChannelExec channelExec = null;
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java
index 2e62a63..c36e34b 100644
--- a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java
+++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java
@@ -20,6 +20,7 @@
 package org.apache.airavata.helix.agent.storage;
 
 import org.apache.airavata.agents.api.AgentException;
+import org.apache.airavata.agents.api.AgentUtils;
 import org.apache.airavata.agents.api.CommandOutput;
 import org.apache.airavata.agents.api.StorageResourceAdaptor;
 import org.apache.airavata.helix.agent.ssh.SshAdaptorParams;
@@ -39,11 +40,11 @@ public class StorageResourceAdaptorImpl extends SshAgentAdaptor implements Stora
         try {
             logger.info("Initializing Storage Resource Adaptor for storage resource : "+ storageResourceId + ", gateway : " +
                     gatewayId +", user " + loginUser + ", token : " + token);
-            StorageResourceDescription storageResource = getRegistryServiceClient().getStorageResource(storageResourceId);
+            StorageResourceDescription storageResource = AgentUtils.getRegistryServiceClient().getStorageResource(storageResourceId);
 
             logger.info("Fetching credentials for cred store token " + token);
 
-            SSHCredential sshCredential = getCredentialClient().getSSHCredential(token, gatewayId);
+            SSHCredential sshCredential = AgentUtils.getCredentialClient().getSSHCredential(token, gatewayId);
             if (sshCredential == null) {
                 throw new AgentException("Null credential for token " + token);
             }
diff --git a/modules/airavata-helix/agent-api/pom.xml b/modules/airavata-helix/agent-impl/sshj-agent/pom.xml
similarity index 61%
copy from modules/airavata-helix/agent-api/pom.xml
copy to modules/airavata-helix/agent-impl/sshj-agent/pom.xml
index f4ac36e..78351d5 100644
--- a/modules/airavata-helix/agent-api/pom.xml
+++ b/modules/airavata-helix/agent-impl/sshj-agent/pom.xml
@@ -24,40 +24,41 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>airavata-helix</artifactId>
+        <artifactId>agent-impl</artifactId>
         <groupId>org.apache.airavata</groupId>
         <version>0.17-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <name>Agent API</name>
-    <artifactId>agent-api</artifactId>
+
+    <artifactId>sshj-agent</artifactId>
 
     <dependencies>
         <dependency>
-            <groupId>org.codehaus.jackson</groupId>
-            <artifactId>jackson-mapper-asl</artifactId>
-            <version>1.8.5</version>
+            <groupId>com.hierynomus</groupId>
+            <artifactId>sshj</artifactId>
+            <version>0.23.0</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.bouncycastle</groupId>
+                        <artifactId>bcpkix-jdk15on</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.bouncycastle</groupId>
+                        <artifactId>bcprov-jdk15on</artifactId>
+                    </exclusion>
+                </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-registry-core</artifactId>
+            <artifactId>ssh-agent</artifactId>
             <version>0.17-SNAPSHOT</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.jcraft</groupId>
+                    <artifactId>jsch</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
     </dependencies>
 
-    <!--<build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.5.1</version>
-                <configuration>
-                    <source>${java.version}</source>
-                    <target>${java.version}</target>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>-->
-
 </project>
\ No newline at end of file
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
new file mode 100644
index 0000000..768b4e0
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
@@ -0,0 +1,345 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.adaptor;
+
+import net.schmizz.sshj.Config;
+import net.schmizz.sshj.SSHClient;
+import net.schmizz.sshj.common.DisconnectReason;
+import net.schmizz.sshj.connection.channel.direct.Session;
+import net.schmizz.sshj.sftp.RemoteFile;
+import net.schmizz.sshj.sftp.SFTPClient;
+import net.schmizz.sshj.transport.DisconnectListener;
+import net.schmizz.sshj.transport.TransportException;
+import net.schmizz.sshj.transport.verification.HostKeyVerifier;
+import net.schmizz.sshj.userauth.UserAuthException;
+import net.schmizz.sshj.userauth.method.AuthMethod;
+import org.apache.airavata.helix.adaptor.wrapper.SCPFileTransferWrapper;
+import org.apache.airavata.helix.adaptor.wrapper.SFTPClientWrapper;
+import org.apache.airavata.helix.adaptor.wrapper.SessionWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * This class will keep a pool of {@link SSHClient} and scale them according to the number of SSH requests.
+ * This pool is MaxSessions per connection aware and thread safe. It is intelligent to decide the number of connections
+ * that it should create and number of sessions should be used in each created connection to avoid possible connection
+ * refusals from the server side.
+ */
+public class PoolingSSHJClient extends SSHClient {
+
+    private final static Logger logger = LoggerFactory.getLogger(PoolingSSHJClient.class);
+
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    private final Map<SSHClient, SSHClientInfo> clientInfoMap = new HashMap<>();
+
+    private HostKeyVerifier hostKeyVerifier;
+    private String username;
+    private List<AuthMethod> authMethods;
+    private Config config;
+    private String host;
+    private int port;
+
+    private int maxSessionsForConnection = 10;
+
+    public void addHostKeyVerifier(HostKeyVerifier verifier) {
+        this.hostKeyVerifier = verifier;
+    }
+
+    public void auth(String username, List<AuthMethod> methods) throws UserAuthException, TransportException {
+        this.username = username;
+        this.authMethods = methods;
+    }
+
+    public PoolingSSHJClient(Config config, String host, int port) {
+        this.config = config;
+        this.host = host;
+        this.port = port;
+    }
+
+    ////////////////// client specific operations ///////
+
+    private SSHClient leaseSSHClient() throws Exception {
+        lock.writeLock().lock();
+
+        try {
+            if (clientInfoMap.isEmpty()) {
+                SSHClient newClient = createNewSSHClient();
+                SSHClientInfo info = new SSHClientInfo(1, System.currentTimeMillis());
+                clientInfoMap.put(newClient, info);
+
+                /* if this is the very first connection that is created to the compute host, fetch the MaxSessions
+                 * value form SSHD config file in order to tune the pool
+                 */
+                logger.info("Fetching max sessions for the connection of " + host);
+                try (SFTPClient sftpClient = newClient.newSFTPClient()) {
+                    RemoteFile remoteFile = sftpClient.open("/etc/ssh/sshd_config");
+                    byte[] readContent = new byte[(int) remoteFile.length()];
+                    remoteFile.read(0, readContent, 0, readContent.length);
+
+                    if (logger.isTraceEnabled()) {
+                        logger.trace("SSHD config file content : " + new String(readContent));
+                    }
+                    String[] lines = new String(readContent).split("\n");
+
+                    for (String line : lines) {
+                        if (line.trim().startsWith("MaxSessions")) {
+                            String[] splits = line.split(" ");
+                            if (splits.length == 2) {
+                                int sessionCount = Integer.parseInt(splits[1]);
+                                logger.info("Max session count is : " + sessionCount + " for " + host);
+                                setMaxSessionsForConnection(sessionCount);
+                            }
+                            break;
+                        }
+                    }
+                } catch (Exception e) {
+                    logger.warn("Failed to fetch max session count for " + host + ". Continuing with default value 10. " + e.getMessage() );
+                }
+                return newClient;
+
+            } else {
+
+                Optional<Map.Entry<SSHClient, SSHClientInfo>> minEntryOp = clientInfoMap.entrySet().stream().min(Comparator.comparing(entry -> entry.getValue().sessionCount));
+                if (minEntryOp.isPresent()) {
+                    Map.Entry<SSHClient, SSHClientInfo> minEntry = minEntryOp.get();
+                    // use the connection with least amount of sessions created.
+
+                    logger.debug("Session count for selected connection {}. Threshold {}", minEntry.getValue().getSessionCount(), maxSessionsForConnection);
+                    if (minEntry.getValue().getSessionCount() >= maxSessionsForConnection) {
+                        // if it exceeds the maximum session count, create a new connection
+                        logger.debug("Connection with least amount of sessions exceeds the threshold. So creating a new connection");
+                        SSHClient newClient = createNewSSHClient();
+                        SSHClientInfo info = new SSHClientInfo(1, System.currentTimeMillis());
+                        clientInfoMap.put(newClient, info);
+                        return newClient;
+
+                    } else {
+                        // otherwise reuse the same connetion
+                        logger.debug("Reusing the same connection as it doesn't exceed the threshold");
+                        minEntry.getValue().setSessionCount(minEntry.getValue().getSessionCount() + 1);
+                        minEntry.getValue().setLastAccessedTime(System.currentTimeMillis());
+                        return minEntry.getKey();
+                    }
+                } else {
+                    throw new Exception("Failed to find a connection in the pool for " + host);
+                }
+            }
+
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void removeDisconnectedClients(SSHClient client) {
+        lock.writeLock().lock();
+
+        try {
+            if (clientInfoMap.containsKey(client)) {
+                clientInfoMap.remove(client);
+            }
+
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void untrackClosedSessions(SSHClient client, int sessionId) {
+        lock.writeLock().lock();
+
+        try {
+            if (clientInfoMap.containsKey(client)) {
+                SSHClientInfo sshClientInfo = clientInfoMap.get(client);
+                sshClientInfo.setSessionCount(sshClientInfo.getSessionCount() - 1);
+            }
+
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private SSHClient createNewSSHClient() throws IOException {
+
+        SSHClient sshClient;
+        if (config != null) {
+            sshClient = new SSHClient(config);
+        } else {
+            sshClient = new SSHClient();
+        }
+
+        sshClient.getConnection().getTransport().setDisconnectListener(new DisconnectListener() {
+            @Override
+            public void notifyDisconnect(DisconnectReason reason, String message) {
+                logger.warn("Connection disconnected " + message + " due to " + reason.name());
+                removeDisconnectedClients(sshClient);
+            }
+        });
+
+        if (hostKeyVerifier != null) {
+            sshClient.addHostKeyVerifier(hostKeyVerifier);
+        }
+
+        sshClient.connect(host);
+
+        sshClient.getConnection().getKeepAlive().setKeepAliveInterval(5); //send keep alive signal every 5sec
+
+        if (authMethods != null) {
+            sshClient.auth(username, authMethods);
+        }
+
+        return sshClient;
+    }
+
+    public Session startSessionWrapper() throws Exception {
+
+        final SSHClient sshClient = leaseSSHClient();
+
+        try {
+            return new SessionWrapper(sshClient.startSession(), (id) -> untrackClosedSessions(sshClient, id));
+
+        } catch (Exception e) {
+            if (sshClient != null) {
+                untrackClosedSessions(sshClient, -1);
+            }
+            throw e;
+        }
+    }
+
+    public SCPFileTransferWrapper newSCPFileTransferWrapper() throws Exception {
+
+        final SSHClient sshClient = leaseSSHClient();
+
+        try {
+            return new SCPFileTransferWrapper(sshClient.newSCPFileTransfer(), (id) -> untrackClosedSessions(sshClient, id));
+
+        } catch (Exception e) {
+            if (sshClient != null) {
+                untrackClosedSessions(sshClient, -1);
+            }
+            throw e;
+        }
+    }
+
+    public SFTPClient newSFTPClientWrapper() throws Exception {
+
+        final SSHClient sshClient = leaseSSHClient();
+
+        try {
+            return new SFTPClientWrapper(sshClient.newSFTPClient(), (id) -> untrackClosedSessions(sshClient, id));
+        } catch (Exception e) {
+
+            if (sshClient != null) {
+                untrackClosedSessions(sshClient, -1);
+            }
+            throw e;
+        }
+    }
+
+    public class SSHClientInfo {
+
+        private int sessionCount;
+        private long lastAccessedTime;
+
+        public SSHClientInfo(int sessionCount, long lastAccessedTime) {
+            this.sessionCount = sessionCount;
+            this.lastAccessedTime = lastAccessedTime;
+        }
+
+        public int getSessionCount() {
+            return sessionCount;
+        }
+
+        public SSHClientInfo setSessionCount(int sessionCount) {
+            this.sessionCount = sessionCount;
+            return this;
+        }
+
+        public long getLastAccessedTime() {
+            return lastAccessedTime;
+        }
+
+        public SSHClientInfo setLastAccessedTime(long lastAccessedTime) {
+            this.lastAccessedTime = lastAccessedTime;
+            return this;
+        }
+    }
+
+    public HostKeyVerifier getHostKeyVerifier() {
+        return hostKeyVerifier;
+    }
+
+    public PoolingSSHJClient setHostKeyVerifier(HostKeyVerifier hostKeyVerifier) {
+        this.hostKeyVerifier = hostKeyVerifier;
+        return this;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public PoolingSSHJClient setUsername(String username) {
+        this.username = username;
+        return this;
+    }
+
+    public Config getConfig() {
+        return config;
+    }
+
+    public PoolingSSHJClient setConfig(Config config) {
+        this.config = config;
+        return this;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public PoolingSSHJClient setHost(String host) {
+        this.host = host;
+        return this;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public PoolingSSHJClient setPort(int port) {
+        this.port = port;
+        return this;
+    }
+
+    public int getMaxSessionsForConnection() {
+        return maxSessionsForConnection;
+    }
+
+    public PoolingSSHJClient setMaxSessionsForConnection(int maxSessionsForConnection) {
+        this.maxSessionsForConnection = maxSessionsForConnection;
+        return this;
+    }
+
+    public Map<SSHClient, SSHClientInfo> getClientInfoMap() {
+        return clientInfoMap;
+    }
+}
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
new file mode 100644
index 0000000..9b3d18c
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
@@ -0,0 +1,217 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.adaptor;
+
+import net.schmizz.keepalive.KeepAliveProvider;
+import net.schmizz.sshj.DefaultConfig;
+import net.schmizz.sshj.connection.channel.direct.Session;
+import net.schmizz.sshj.sftp.*;
+import net.schmizz.sshj.userauth.keyprovider.KeyProvider;
+import net.schmizz.sshj.userauth.method.AuthKeyboardInteractive;
+import net.schmizz.sshj.userauth.method.AuthMethod;
+import net.schmizz.sshj.userauth.method.AuthPublickey;
+import net.schmizz.sshj.userauth.method.ChallengeResponseProvider;
+import net.schmizz.sshj.userauth.password.PasswordFinder;
+import net.schmizz.sshj.userauth.password.PasswordUtils;
+import net.schmizz.sshj.userauth.password.Resource;
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.AgentException;
+import org.apache.airavata.agents.api.AgentUtils;
+import org.apache.airavata.agents.api.CommandOutput;
+import org.apache.airavata.helix.adaptor.wrapper.SCPFileTransferWrapper;
+import org.apache.airavata.helix.agent.ssh.StandardOutReader;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.credential.store.SSHCredential;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SSHJAgentAdaptor implements AgentAdaptor {
+
+    private final static Logger logger = LoggerFactory.getLogger(SSHJAgentAdaptor.class);
+
+    private PoolingSSHJClient sshjClient;
+
+    protected void createPoolingSSHJClient(String user, String host, String publicKey, String privateKey, String passphrase) throws IOException {
+        DefaultConfig defaultConfig = new DefaultConfig();
+        defaultConfig.setKeepAliveProvider(KeepAliveProvider.KEEP_ALIVE);
+
+        sshjClient = new PoolingSSHJClient(defaultConfig, host, 22);
+        sshjClient.addHostKeyVerifier((hostname, port, key) -> true);
+
+        sshjClient.setMaxSessionsForConnection(10);
+
+        PasswordFinder passwordFinder = passphrase != null ? PasswordUtils.createOneOff(passphrase.toCharArray()) : null;
+
+        KeyProvider keyProvider = sshjClient.loadKeys(privateKey, publicKey, passwordFinder);
+
+        final List<AuthMethod> am = new LinkedList<>();
+        am.add(new AuthPublickey(keyProvider));
+
+        am.add(new AuthKeyboardInteractive(new ChallengeResponseProvider() {
+            @Override
+            public List<String> getSubmethods() {
+                return null;
+            }
+
+            @Override
+            public void init(Resource resource, String name, String instruction) {
+
+            }
+
+            @Override
+            public char[] getResponse(String prompt, boolean echo) {
+                return new char[0];
+            }
+
+            @Override
+            public boolean shouldRetry() {
+                return false;
+            }
+        }));
+
+        sshjClient.auth(user, am);
+    }
+
+    @Override
+    public void init(String computeResource, String gatewayId, String userId, String token) throws AgentException {
+        try {
+            ComputeResourceDescription computeResourceDescription = AgentUtils.getRegistryServiceClient().getComputeResource(computeResource);
+
+            logger.info("Fetching credentials for cred store token " + token);
+
+            SSHCredential sshCredential = AgentUtils.getCredentialClient().getSSHCredential(token, gatewayId);
+            if (sshCredential == null) {
+                throw new AgentException("Null credential for token " + token);
+            }
+            logger.info("Description for token : " + token + " : " + sshCredential.getDescription());
+
+            createPoolingSSHJClient(userId, computeResourceDescription.getHostName(),
+                    sshCredential.getPublicKey(), sshCredential.getPrivateKey(), sshCredential.getPassphrase());
+
+        } catch (Exception e) {
+            logger.error("Error while initializing ssh agent for compute resource " + computeResource + " to token " + token, e);
+            throw new AgentException("Error while initializing ssh agent for compute resource " + computeResource + " to token " + token, e);
+        }
+    }
+
+    @Override
+    public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException {
+        try (Session session = sshjClient.startSessionWrapper()) {
+            Session.Command exec = session.exec((workingDirectory != null ? "cd " + workingDirectory + "; " : "") + command);
+            StandardOutReader standardOutReader = new StandardOutReader();
+            standardOutReader.readStdOutFromStream(exec.getInputStream());
+            standardOutReader.readStdErrFromStream(exec.getErrorStream());
+            standardOutReader.setExitCode(exec.getExitStatus());
+            return standardOutReader;
+        } catch (Exception e) {
+            throw new AgentException(e);
+        }
+    }
+
+    @Override
+    public void createDirectory(String path) throws AgentException {
+        try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+            sftpClient.mkdir(path);
+        } catch (Exception e) {
+            throw new AgentException(e);
+        }
+    }
+
+    @Override
+    public void copyFileTo(String localFile, String remoteFile) throws AgentException {
+        try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) {
+            fileTransfer.upload(localFile, remoteFile);
+        } catch (Exception e) {
+            throw new AgentException(e);
+        }
+    }
+
+    @Override
+    public void copyFileFrom(String remoteFile, String localFile) throws AgentException {
+        try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) {
+            fileTransfer.download(remoteFile, localFile);
+        } catch (Exception e) {
+            throw new AgentException(e);
+        }
+    }
+
+    @Override
+    public List<String> listDirectory(String path) throws AgentException {
+        try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+            List<RemoteResourceInfo> ls = sftpClient.ls(path);
+            return ls.stream().map(RemoteResourceInfo::getName).collect(Collectors.toList());
+        } catch (Exception e) {
+            throw new AgentException(e);
+        }
+    }
+
+    @Override
+    public Boolean doesFileExist(String filePath) throws AgentException {
+        try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+            return sftpClient.statExistence(filePath) != null;
+        } catch (Exception e) {
+            throw new AgentException(e);
+        }
+    }
+
+    @Override
+    public List<String> getFileNameFromExtension(String fileName, String parentPath) throws AgentException {
+
+        try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+            List<RemoteResourceInfo> ls = sftpClient.ls(parentPath, resource -> isMatch(resource.getName(), fileName));
+            return ls.stream().map(RemoteResourceInfo::getPath).collect(Collectors.toList());
+        } catch (Exception e) {
+            throw new AgentException(e);
+        }
+    }
+
+    private boolean isMatch(String s, String p) {
+        int i = 0;
+        int j = 0;
+        int starIndex = -1;
+        int iIndex = -1;
+
+        while (i < s.length()) {
+            if (j < p.length() && (p.charAt(j) == '?' || p.charAt(j) == s.charAt(i))) {
+                ++i;
+                ++j;
+            } else if (j < p.length() && p.charAt(j) == '*') {
+                starIndex = j;
+                iIndex = i;
+                j++;
+            } else if (starIndex != -1) {
+                j = starIndex + 1;
+                i = iIndex+1;
+                iIndex++;
+            } else {
+                return false;
+            }
+        }
+        while (j < p.length() && p.charAt(j) == '*') {
+            ++j;
+        }
+        return j == p.length();
+    }
+}
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJStorageAdaptor.java
similarity index 72%
copy from modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java
copy to modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJStorageAdaptor.java
index 2e62a63..811cac2 100644
--- a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJStorageAdaptor.java
@@ -17,46 +17,38 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.airavata.helix.agent.storage;
+package org.apache.airavata.helix.adaptor;
 
 import org.apache.airavata.agents.api.AgentException;
+import org.apache.airavata.agents.api.AgentUtils;
 import org.apache.airavata.agents.api.CommandOutput;
 import org.apache.airavata.agents.api.StorageResourceAdaptor;
-import org.apache.airavata.helix.agent.ssh.SshAdaptorParams;
-import org.apache.airavata.helix.agent.ssh.SshAgentAdaptor;
 import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
 import org.apache.airavata.model.credential.store.SSHCredential;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StorageResourceAdaptorImpl extends SshAgentAdaptor implements StorageResourceAdaptor  {
+public class SSHJStorageAdaptor extends SSHJAgentAdaptor implements StorageResourceAdaptor {
 
-    private final static Logger logger = LoggerFactory.getLogger(StorageResourceAdaptorImpl.class);
+    private final static Logger logger = LoggerFactory.getLogger(SSHJAgentAdaptor.class);
 
     @Override
     public void init(String storageResourceId, String gatewayId, String loginUser, String token) throws AgentException {
-
         try {
             logger.info("Initializing Storage Resource Adaptor for storage resource : "+ storageResourceId + ", gateway : " +
                     gatewayId +", user " + loginUser + ", token : " + token);
-            StorageResourceDescription storageResource = getRegistryServiceClient().getStorageResource(storageResourceId);
+            StorageResourceDescription storageResource = AgentUtils.getRegistryServiceClient().getStorageResource(storageResourceId);
 
             logger.info("Fetching credentials for cred store token " + token);
 
-            SSHCredential sshCredential = getCredentialClient().getSSHCredential(token, gatewayId);
+            SSHCredential sshCredential = AgentUtils.getCredentialClient().getSSHCredential(token, gatewayId);
             if (sshCredential == null) {
                 throw new AgentException("Null credential for token " + token);
             }
             logger.info("Description for token : " + token + " : " + sshCredential.getDescription());
 
-            SshAdaptorParams adaptorParams = new SshAdaptorParams();
-            adaptorParams.setHostName(storageResource.getHostName());
-            adaptorParams.setUserName(loginUser);
-            adaptorParams.setPassphrase(sshCredential.getPassphrase());
-            adaptorParams.setPrivateKey(sshCredential.getPrivateKey().getBytes());
-            adaptorParams.setPublicKey(sshCredential.getPublicKey().getBytes());
-            adaptorParams.setStrictHostKeyChecking(false);
-            init(adaptorParams);
+            createPoolingSSHJClient(loginUser, storageResource.getHostName(), sshCredential.getPublicKey(),
+                    sshCredential.getPrivateKey(), sshCredential.getPassphrase());
 
         } catch (Exception e) {
             logger.error("Error while initializing ssh agent for storage resource " + storageResourceId + " to token " + token, e);
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java
new file mode 100644
index 0000000..ff49e5b
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.adaptor.wrapper;
+
+import net.schmizz.sshj.xfer.FileTransfer;
+import net.schmizz.sshj.xfer.LocalDestFile;
+import net.schmizz.sshj.xfer.LocalSourceFile;
+import net.schmizz.sshj.xfer.TransferListener;
+import net.schmizz.sshj.xfer.scp.SCPFileTransfer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.function.Consumer;
+
+public class SCPFileTransferWrapper implements FileTransfer, Closeable {
+
+    private SCPFileTransfer scpFileTransfer;
+    private Consumer<Integer> onCloseFunction;
+
+    public SCPFileTransferWrapper(SCPFileTransfer scpFileTransfer, Consumer<Integer> onCloseFunction) {
+        this.scpFileTransfer = scpFileTransfer;
+        this.onCloseFunction = onCloseFunction;
+    }
+
+    @Override
+    public void upload(String localPath, String remotePath) throws IOException {
+        scpFileTransfer.upload(localPath, remotePath);
+    }
+
+    @Override
+    public void download(String remotePath, String localPath) throws IOException {
+        scpFileTransfer.download(remotePath, localPath);
+    }
+
+    @Override
+    public void upload(LocalSourceFile localFile, String remotePath) throws IOException {
+        scpFileTransfer.upload(localFile, remotePath);
+    }
+
+    @Override
+    public void download(String remotePath, LocalDestFile localFile) throws IOException {
+        scpFileTransfer.download(remotePath, localFile);
+    }
+
+    @Override
+    public TransferListener getTransferListener() {
+        return scpFileTransfer.getTransferListener();
+    }
+
+    @Override
+    public void setTransferListener(TransferListener listener) {
+        scpFileTransfer.setTransferListener(listener);
+    }
+
+    @Override
+    public void close() throws IOException {
+        onCloseFunction.accept(-1);
+    }
+}
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java
new file mode 100644
index 0000000..ed21510
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.adaptor.wrapper;
+
+import net.schmizz.sshj.sftp.SFTPClient;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+public class SFTPClientWrapper extends SFTPClient {
+    private SFTPClient sftpClient;
+    private Consumer<Integer> onCloseFunction;
+
+    public SFTPClientWrapper(SFTPClient sftpClient, Consumer<Integer> onCloseFunction) {
+        super(sftpClient.getSFTPEngine());
+        this.onCloseFunction = onCloseFunction;
+    }
+
+    @Override
+    public void close() throws IOException {
+        onCloseFunction.accept(-1);
+        super.close();
+    }
+}
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java
new file mode 100644
index 0000000..a1899b1
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java
@@ -0,0 +1,183 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.adaptor.wrapper;
+
+import net.schmizz.sshj.common.LoggerFactory;
+import net.schmizz.sshj.common.Message;
+import net.schmizz.sshj.common.SSHException;
+import net.schmizz.sshj.common.SSHPacket;
+import net.schmizz.sshj.connection.ConnectionException;
+import net.schmizz.sshj.connection.channel.direct.PTYMode;
+import net.schmizz.sshj.connection.channel.direct.Session;
+import net.schmizz.sshj.transport.TransportException;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+public class SessionWrapper implements Session {
+
+    private Session session;
+    private Consumer<Integer> onCloseFunction;
+
+    public SessionWrapper(Session session, Consumer<Integer> onCloseFunction) {
+        this.session = session;
+        this.onCloseFunction = onCloseFunction;
+    }
+
+    @Override
+    public void allocateDefaultPTY() throws ConnectionException, TransportException {
+        session.allocateDefaultPTY();
+    }
+
+    @Override
+    public void allocatePTY(String term, int cols, int rows, int width, int height, Map<PTYMode, Integer> modes) throws ConnectionException, TransportException {
+        session.allocatePTY(term, cols, rows, width, height, modes);
+    }
+
+    @Override
+    public Command exec(String command) throws ConnectionException, TransportException {
+        return session.exec(command);
+    }
+
+    @Override
+    public void reqX11Forwarding(String authProto, String authCookie, int screen) throws ConnectionException, TransportException {
+        session.reqX11Forwarding(authProto, authCookie, screen);
+    }
+
+    @Override
+    public void setEnvVar(String name, String value) throws ConnectionException, TransportException {
+        session.setEnvVar(name, value);
+    }
+
+    @Override
+    public Shell startShell() throws ConnectionException, TransportException {
+        return session.startShell();
+    }
+
+    @Override
+    public Subsystem startSubsystem(String name) throws ConnectionException, TransportException {
+        return session.startSubsystem(name);
+    }
+
+    @Override
+    public void close() throws TransportException, ConnectionException {
+        onCloseFunction.accept(getID());
+        session.close();
+    }
+
+    @Override
+    public boolean getAutoExpand() {
+        return session.getAutoExpand();
+    }
+
+    @Override
+    public int getID() {
+        return session.getID();
+    }
+
+    @Override
+    public InputStream getInputStream() {
+        return session.getInputStream();
+    }
+
+    @Override
+    public int getLocalMaxPacketSize() {
+        return session.getLocalMaxPacketSize();
+    }
+
+    @Override
+    public long getLocalWinSize() {
+        return session.getLocalWinSize();
+    }
+
+    @Override
+    public OutputStream getOutputStream() {
+        return session.getOutputStream();
+    }
+
+    @Override
+    public int getRecipient() {
+        return session.getRecipient();
+    }
+
+    @Override
+    public Charset getRemoteCharset() {
+        return session.getRemoteCharset();
+    }
+
+    @Override
+    public int getRemoteMaxPacketSize() {
+        return session.getRemoteMaxPacketSize();
+    }
+
+    @Override
+    public long getRemoteWinSize() {
+        return session.getRemoteWinSize();
+    }
+
+    @Override
+    public String getType() {
+        return session.getType();
+    }
+
+    @Override
+    public boolean isOpen() {
+        return session.isOpen();
+    }
+
+    @Override
+    public void setAutoExpand(boolean autoExpand) {
+        session.setAutoExpand(autoExpand);
+    }
+
+    @Override
+    public void join() throws ConnectionException {
+        session.join();
+    }
+
+    @Override
+    public void join(long timeout, TimeUnit unit) throws ConnectionException {
+        session.join(timeout, unit);
+    }
+
+    @Override
+    public boolean isEOF() {
+        return session.isEOF();
+    }
+
+    @Override
+    public LoggerFactory getLoggerFactory() {
+        return session.getLoggerFactory();
+    }
+
+    @Override
+    public void notifyError(SSHException error) {
+        session.notifyError(error);
+    }
+
+    @Override
+    public void handle(Message msg, SSHPacket buf) throws SSHException {
+        session.handle(msg, buf);
+    }
+}
diff --git a/modules/airavata-helix/helix-spectator/pom.xml b/modules/airavata-helix/helix-spectator/pom.xml
index 357c472..d479713 100644
--- a/modules/airavata-helix/helix-spectator/pom.xml
+++ b/modules/airavata-helix/helix-spectator/pom.xml
@@ -95,5 +95,15 @@
             <artifactId>logstash-logback-encoder</artifactId>
             <version>5.0</version>
         </dependency>
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcpkix-jdk15on</artifactId>
+            <version>1.56</version>
+        </dependency>
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcprov-jdk15on</artifactId>
+            <version>1.56</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/modules/airavata-helix/task-core/pom.xml b/modules/airavata-helix/task-core/pom.xml
index cdebe2f..e0e62e0 100644
--- a/modules/airavata-helix/task-core/pom.xml
+++ b/modules/airavata-helix/task-core/pom.xml
@@ -46,7 +46,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
-            <artifactId>ssh-agent</artifactId>
+            <artifactId>sshj-agent</artifactId>
             <version>0.17-SNAPSHOT</version>
         </dependency>
     </dependencies>
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java
index f1bb854..e06f150 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java
@@ -20,6 +20,8 @@
 package org.apache.airavata.helix.core.support;
 
 import org.apache.airavata.agents.api.*;
+import org.apache.airavata.helix.adaptor.SSHJAgentAdaptor;
+import org.apache.airavata.helix.adaptor.SSHJStorageAdaptor;
 import org.apache.airavata.helix.agent.ssh.SshAgentAdaptor;
 import org.apache.airavata.helix.agent.storage.StorageResourceAdaptorImpl;
 import org.apache.airavata.helix.task.api.support.AdaptorSupport;
@@ -55,7 +57,7 @@ public class AdaptorSupportImpl implements AdaptorSupport {
 
         switch (protocol) {
             case SSH:
-                SshAgentAdaptor agentAdaptor = new SshAgentAdaptor();
+                SSHJAgentAdaptor agentAdaptor = new SSHJAgentAdaptor();
                 agentAdaptor.init(computeResource, gatewayId, userId, authToken);
                 return agentAdaptor;
             default:
@@ -66,7 +68,7 @@ public class AdaptorSupportImpl implements AdaptorSupport {
 
     @Override
     public StorageResourceAdaptor fetchStorageAdaptor(String gatewayId, String storageResourceId, String protocol, String authToken, String userId) throws AgentException {
-        StorageResourceAdaptor storageResourceAdaptor = new StorageResourceAdaptorImpl();
+        SSHJStorageAdaptor storageResourceAdaptor = new SSHJStorageAdaptor();
         storageResourceAdaptor.init(storageResourceId, gatewayId, userId, authToken);
         return storageResourceAdaptor;
     }

-- 
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.