You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2019/12/10 05:40:47 UTC
[airavata-mft] branch master updated: Added initial Agent
implementation
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
The following commit(s) were added to refs/heads/master by this push:
new db118e3 Added initial Agent implementation
db118e3 is described below
commit db118e3f0cfcc44c91f32a560fc04be3d5912a51
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Tue Dec 10 00:40:35 2019 -0500
Added initial Agent implementation
---
.../org/apache/airavata/mft/agent/MFTAgent.java | 67 ++++++++++++++++++++++
.../apache/airavata/mft/agent/TransferRequest.java | 58 +++++++++++++++++++
.../airavata/mft/agent/TransportMediator.java | 58 +++++++++++++++++++
.../apache/airavata/mft/core/api/Connector.java | 2 +-
.../airavata/mft/transport/scp/SCPReceiver.java | 7 ++-
.../airavata/mft/transport/scp/SCPSender.java | 6 +-
.../mft/transport/scp/TransportMediator.java | 44 --------------
7 files changed, 195 insertions(+), 47 deletions(-)
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index 43da9b6..de4fe72 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -1,5 +1,72 @@
package org.apache.airavata.mft.agent;
+import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.apache.airavata.mft.transport.scp.SCPMetadataCollector;
+import org.apache.airavata.mft.transport.scp.SCPReceiver;
+import org.apache.airavata.mft.transport.scp.SCPSender;
+
+import java.util.ArrayList;
+import java.util.List;
+
public class MFTAgent {
+    /** Transfer requests processed, in insertion order, by {@link #acceptRequests()}. */
+    private List<TransferRequest> requests = new ArrayList<>();
+    /** Mediator that carries out the transfer between a source and a destination connector. */
+    private TransportMediator mediator = new TransportMediator();
+
+    /**
+     * Processes every queued request in order.
+     *
+     * <p>For each request the source ("IN") and destination ("OUT") connectors are resolved
+     * and initialized with the request's resource id and token, the source metadata is
+     * collected, and the transfer is handed to the mediator. A failure in one request is
+     * printed and does not stop the remaining requests from being processed.
+     */
+    public void acceptRequests() {
+        for (TransferRequest request : requests) {
+
+            try {
+                Connector inConnector = resolveConnector(request.getSourceType(), "IN");
+                inConnector.init(request.getSourceId(), request.getSourceToken());
+                Connector outConnector = resolveConnector(request.getDestinationType(), "OUT");
+                outConnector.init(request.getDestinationId(), request.getDestinationToken());
+
+                MetadataCollector metadataCollector = resolveMetadataCollector(request.getSourceType());
+                ResourceMetadata metadata = metadataCollector.getGetResourceMetadata(request.getSourceId(), request.getSourceToken());
+
+                mediator.transfer(inConnector, outConnector, metadata);
+
+            } catch (Exception e) {
+                // Best effort per request: report the failure and continue with the next one.
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Sample entry point: queues a single SCP to SCP request (resource "1" to resource "2")
+     * and processes it.
+     *
+     * @param args command line arguments (unused)
+     */
+    public static void main(String[] args) {
+        TransferRequest request = new TransferRequest();
+        request.setSourceId("1");
+        request.setSourceType("SCP");
+        request.setDestinationId("2");
+        request.setDestinationType("SCP");
+
+        MFTAgent agent = new MFTAgent();
+        agent.requests.add(request);
+        try {
+            agent.acceptRequests();
+        } finally {
+            // Release the mediator's resources once all requests were handled so that
+            // nothing it started can keep the JVM alive after main returns.
+            agent.mediator.destroy();
+        }
+    }
+
+    /**
+     * Creates the connector for the given transport type and direction.
+     *
+     * @param type transport type; only "SCP" is supported
+     * @param direction "IN" for the source side, "OUT" for the destination side
+     * @return a new, uninitialized connector
+     * @throws IllegalArgumentException if the type/direction combination is missing or unsupported
+     */
+    // TODO load via reflection to avoid dependencies
+    private Connector resolveConnector(String type, String direction) {
+        // switch on a null String throws a bare NullPointerException; fail with context instead.
+        if (type == null || direction == null) {
+            throw new IllegalArgumentException(
+                    "Connector type and direction are required (type=" + type + ", direction=" + direction + ")");
+        }
+        switch (type) {
+            case "SCP":
+                switch (direction) {
+                    case "IN":
+                        return new SCPReceiver();
+                    case "OUT":
+                        return new SCPSender();
+                    default:
+                        break;
+                }
+                break;
+            default:
+                break;
+        }
+        // Returning null here would only surface later as an anonymous NullPointerException.
+        throw new IllegalArgumentException(
+                "Unsupported connector type '" + type + "' for direction '" + direction + "'");
+    }
+
+    /**
+     * Creates the metadata collector for the given transport type.
+     *
+     * @param type transport type; only "SCP" is supported
+     * @return a new metadata collector
+     * @throws IllegalArgumentException if the type is missing or unsupported
+     */
+    // TODO load via reflection to avoid dependencies
+    private MetadataCollector resolveMetadataCollector(String type) {
+        if (type == null) {
+            throw new IllegalArgumentException("Metadata collector type is required");
+        }
+        switch (type) {
+            case "SCP":
+                return new SCPMetadataCollector();
+            default:
+                throw new IllegalArgumentException("Unsupported metadata collector type '" + type + "'");
+        }
+    }
}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransferRequest.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransferRequest.java
new file mode 100644
index 0000000..42c4b40
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransferRequest.java
@@ -0,0 +1,58 @@
+package org.apache.airavata.mft.agent;
+
+/**
+ * Describes a single transfer: the id, transport type and token of the source resource,
+ * and the same three attributes of the destination resource. All values are plain strings
+ * and are {@code null} until set.
+ */
+public class TransferRequest {
+    // Source side of the transfer.
+    private String sourceId;
+    private String sourceType;
+    private String sourceToken;
+
+    // Destination side of the transfer.
+    private String destinationId;
+    private String destinationType;
+    private String destinationToken;
+
+    /** @return the id of the source resource */
+    public String getSourceId() {
+        return this.sourceId;
+    }
+
+    /** @param sourceId the id of the source resource */
+    public void setSourceId(String sourceId) {
+        this.sourceId = sourceId;
+    }
+
+    /** @return the transport type of the source resource */
+    public String getSourceType() {
+        return this.sourceType;
+    }
+
+    /** @param sourceType the transport type of the source resource */
+    public void setSourceType(String sourceType) {
+        this.sourceType = sourceType;
+    }
+
+    /** @return the token associated with the source resource */
+    public String getSourceToken() {
+        return this.sourceToken;
+    }
+
+    /** @param sourceToken the token associated with the source resource */
+    public void setSourceToken(String sourceToken) {
+        this.sourceToken = sourceToken;
+    }
+
+    /** @return the id of the destination resource */
+    public String getDestinationId() {
+        return this.destinationId;
+    }
+
+    /** @param destinationId the id of the destination resource */
+    public void setDestinationId(String destinationId) {
+        this.destinationId = destinationId;
+    }
+
+    /** @return the transport type of the destination resource */
+    public String getDestinationType() {
+        return this.destinationType;
+    }
+
+    /** @param destinationType the transport type of the destination resource */
+    public void setDestinationType(String destinationType) {
+        this.destinationType = destinationType;
+    }
+
+    /** @return the token associated with the destination resource */
+    public String getDestinationToken() {
+        return this.destinationToken;
+    }
+
+    /** @param destinationToken the token associated with the destination resource */
+    public void setDestinationToken(String destinationToken) {
+        this.destinationToken = destinationToken;
+    }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
new file mode 100644
index 0000000..aef03d1
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -0,0 +1,58 @@
+package org.apache.airavata.mft.agent;
+
+import net.ladenthin.streambuffer.StreamBuffer;
+import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.TransferTask;
+import org.apache.airavata.mft.core.api.Connector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+public class TransportMediator {
+
+ private ExecutorService executor = Executors.newFixedThreadPool(10);
+
+ public void destroy() {
+ executor.shutdown();
+ }
+
+ public void transfer(Connector inConnector, Connector outConnector, ResourceMetadata metadata) throws Exception {
+
+ StreamBuffer streamBuffer = new StreamBuffer();
+ ConnectorContext context = new ConnectorContext();
+ context.setMetadata(metadata);
+ context.setStreamBuffer(streamBuffer);
+
+ TransferTask recvTask = new TransferTask(inConnector, context);
+ TransferTask sendTask = new TransferTask(outConnector, context);
+ List<Future<Integer>> futureList = new ArrayList<>();
+
+ ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
+
+ futureList.add(completionService.submit(recvTask));
+ futureList.add(completionService.submit(sendTask));
+
+ for (int i = 0; i < futureList.size(); i++) {
+ Future<Integer> ft = completionService.take();
+ futureList.remove(ft);
+ try {
+ ft.get();
+ } catch(InterruptedException e){
+ // Interrupted
+ } catch(ExecutionException e){
+ // Snap, something went wrong in the task! Abort! Abort! Abort!
+ System.out.println("One task failed with error: " + e.getMessage() );
+ e.printStackTrace();
+ for(Future<Integer> f : futureList){
+ f.cancel(true);
+ }
+ futureList.clear();
+ }
+ }
+
+ //inConnector.destroy();
+ //outConnector.destroy();
+ }
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java b/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java
index 57532fb..d801956 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java
@@ -3,7 +3,7 @@ package org.apache.airavata.mft.core.api;
import org.apache.airavata.mft.core.ConnectorContext;
public interface Connector {
- public void init(String resourceId, String credentialToken);
+ public void init(String resourceId, String credentialToken) throws Exception;
public void destroy();
void startStream(ConnectorContext context) throws Exception;
}
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
index 091df74..3122436 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
@@ -14,7 +14,7 @@ public class SCPReceiver implements Connector {
private Session session;
private SSHResourceIdentifier sshResourceIdentifier;
- public void init(String resourceId, String credentialToken) {
+ public void init(String resourceId, String credentialToken) throws Exception {
this.sshResourceIdentifier = SCPTransportUtil.getSSHResourceIdentifier(resourceId);
this.session = SCPTransportUtil.createSession(sshResourceIdentifier.getUser(), sshResourceIdentifier.getHost(),
sshResourceIdentifier.getPort(),
@@ -27,6 +27,11 @@ public class SCPReceiver implements Connector {
}
public void startStream(ConnectorContext context) throws Exception {
+ if (session == null) {
+ System.out.println("Session can not be null. Make sure that SCP Receiver is properly initialized");
+ throw new Exception("Session can not be null. Make sure that SCP Receiver is properly initialized");
+ }
+
transferRemoteToStream(session, sshResourceIdentifier.getRemotePath(), context.getStreamBuffer());
}
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
index 75413ca..9c7cea0 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
@@ -27,11 +27,15 @@ public class SCPSender implements Connector {
}
public void startStream(ConnectorContext context) throws Exception {
+ if (session == null) {
+ System.out.println("Session can not be null. Make sure that SCP Sender is properly initialized");
+ throw new Exception("Session can not be null. Make sure that SCP Sender is properly initialized");
+ }
copyLocalToRemote(this.session, sshResourceIdentifier.getRemotePath(), context.getStreamBuffer(), context.getMetadata().getResourceSize());
}
private void copyLocalToRemote(Session session, String to, StreamBuffer streamBuffer, long fileSize) throws JSchException, IOException {
-
+ System.out.println("Starting scp send");
InputStream inputStream = streamBuffer.getInputStream();
boolean ptimestamp = true;
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/TransportMediator.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/TransportMediator.java
deleted file mode 100644
index 7d92ef0..0000000
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/TransportMediator.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package org.apache.airavata.mft.transport.scp;
-
-import net.ladenthin.streambuffer.StreamBuffer;
-import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.ResourceMetadata;
-import org.apache.airavata.mft.core.TransferTask;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-public class TransportMediator {
-
- private ExecutorService executor = Executors.newFixedThreadPool(10);
-
- public static void main(String args[]) throws Exception {
- TransportMediator mediator = new TransportMediator();
- SCPReceiver receiver = new SCPReceiver();
- receiver.init("1", "");
-
- SCPSender scpSender = new SCPSender();
- scpSender.init("2", "");
-
- SCPMetadataCollector metadataCollector = new SCPMetadataCollector();
- ResourceMetadata metadata = metadataCollector.getGetResourceMetadata("1", "");
- mediator.transfer(receiver, scpSender, metadata);
- }
-
- public void transfer(SCPReceiver receiver, SCPSender sender, ResourceMetadata metadata) throws Exception {
-
- StreamBuffer streamBuffer = new StreamBuffer();
- ConnectorContext context = new ConnectorContext();
- context.setMetadata(metadata);
- context.setStreamBuffer(streamBuffer);
-
- TransferTask recvTask = new TransferTask(receiver, context);
- TransferTask sendTask = new TransferTask(sender, context);
-
- Future<Integer> recvFuture = executor.submit(recvTask);
- Future<Integer> sendFuture = executor.submit(sendTask);
-
- executor.shutdown();
- }
-}