You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/04/13 17:49:54 UTC
[airavata] branch develop updated: Adding sshj adaptor and
replacing jsch adaptor for SSH communication
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/develop by this push:
new a2acaac Adding sshj adaptor and replacing jsch adaptor for SSH communication
a2acaac is described below
commit a2acaac097dfd24c85c1acbb4f041a4ee65a7d95
Author: dimuthu <di...@gmail.com>
AuthorDate: Fri Apr 13 13:49:47 2018 -0400
Adding sshj adaptor and replacing jsch adaptor for SSH communication
---
modules/airavata-helix/agent-api/pom.xml | 13 +-
.../org/apache/airavata/agents/api/AgentUtils.java | 34 ++
modules/airavata-helix/agent-impl/pom.xml | 1 +
.../airavata-helix/agent-impl/ssh-agent/pom.xml | 30 +-
.../helix/agent/local/LocalAgentAdaptor.java | 74 -----
.../airavata/helix/agent/ssh/SshAgentAdaptor.java | 36 +--
.../agent/storage/StorageResourceAdaptorImpl.java | 5 +-
.../{agent-api => agent-impl/sshj-agent}/pom.xml | 45 +--
.../airavata/helix/adaptor/PoolingSSHJClient.java | 345 +++++++++++++++++++++
.../airavata/helix/adaptor/SSHJAgentAdaptor.java | 217 +++++++++++++
.../helix/adaptor/SSHJStorageAdaptor.java} | 24 +-
.../adaptor/wrapper/SCPFileTransferWrapper.java | 76 +++++
.../helix/adaptor/wrapper/SFTPClientWrapper.java | 41 +++
.../helix/adaptor/wrapper/SessionWrapper.java | 183 +++++++++++
modules/airavata-helix/helix-spectator/pom.xml | 10 +
modules/airavata-helix/task-core/pom.xml | 2 +-
.../helix/core/support/AdaptorSupportImpl.java | 6 +-
17 files changed, 964 insertions(+), 178 deletions(-)
diff --git a/modules/airavata-helix/agent-api/pom.xml b/modules/airavata-helix/agent-api/pom.xml
index f4ac36e..25e5fd6 100644
--- a/modules/airavata-helix/agent-api/pom.xml
+++ b/modules/airavata-helix/agent-api/pom.xml
@@ -41,9 +41,20 @@
</dependency>
<dependency>
<groupId>org.apache.airavata</groupId>
- <artifactId>airavata-registry-core</artifactId>
+ <artifactId>registry-api-stubs</artifactId>
<version>0.17-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-credential-store-stubs</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<!--<build>
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentUtils.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentUtils.java
new file mode 100644
index 0000000..fa6c448
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentUtils.java
@@ -0,0 +1,34 @@
+package org.apache.airavata.agents.api;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.client.CredentialStoreClientFactory;
+import org.apache.airavata.credential.store.cpi.CredentialStoreService;
+import org.apache.airavata.credential.store.exception.CredentialStoreException;
+import org.apache.airavata.registry.api.RegistryService;
+import org.apache.airavata.registry.api.client.RegistryServiceClientFactory;
+import org.apache.airavata.registry.api.exception.RegistryServiceException;
+
+public class AgentUtils {
+
+ // TODO this is inefficient. Try to use a connection pool
+ public static RegistryService.Client getRegistryServiceClient() throws AgentException {
+ try {
+ final int serverPort = Integer.parseInt(ServerSettings.getRegistryServerPort());
+ final String serverHost = ServerSettings.getRegistryServerHost();
+ return RegistryServiceClientFactory.createRegistryClient(serverHost, serverPort);
+ } catch (RegistryServiceException | ApplicationSettingsException e) {
+ throw new AgentException("Unable to create registry client...", e);
+ }
+ }
+
+ public static CredentialStoreService.Client getCredentialClient() throws AgentException {
+ try {
+ final int serverPort = Integer.parseInt(ServerSettings.getCredentialStoreServerPort());
+ final String serverHost =ServerSettings.getCredentialStoreServerHost();
+ return CredentialStoreClientFactory.createAiravataCSClient(serverHost, serverPort);
+ } catch (CredentialStoreException | ApplicationSettingsException e) {
+ throw new AgentException("Unable to create credential client...", e);
+ }
+ }
+}
diff --git a/modules/airavata-helix/agent-impl/pom.xml b/modules/airavata-helix/agent-impl/pom.xml
index f0b8474..ce6317b 100644
--- a/modules/airavata-helix/agent-impl/pom.xml
+++ b/modules/airavata-helix/agent-impl/pom.xml
@@ -35,6 +35,7 @@
<packaging>pom</packaging>
<modules>
<module>ssh-agent</module>
+ <module>sshj-agent</module>
</modules>
<dependencies>
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/pom.xml b/modules/airavata-helix/agent-impl/ssh-agent/pom.xml
index 837b7f3..61f470d 100644
--- a/modules/airavata-helix/agent-impl/ssh-agent/pom.xml
+++ b/modules/airavata-helix/agent-impl/ssh-agent/pom.xml
@@ -12,7 +12,6 @@
<name>SSH Agent</name>
<artifactId>ssh-agent</artifactId>
-
<dependencies>
<dependency>
<groupId>com.jcraft</groupId>
@@ -21,34 +20,13 @@
</dependency>
<dependency>
<groupId>org.apache.airavata</groupId>
- <artifactId>airavata-registry-core</artifactId>
- <version>0.17-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>registry-api-service</artifactId>
- <version>0.17-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>airavata-credential-store</artifactId>
- <version>0.17-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>airavata-credential-store</artifactId>
+ <artifactId>agent-api</artifactId>
<version>0.17-SNAPSHOT</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>agent-api</artifactId>
- <version>0.17-SNAPSHOT</version>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>1.4</version>
</dependency>
</dependencies>
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/local/LocalAgentAdaptor.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/local/LocalAgentAdaptor.java
deleted file mode 100644
index cb640f3..0000000
--- a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/local/LocalAgentAdaptor.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.helix.agent.local;
-
-import org.apache.airavata.agents.api.AgentAdaptor;
-import org.apache.airavata.agents.api.AgentException;
-import org.apache.airavata.agents.api.CommandOutput;
-
-import java.util.List;
-
-public class LocalAgentAdaptor implements AgentAdaptor {
-
- public void init(Object agentPams) throws AgentException {
- throw new AgentException("Operation not implemented");
- }
-
- @Override
- public void init(String computeResource, String gatewayId, String userId, String token) throws AgentException {
- throw new AgentException("Operation not implemented");
- }
-
- @Override
- public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException {
- throw new AgentException("Operation not implemented");
- }
-
- @Override
- public void createDirectory(String path) throws AgentException {
- throw new AgentException("Operation not implemented");
- }
-
- @Override
- public void copyFileTo(String localFile, String remoteFile) throws AgentException {
- throw new AgentException("Operation not implemented");
- }
-
- @Override
- public void copyFileFrom(String remoteFile, String localFile) throws AgentException {
- throw new AgentException("Operation not implemented");
- }
-
-
- @Override
- public List<String> listDirectory(String path) throws AgentException {
- throw new AgentException("Operation not implemented");
- }
-
- @Override
- public Boolean doesFileExist(String filePath) throws AgentException {
- throw new AgentException("Operation not implemented");
- }
-
- @Override
- public List<String> getFileNameFromExtension(String fileName, String parentPath) throws AgentException {
- throw new AgentException("Operation not implemented");
- }
-}
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
index bf899e0..9c26f6a 100644
--- a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
+++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
@@ -21,25 +21,14 @@ package org.apache.airavata.helix.agent.ssh;
import com.jcraft.jsch.*;
import org.apache.airavata.agents.api.*;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.DBUtil;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.credential.store.client.CredentialStoreClientFactory;
-import org.apache.airavata.credential.store.cpi.CredentialStoreService;
import org.apache.airavata.model.credential.store.SSHCredential;
-import org.apache.airavata.credential.store.exception.CredentialStoreException;
-import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
import org.apache.airavata.model.appcatalog.computeresource.*;
-import org.apache.airavata.registry.api.RegistryService;
-import org.apache.airavata.registry.api.client.RegistryServiceClientFactory;
-import org.apache.airavata.registry.api.exception.RegistryServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.Arrays;
import java.util.List;
-import java.util.Optional;
import java.util.UUID;
/**
@@ -92,11 +81,11 @@ public class SshAgentAdaptor implements AgentAdaptor {
@Override
public void init(String computeResourceId, String gatewayId, String userId, String token) throws AgentException {
try {
- ComputeResourceDescription computeResourceDescription = getRegistryServiceClient().getComputeResource(computeResourceId);
+ ComputeResourceDescription computeResourceDescription = AgentUtils.getRegistryServiceClient().getComputeResource(computeResourceId);
logger.info("Fetching credentials for cred store token " + token);
- SSHCredential sshCredential = getCredentialClient().getSSHCredential(token, gatewayId);
+ SSHCredential sshCredential = AgentUtils.getCredentialClient().getSSHCredential(token, gatewayId);
if (sshCredential == null) {
throw new AgentException("Null credential for token " + token);
}
@@ -117,27 +106,6 @@ public class SshAgentAdaptor implements AgentAdaptor {
}
}
- // TODO this is inefficient. Try to use a connection pool
- public static RegistryService.Client getRegistryServiceClient() throws AgentException {
- try {
- final int serverPort = Integer.parseInt(ServerSettings.getRegistryServerPort());
- final String serverHost = ServerSettings.getRegistryServerHost();
- return RegistryServiceClientFactory.createRegistryClient(serverHost, serverPort);
- } catch (RegistryServiceException | ApplicationSettingsException e) {
- throw new AgentException("Unable to create registry client...", e);
- }
- }
-
- public static CredentialStoreService.Client getCredentialClient() throws AgentException {
- try {
- final int serverPort = Integer.parseInt(ServerSettings.getCredentialStoreServerPort());
- final String serverHost =ServerSettings.getCredentialStoreServerHost();
- return CredentialStoreClientFactory.createAiravataCSClient(serverHost, serverPort);
- } catch (CredentialStoreException | ApplicationSettingsException e) {
- throw new AgentException("Unable to create credential client...", e);
- }
- }
-
public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException {
StandardOutReader commandOutput = new StandardOutReader();
ChannelExec channelExec = null;
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java
index 2e62a63..c36e34b 100644
--- a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java
+++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java
@@ -20,6 +20,7 @@
package org.apache.airavata.helix.agent.storage;
import org.apache.airavata.agents.api.AgentException;
+import org.apache.airavata.agents.api.AgentUtils;
import org.apache.airavata.agents.api.CommandOutput;
import org.apache.airavata.agents.api.StorageResourceAdaptor;
import org.apache.airavata.helix.agent.ssh.SshAdaptorParams;
@@ -39,11 +40,11 @@ public class StorageResourceAdaptorImpl extends SshAgentAdaptor implements Stora
try {
logger.info("Initializing Storage Resource Adaptor for storage resource : "+ storageResourceId + ", gateway : " +
gatewayId +", user " + loginUser + ", token : " + token);
- StorageResourceDescription storageResource = getRegistryServiceClient().getStorageResource(storageResourceId);
+ StorageResourceDescription storageResource = AgentUtils.getRegistryServiceClient().getStorageResource(storageResourceId);
logger.info("Fetching credentials for cred store token " + token);
- SSHCredential sshCredential = getCredentialClient().getSSHCredential(token, gatewayId);
+ SSHCredential sshCredential = AgentUtils.getCredentialClient().getSSHCredential(token, gatewayId);
if (sshCredential == null) {
throw new AgentException("Null credential for token " + token);
}
diff --git a/modules/airavata-helix/agent-api/pom.xml b/modules/airavata-helix/agent-impl/sshj-agent/pom.xml
similarity index 61%
copy from modules/airavata-helix/agent-api/pom.xml
copy to modules/airavata-helix/agent-impl/sshj-agent/pom.xml
index f4ac36e..78351d5 100644
--- a/modules/airavata-helix/agent-api/pom.xml
+++ b/modules/airavata-helix/agent-impl/sshj-agent/pom.xml
@@ -24,40 +24,41 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>airavata-helix</artifactId>
+ <artifactId>agent-impl</artifactId>
<groupId>org.apache.airavata</groupId>
<version>0.17-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
- <name>Agent API</name>
- <artifactId>agent-api</artifactId>
+
+ <artifactId>sshj-agent</artifactId>
<dependencies>
<dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- <version>1.8.5</version>
+ <groupId>com.hierynomus</groupId>
+ <artifactId>sshj</artifactId>
+ <version>0.23.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcpkix-jdk15on</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcprov-jdk15on</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.airavata</groupId>
- <artifactId>airavata-registry-core</artifactId>
+ <artifactId>ssh-agent</artifactId>
<version>0.17-SNAPSHOT</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.jcraft</groupId>
+ <artifactId>jsch</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
- <!--<build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.5.1</version>
- <configuration>
- <source>${java.version}</source>
- <target>${java.version}</target>
- </configuration>
- </plugin>
- </plugins>
- </build>-->
-
</project>
\ No newline at end of file
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
new file mode 100644
index 0000000..768b4e0
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/PoolingSSHJClient.java
@@ -0,0 +1,345 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.adaptor;
+
+import net.schmizz.sshj.Config;
+import net.schmizz.sshj.SSHClient;
+import net.schmizz.sshj.common.DisconnectReason;
+import net.schmizz.sshj.connection.channel.direct.Session;
+import net.schmizz.sshj.sftp.RemoteFile;
+import net.schmizz.sshj.sftp.SFTPClient;
+import net.schmizz.sshj.transport.DisconnectListener;
+import net.schmizz.sshj.transport.TransportException;
+import net.schmizz.sshj.transport.verification.HostKeyVerifier;
+import net.schmizz.sshj.userauth.UserAuthException;
+import net.schmizz.sshj.userauth.method.AuthMethod;
+import org.apache.airavata.helix.adaptor.wrapper.SCPFileTransferWrapper;
+import org.apache.airavata.helix.adaptor.wrapper.SFTPClientWrapper;
+import org.apache.airavata.helix.adaptor.wrapper.SessionWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * This class will keep a pool of {@link SSHClient} and scale them according to the number of SSH requests.
+ * This pool is MaxSessions per connection aware and thread safe. It is intelligent to decide the number of connections
+ * that it should create and number of sessions should be used in each created connection to avoid possible connection
+ * refusals from the server side.
+ */
+public class PoolingSSHJClient extends SSHClient {
+
+ private final static Logger logger = LoggerFactory.getLogger(PoolingSSHJClient.class);
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final Map<SSHClient, SSHClientInfo> clientInfoMap = new HashMap<>();
+
+ private HostKeyVerifier hostKeyVerifier;
+ private String username;
+ private List<AuthMethod> authMethods;
+ private Config config;
+ private String host;
+ private int port;
+
+ private int maxSessionsForConnection = 10;
+
+ public void addHostKeyVerifier(HostKeyVerifier verifier) {
+ this.hostKeyVerifier = verifier;
+ }
+
+ public void auth(String username, List<AuthMethod> methods) throws UserAuthException, TransportException {
+ this.username = username;
+ this.authMethods = methods;
+ }
+
+ public PoolingSSHJClient(Config config, String host, int port) {
+ this.config = config;
+ this.host = host;
+ this.port = port;
+ }
+
+ ////////////////// client specific operations ///////
+
+ private SSHClient leaseSSHClient() throws Exception {
+ lock.writeLock().lock();
+
+ try {
+ if (clientInfoMap.isEmpty()) {
+ SSHClient newClient = createNewSSHClient();
+ SSHClientInfo info = new SSHClientInfo(1, System.currentTimeMillis());
+ clientInfoMap.put(newClient, info);
+
+ /* if this is the very first connection that is created to the compute host, fetch the MaxSessions
+ * value form SSHD config file in order to tune the pool
+ */
+ logger.info("Fetching max sessions for the connection of " + host);
+ try (SFTPClient sftpClient = newClient.newSFTPClient()) {
+ RemoteFile remoteFile = sftpClient.open("/etc/ssh/sshd_config");
+ byte[] readContent = new byte[(int) remoteFile.length()];
+ remoteFile.read(0, readContent, 0, readContent.length);
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("SSHD config file content : " + new String(readContent));
+ }
+ String[] lines = new String(readContent).split("\n");
+
+ for (String line : lines) {
+ if (line.trim().startsWith("MaxSessions")) {
+ String[] splits = line.split(" ");
+ if (splits.length == 2) {
+ int sessionCount = Integer.parseInt(splits[1]);
+ logger.info("Max session count is : " + sessionCount + " for " + host);
+ setMaxSessionsForConnection(sessionCount);
+ }
+ break;
+ }
+ }
+ } catch (Exception e) {
+ logger.warn("Failed to fetch max session count for " + host + ". Continuing with default value 10. " + e.getMessage() );
+ }
+ return newClient;
+
+ } else {
+
+ Optional<Map.Entry<SSHClient, SSHClientInfo>> minEntryOp = clientInfoMap.entrySet().stream().min(Comparator.comparing(entry -> entry.getValue().sessionCount));
+ if (minEntryOp.isPresent()) {
+ Map.Entry<SSHClient, SSHClientInfo> minEntry = minEntryOp.get();
+ // use the connection with least amount of sessions created.
+
+ logger.debug("Session count for selected connection {}. Threshold {}", minEntry.getValue().getSessionCount(), maxSessionsForConnection);
+ if (minEntry.getValue().getSessionCount() >= maxSessionsForConnection) {
+ // if it exceeds the maximum session count, create a new connection
+ logger.debug("Connection with least amount of sessions exceeds the threshold. So creating a new connection");
+ SSHClient newClient = createNewSSHClient();
+ SSHClientInfo info = new SSHClientInfo(1, System.currentTimeMillis());
+ clientInfoMap.put(newClient, info);
+ return newClient;
+
+ } else {
+ // otherwise reuse the same connetion
+ logger.debug("Reusing the same connection as it doesn't exceed the threshold");
+ minEntry.getValue().setSessionCount(minEntry.getValue().getSessionCount() + 1);
+ minEntry.getValue().setLastAccessedTime(System.currentTimeMillis());
+ return minEntry.getKey();
+ }
+ } else {
+ throw new Exception("Failed to find a connection in the pool for " + host);
+ }
+ }
+
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private void removeDisconnectedClients(SSHClient client) {
+ lock.writeLock().lock();
+
+ try {
+ if (clientInfoMap.containsKey(client)) {
+ clientInfoMap.remove(client);
+ }
+
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private void untrackClosedSessions(SSHClient client, int sessionId) {
+ lock.writeLock().lock();
+
+ try {
+ if (clientInfoMap.containsKey(client)) {
+ SSHClientInfo sshClientInfo = clientInfoMap.get(client);
+ sshClientInfo.setSessionCount(sshClientInfo.getSessionCount() - 1);
+ }
+
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private SSHClient createNewSSHClient() throws IOException {
+
+ SSHClient sshClient;
+ if (config != null) {
+ sshClient = new SSHClient(config);
+ } else {
+ sshClient = new SSHClient();
+ }
+
+ sshClient.getConnection().getTransport().setDisconnectListener(new DisconnectListener() {
+ @Override
+ public void notifyDisconnect(DisconnectReason reason, String message) {
+ logger.warn("Connection disconnected " + message + " due to " + reason.name());
+ removeDisconnectedClients(sshClient);
+ }
+ });
+
+ if (hostKeyVerifier != null) {
+ sshClient.addHostKeyVerifier(hostKeyVerifier);
+ }
+
+ sshClient.connect(host);
+
+ sshClient.getConnection().getKeepAlive().setKeepAliveInterval(5); //send keep alive signal every 5sec
+
+ if (authMethods != null) {
+ sshClient.auth(username, authMethods);
+ }
+
+ return sshClient;
+ }
+
+ public Session startSessionWrapper() throws Exception {
+
+ final SSHClient sshClient = leaseSSHClient();
+
+ try {
+ return new SessionWrapper(sshClient.startSession(), (id) -> untrackClosedSessions(sshClient, id));
+
+ } catch (Exception e) {
+ if (sshClient != null) {
+ untrackClosedSessions(sshClient, -1);
+ }
+ throw e;
+ }
+ }
+
+ public SCPFileTransferWrapper newSCPFileTransferWrapper() throws Exception {
+
+ final SSHClient sshClient = leaseSSHClient();
+
+ try {
+ return new SCPFileTransferWrapper(sshClient.newSCPFileTransfer(), (id) -> untrackClosedSessions(sshClient, id));
+
+ } catch (Exception e) {
+ if (sshClient != null) {
+ untrackClosedSessions(sshClient, -1);
+ }
+ throw e;
+ }
+ }
+
+ public SFTPClient newSFTPClientWrapper() throws Exception {
+
+ final SSHClient sshClient = leaseSSHClient();
+
+ try {
+ return new SFTPClientWrapper(sshClient.newSFTPClient(), (id) -> untrackClosedSessions(sshClient, id));
+ } catch (Exception e) {
+
+ if (sshClient != null) {
+ untrackClosedSessions(sshClient, -1);
+ }
+ throw e;
+ }
+ }
+
+ public class SSHClientInfo {
+
+ private int sessionCount;
+ private long lastAccessedTime;
+
+ public SSHClientInfo(int sessionCount, long lastAccessedTime) {
+ this.sessionCount = sessionCount;
+ this.lastAccessedTime = lastAccessedTime;
+ }
+
+ public int getSessionCount() {
+ return sessionCount;
+ }
+
+ public SSHClientInfo setSessionCount(int sessionCount) {
+ this.sessionCount = sessionCount;
+ return this;
+ }
+
+ public long getLastAccessedTime() {
+ return lastAccessedTime;
+ }
+
+ public SSHClientInfo setLastAccessedTime(long lastAccessedTime) {
+ this.lastAccessedTime = lastAccessedTime;
+ return this;
+ }
+ }
+
+ public HostKeyVerifier getHostKeyVerifier() {
+ return hostKeyVerifier;
+ }
+
+ public PoolingSSHJClient setHostKeyVerifier(HostKeyVerifier hostKeyVerifier) {
+ this.hostKeyVerifier = hostKeyVerifier;
+ return this;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public PoolingSSHJClient setUsername(String username) {
+ this.username = username;
+ return this;
+ }
+
+ public Config getConfig() {
+ return config;
+ }
+
+ public PoolingSSHJClient setConfig(Config config) {
+ this.config = config;
+ return this;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public PoolingSSHJClient setHost(String host) {
+ this.host = host;
+ return this;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public PoolingSSHJClient setPort(int port) {
+ this.port = port;
+ return this;
+ }
+
+ public int getMaxSessionsForConnection() {
+ return maxSessionsForConnection;
+ }
+
+ public PoolingSSHJClient setMaxSessionsForConnection(int maxSessionsForConnection) {
+ this.maxSessionsForConnection = maxSessionsForConnection;
+ return this;
+ }
+
+ public Map<SSHClient, SSHClientInfo> getClientInfoMap() {
+ return clientInfoMap;
+ }
+}
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
new file mode 100644
index 0000000..9b3d18c
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
@@ -0,0 +1,217 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.adaptor;
+
+import net.schmizz.keepalive.KeepAliveProvider;
+import net.schmizz.sshj.DefaultConfig;
+import net.schmizz.sshj.connection.channel.direct.Session;
+import net.schmizz.sshj.sftp.*;
+import net.schmizz.sshj.userauth.keyprovider.KeyProvider;
+import net.schmizz.sshj.userauth.method.AuthKeyboardInteractive;
+import net.schmizz.sshj.userauth.method.AuthMethod;
+import net.schmizz.sshj.userauth.method.AuthPublickey;
+import net.schmizz.sshj.userauth.method.ChallengeResponseProvider;
+import net.schmizz.sshj.userauth.password.PasswordFinder;
+import net.schmizz.sshj.userauth.password.PasswordUtils;
+import net.schmizz.sshj.userauth.password.Resource;
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.AgentException;
+import org.apache.airavata.agents.api.AgentUtils;
+import org.apache.airavata.agents.api.CommandOutput;
+import org.apache.airavata.helix.adaptor.wrapper.SCPFileTransferWrapper;
+import org.apache.airavata.helix.agent.ssh.StandardOutReader;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.credential.store.SSHCredential;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SSHJAgentAdaptor implements AgentAdaptor {
+
+ private final static Logger logger = LoggerFactory.getLogger(SSHJAgentAdaptor.class);
+
+ private PoolingSSHJClient sshjClient;
+
+ protected void createPoolingSSHJClient(String user, String host, String publicKey, String privateKey, String passphrase) throws IOException {
+ DefaultConfig defaultConfig = new DefaultConfig();
+ defaultConfig.setKeepAliveProvider(KeepAliveProvider.KEEP_ALIVE);
+
+ sshjClient = new PoolingSSHJClient(defaultConfig, host, 22);
+ sshjClient.addHostKeyVerifier((hostname, port, key) -> true);
+
+ sshjClient.setMaxSessionsForConnection(10);
+
+ PasswordFinder passwordFinder = passphrase != null ? PasswordUtils.createOneOff(passphrase.toCharArray()) : null;
+
+ KeyProvider keyProvider = sshjClient.loadKeys(privateKey, publicKey, passwordFinder);
+
+ final List<AuthMethod> am = new LinkedList<>();
+ am.add(new AuthPublickey(keyProvider));
+
+ am.add(new AuthKeyboardInteractive(new ChallengeResponseProvider() {
+ @Override
+ public List<String> getSubmethods() {
+ return null;
+ }
+
+ @Override
+ public void init(Resource resource, String name, String instruction) {
+
+ }
+
+ @Override
+ public char[] getResponse(String prompt, boolean echo) {
+ return new char[0];
+ }
+
+ @Override
+ public boolean shouldRetry() {
+ return false;
+ }
+ }));
+
+ sshjClient.auth(user, am);
+ }
+
+ @Override
+ public void init(String computeResource, String gatewayId, String userId, String token) throws AgentException {
+ try {
+ ComputeResourceDescription computeResourceDescription = AgentUtils.getRegistryServiceClient().getComputeResource(computeResource);
+
+ logger.info("Fetching credentials for cred store token " + token);
+
+ SSHCredential sshCredential = AgentUtils.getCredentialClient().getSSHCredential(token, gatewayId);
+ if (sshCredential == null) {
+ throw new AgentException("Null credential for token " + token);
+ }
+ logger.info("Description for token : " + token + " : " + sshCredential.getDescription());
+
+ createPoolingSSHJClient(userId, computeResourceDescription.getHostName(),
+ sshCredential.getPublicKey(), sshCredential.getPrivateKey(), sshCredential.getPassphrase());
+
+ } catch (Exception e) {
+ logger.error("Error while initializing ssh agent for compute resource " + computeResource + " to token " + token, e);
+ throw new AgentException("Error while initializing ssh agent for compute resource " + computeResource + " to token " + token, e);
+ }
+ }
+
+ @Override
+ public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException {
+ try (Session session = sshjClient.startSessionWrapper()) {
+ Session.Command exec = session.exec((workingDirectory != null ? "cd " + workingDirectory + "; " : "") + command);
+ StandardOutReader standardOutReader = new StandardOutReader();
+ standardOutReader.readStdOutFromStream(exec.getInputStream());
+ standardOutReader.readStdErrFromStream(exec.getErrorStream());
+ standardOutReader.setExitCode(exec.getExitStatus());
+ return standardOutReader;
+ } catch (Exception e) {
+ throw new AgentException(e);
+ }
+ }
+
+ @Override
+ public void createDirectory(String path) throws AgentException {
+ try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+ sftpClient.mkdir(path);
+ } catch (Exception e) {
+ throw new AgentException(e);
+ }
+ }
+
+ @Override
+ public void copyFileTo(String localFile, String remoteFile) throws AgentException {
+ try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) {
+ fileTransfer.upload(localFile, remoteFile);
+ } catch (Exception e) {
+ throw new AgentException(e);
+ }
+ }
+
+ @Override
+ public void copyFileFrom(String remoteFile, String localFile) throws AgentException {
+ try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) {
+ fileTransfer.download(remoteFile, localFile);
+ } catch (Exception e) {
+ throw new AgentException(e);
+ }
+ }
+
+ @Override
+ public List<String> listDirectory(String path) throws AgentException {
+ try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+ List<RemoteResourceInfo> ls = sftpClient.ls(path);
+ return ls.stream().map(RemoteResourceInfo::getName).collect(Collectors.toList());
+ } catch (Exception e) {
+ throw new AgentException(e);
+ }
+ }
+
+ @Override
+ public Boolean doesFileExist(String filePath) throws AgentException {
+ try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+ return sftpClient.statExistence(filePath) != null;
+ } catch (Exception e) {
+ throw new AgentException(e);
+ }
+ }
+
+ @Override
+ public List<String> getFileNameFromExtension(String fileName, String parentPath) throws AgentException {
+
+ try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+ List<RemoteResourceInfo> ls = sftpClient.ls(parentPath, resource -> isMatch(resource.getName(), fileName));
+ return ls.stream().map(RemoteResourceInfo::getPath).collect(Collectors.toList());
+ } catch (Exception e) {
+ throw new AgentException(e);
+ }
+ }
+
+ private boolean isMatch(String s, String p) {
+ int i = 0;
+ int j = 0;
+ int starIndex = -1;
+ int iIndex = -1;
+
+ while (i < s.length()) {
+ if (j < p.length() && (p.charAt(j) == '?' || p.charAt(j) == s.charAt(i))) {
+ ++i;
+ ++j;
+ } else if (j < p.length() && p.charAt(j) == '*') {
+ starIndex = j;
+ iIndex = i;
+ j++;
+ } else if (starIndex != -1) {
+ j = starIndex + 1;
+ i = iIndex+1;
+ iIndex++;
+ } else {
+ return false;
+ }
+ }
+ while (j < p.length() && p.charAt(j) == '*') {
+ ++j;
+ }
+ return j == p.length();
+ }
+}
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJStorageAdaptor.java
similarity index 72%
copy from modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java
copy to modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJStorageAdaptor.java
index 2e62a63..811cac2 100644
--- a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJStorageAdaptor.java
@@ -17,46 +17,38 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.airavata.helix.agent.storage;
+package org.apache.airavata.helix.adaptor;
import org.apache.airavata.agents.api.AgentException;
+import org.apache.airavata.agents.api.AgentUtils;
import org.apache.airavata.agents.api.CommandOutput;
import org.apache.airavata.agents.api.StorageResourceAdaptor;
-import org.apache.airavata.helix.agent.ssh.SshAdaptorParams;
-import org.apache.airavata.helix.agent.ssh.SshAgentAdaptor;
import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
import org.apache.airavata.model.credential.store.SSHCredential;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class StorageResourceAdaptorImpl extends SshAgentAdaptor implements StorageResourceAdaptor {
+public class SSHJStorageAdaptor extends SSHJAgentAdaptor implements StorageResourceAdaptor {
- private final static Logger logger = LoggerFactory.getLogger(StorageResourceAdaptorImpl.class);
+ private final static Logger logger = LoggerFactory.getLogger(SSHJAgentAdaptor.class);
@Override
public void init(String storageResourceId, String gatewayId, String loginUser, String token) throws AgentException {
-
try {
logger.info("Initializing Storage Resource Adaptor for storage resource : "+ storageResourceId + ", gateway : " +
gatewayId +", user " + loginUser + ", token : " + token);
- StorageResourceDescription storageResource = getRegistryServiceClient().getStorageResource(storageResourceId);
+ StorageResourceDescription storageResource = AgentUtils.getRegistryServiceClient().getStorageResource(storageResourceId);
logger.info("Fetching credentials for cred store token " + token);
- SSHCredential sshCredential = getCredentialClient().getSSHCredential(token, gatewayId);
+ SSHCredential sshCredential = AgentUtils.getCredentialClient().getSSHCredential(token, gatewayId);
if (sshCredential == null) {
throw new AgentException("Null credential for token " + token);
}
logger.info("Description for token : " + token + " : " + sshCredential.getDescription());
- SshAdaptorParams adaptorParams = new SshAdaptorParams();
- adaptorParams.setHostName(storageResource.getHostName());
- adaptorParams.setUserName(loginUser);
- adaptorParams.setPassphrase(sshCredential.getPassphrase());
- adaptorParams.setPrivateKey(sshCredential.getPrivateKey().getBytes());
- adaptorParams.setPublicKey(sshCredential.getPublicKey().getBytes());
- adaptorParams.setStrictHostKeyChecking(false);
- init(adaptorParams);
+ createPoolingSSHJClient(loginUser, storageResource.getHostName(), sshCredential.getPublicKey(),
+ sshCredential.getPrivateKey(), sshCredential.getPassphrase());
} catch (Exception e) {
logger.error("Error while initializing ssh agent for storage resource " + storageResourceId + " to token " + token, e);
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java
new file mode 100644
index 0000000..ff49e5b
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SCPFileTransferWrapper.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.adaptor.wrapper;
+
+import net.schmizz.sshj.xfer.FileTransfer;
+import net.schmizz.sshj.xfer.LocalDestFile;
+import net.schmizz.sshj.xfer.LocalSourceFile;
+import net.schmizz.sshj.xfer.TransferListener;
+import net.schmizz.sshj.xfer.scp.SCPFileTransfer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.function.Consumer;
+
+public class SCPFileTransferWrapper implements FileTransfer, Closeable {
+
+ private SCPFileTransfer scpFileTransfer;
+ private Consumer<Integer> onCloseFunction;
+
+ public SCPFileTransferWrapper(SCPFileTransfer scpFileTransfer, Consumer<Integer> onCloseFunction) {
+ this.scpFileTransfer = scpFileTransfer;
+ this.onCloseFunction = onCloseFunction;
+ }
+
+ @Override
+ public void upload(String localPath, String remotePath) throws IOException {
+ scpFileTransfer.upload(localPath, remotePath);
+ }
+
+ @Override
+ public void download(String remotePath, String localPath) throws IOException {
+ scpFileTransfer.download(remotePath, localPath);
+ }
+
+ @Override
+ public void upload(LocalSourceFile localFile, String remotePath) throws IOException {
+ scpFileTransfer.upload(localFile, remotePath);
+ }
+
+ @Override
+ public void download(String remotePath, LocalDestFile localFile) throws IOException {
+ scpFileTransfer.download(remotePath, localFile);
+ }
+
+ @Override
+ public TransferListener getTransferListener() {
+ return scpFileTransfer.getTransferListener();
+ }
+
+ @Override
+ public void setTransferListener(TransferListener listener) {
+ scpFileTransfer.setTransferListener(listener);
+ }
+
+ @Override
+ public void close() throws IOException {
+ onCloseFunction.accept(-1);
+ }
+}
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java
new file mode 100644
index 0000000..ed21510
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SFTPClientWrapper.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.adaptor.wrapper;
+
+import net.schmizz.sshj.sftp.SFTPClient;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+public class SFTPClientWrapper extends SFTPClient {
+ private SFTPClient sftpClient;
+ private Consumer<Integer> onCloseFunction;
+
+ public SFTPClientWrapper(SFTPClient sftpClient, Consumer<Integer> onCloseFunction) {
+ super(sftpClient.getSFTPEngine());
+ this.onCloseFunction = onCloseFunction;
+ }
+
+ @Override
+ public void close() throws IOException {
+ onCloseFunction.accept(-1);
+ super.close();
+ }
+}
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java
new file mode 100644
index 0000000..a1899b1
--- /dev/null
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/wrapper/SessionWrapper.java
@@ -0,0 +1,183 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.helix.adaptor.wrapper;
+
+import net.schmizz.sshj.common.LoggerFactory;
+import net.schmizz.sshj.common.Message;
+import net.schmizz.sshj.common.SSHException;
+import net.schmizz.sshj.common.SSHPacket;
+import net.schmizz.sshj.connection.ConnectionException;
+import net.schmizz.sshj.connection.channel.direct.PTYMode;
+import net.schmizz.sshj.connection.channel.direct.Session;
+import net.schmizz.sshj.transport.TransportException;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+public class SessionWrapper implements Session {
+
+ private Session session;
+ private Consumer<Integer> onCloseFunction;
+
+ public SessionWrapper(Session session, Consumer<Integer> onCloseFunction) {
+ this.session = session;
+ this.onCloseFunction = onCloseFunction;
+ }
+
+ @Override
+ public void allocateDefaultPTY() throws ConnectionException, TransportException {
+ session.allocateDefaultPTY();
+ }
+
+ @Override
+ public void allocatePTY(String term, int cols, int rows, int width, int height, Map<PTYMode, Integer> modes) throws ConnectionException, TransportException {
+ session.allocatePTY(term, cols, rows, width, height, modes);
+ }
+
+ @Override
+ public Command exec(String command) throws ConnectionException, TransportException {
+ return session.exec(command);
+ }
+
+ @Override
+ public void reqX11Forwarding(String authProto, String authCookie, int screen) throws ConnectionException, TransportException {
+ session.reqX11Forwarding(authProto, authCookie, screen);
+ }
+
+ @Override
+ public void setEnvVar(String name, String value) throws ConnectionException, TransportException {
+ session.setEnvVar(name, value);
+ }
+
+ @Override
+ public Shell startShell() throws ConnectionException, TransportException {
+ return session.startShell();
+ }
+
+ @Override
+ public Subsystem startSubsystem(String name) throws ConnectionException, TransportException {
+ return session.startSubsystem(name);
+ }
+
+ @Override
+ public void close() throws TransportException, ConnectionException {
+ onCloseFunction.accept(getID());
+ session.close();
+ }
+
+ @Override
+ public boolean getAutoExpand() {
+ return session.getAutoExpand();
+ }
+
+ @Override
+ public int getID() {
+ return session.getID();
+ }
+
+ @Override
+ public InputStream getInputStream() {
+ return session.getInputStream();
+ }
+
+ @Override
+ public int getLocalMaxPacketSize() {
+ return session.getLocalMaxPacketSize();
+ }
+
+ @Override
+ public long getLocalWinSize() {
+ return session.getLocalWinSize();
+ }
+
+ @Override
+ public OutputStream getOutputStream() {
+ return session.getOutputStream();
+ }
+
+ @Override
+ public int getRecipient() {
+ return session.getRecipient();
+ }
+
+ @Override
+ public Charset getRemoteCharset() {
+ return session.getRemoteCharset();
+ }
+
+ @Override
+ public int getRemoteMaxPacketSize() {
+ return session.getRemoteMaxPacketSize();
+ }
+
+ @Override
+ public long getRemoteWinSize() {
+ return session.getRemoteWinSize();
+ }
+
+ @Override
+ public String getType() {
+ return session.getType();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return session.isOpen();
+ }
+
+ @Override
+ public void setAutoExpand(boolean autoExpand) {
+ session.setAutoExpand(autoExpand);
+ }
+
+ @Override
+ public void join() throws ConnectionException {
+ session.join();
+ }
+
+ @Override
+ public void join(long timeout, TimeUnit unit) throws ConnectionException {
+ session.join(timeout, unit);
+ }
+
+ @Override
+ public boolean isEOF() {
+ return session.isEOF();
+ }
+
+ @Override
+ public LoggerFactory getLoggerFactory() {
+ return session.getLoggerFactory();
+ }
+
+ @Override
+ public void notifyError(SSHException error) {
+ session.notifyError(error);
+ }
+
+ @Override
+ public void handle(Message msg, SSHPacket buf) throws SSHException {
+ session.handle(msg, buf);
+ }
+}
diff --git a/modules/airavata-helix/helix-spectator/pom.xml b/modules/airavata-helix/helix-spectator/pom.xml
index 357c472..d479713 100644
--- a/modules/airavata-helix/helix-spectator/pom.xml
+++ b/modules/airavata-helix/helix-spectator/pom.xml
@@ -95,5 +95,15 @@
<artifactId>logstash-logback-encoder</artifactId>
<version>5.0</version>
</dependency>
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcpkix-jdk15on</artifactId>
+ <version>1.56</version>
+ </dependency>
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcprov-jdk15on</artifactId>
+ <version>1.56</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/modules/airavata-helix/task-core/pom.xml b/modules/airavata-helix/task-core/pom.xml
index cdebe2f..e0e62e0 100644
--- a/modules/airavata-helix/task-core/pom.xml
+++ b/modules/airavata-helix/task-core/pom.xml
@@ -46,7 +46,7 @@
</dependency>
<dependency>
<groupId>org.apache.airavata</groupId>
- <artifactId>ssh-agent</artifactId>
+ <artifactId>sshj-agent</artifactId>
<version>0.17-SNAPSHOT</version>
</dependency>
</dependencies>
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java
index f1bb854..e06f150 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java
@@ -20,6 +20,8 @@
package org.apache.airavata.helix.core.support;
import org.apache.airavata.agents.api.*;
+import org.apache.airavata.helix.adaptor.SSHJAgentAdaptor;
+import org.apache.airavata.helix.adaptor.SSHJStorageAdaptor;
import org.apache.airavata.helix.agent.ssh.SshAgentAdaptor;
import org.apache.airavata.helix.agent.storage.StorageResourceAdaptorImpl;
import org.apache.airavata.helix.task.api.support.AdaptorSupport;
@@ -55,7 +57,7 @@ public class AdaptorSupportImpl implements AdaptorSupport {
switch (protocol) {
case SSH:
- SshAgentAdaptor agentAdaptor = new SshAgentAdaptor();
+ SSHJAgentAdaptor agentAdaptor = new SSHJAgentAdaptor();
agentAdaptor.init(computeResource, gatewayId, userId, authToken);
return agentAdaptor;
default:
@@ -66,7 +68,7 @@ public class AdaptorSupportImpl implements AdaptorSupport {
@Override
public StorageResourceAdaptor fetchStorageAdaptor(String gatewayId, String storageResourceId, String protocol, String authToken, String userId) throws AgentException {
- StorageResourceAdaptor storageResourceAdaptor = new StorageResourceAdaptorImpl();
+ SSHJStorageAdaptor storageResourceAdaptor = new SSHJStorageAdaptor();
storageResourceAdaptor.init(storageResourceId, gatewayId, userId, authToken);
return storageResourceAdaptor;
}
--
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.