You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2019/12/10 05:40:47 UTC

[airavata-mft] branch master updated: Added initial Agent implementation

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git


The following commit(s) were added to refs/heads/master by this push:
     new db118e3  Added initial Agent implementation
db118e3 is described below

commit db118e3f0cfcc44c91f32a560fc04be3d5912a51
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Tue Dec 10 00:40:35 2019 -0500

    Added initial Agent implementation
---
 .../org/apache/airavata/mft/agent/MFTAgent.java    | 67 ++++++++++++++++++++++
 .../apache/airavata/mft/agent/TransferRequest.java | 58 +++++++++++++++++++
 .../airavata/mft/agent/TransportMediator.java      | 58 +++++++++++++++++++
 .../apache/airavata/mft/core/api/Connector.java    |  2 +-
 .../airavata/mft/transport/scp/SCPReceiver.java    |  7 ++-
 .../airavata/mft/transport/scp/SCPSender.java      |  6 +-
 .../mft/transport/scp/TransportMediator.java       | 44 --------------
 7 files changed, 195 insertions(+), 47 deletions(-)

diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index 43da9b6..de4fe72 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -1,5 +1,72 @@
 package org.apache.airavata.mft.agent;
 
+import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.apache.airavata.mft.transport.scp.SCPMetadataCollector;
+import org.apache.airavata.mft.transport.scp.SCPReceiver;
+import org.apache.airavata.mft.transport.scp.SCPSender;
+
+import java.util.ArrayList;
+import java.util.List;
+
 public class MFTAgent {
 
+    private List<TransferRequest> requests = new ArrayList<>();
+    private TransportMediator mediator = new TransportMediator();
+
+    public void acceptRequests() {
+        for (TransferRequest request : requests) {
+
+            try {
+                Connector inConnector = resolveConnector(request.getSourceType(), "IN");
+                inConnector.init(request.getSourceId(), request.getSourceToken());
+                Connector outConnector = resolveConnector(request.getDestinationType(), "OUT");
+                outConnector.init(request.getDestinationId(), request.getDestinationToken());
+
+                MetadataCollector metadataCollector = resolveMetadataCollector(request.getSourceType());
+                ResourceMetadata metadata = metadataCollector.getGetResourceMetadata(request.getSourceId(), request.getSourceToken());
+
+                mediator.transfer(inConnector, outConnector, metadata);
+
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+    public static void main(String args[]) {
+        TransferRequest request = new TransferRequest();
+        request.setSourceId("1");
+        request.setSourceType("SCP");
+        request.setDestinationId("2");
+        request.setDestinationType("SCP");
+
+        MFTAgent agent = new MFTAgent();
+        agent.requests.add(request);
+        agent.acceptRequests();
+    }
+
+    // TODO load from reflection to avoid dependencies
+    private Connector resolveConnector(String type, String direction) {
+        switch (type) {
+            case "SCP":
+                switch (direction) {
+                    case "IN":
+                        return new SCPReceiver();
+                    case "OUT":
+                        return new SCPSender();
+                }
+                break;
+        }
+        return null;
+    }
+
+    // TODO load from reflection to avoid dependencies
+    private MetadataCollector resolveMetadataCollector(String type) {
+        switch (type) {
+            case "SCP":
+                return new SCPMetadataCollector();
+        }
+        return null;
+    }
 }
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransferRequest.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransferRequest.java
new file mode 100644
index 0000000..42c4b40
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransferRequest.java
@@ -0,0 +1,58 @@
+package org.apache.airavata.mft.agent;
+
+public class TransferRequest {
+    private String sourceId;
+    private String sourceType;
+    private String sourceToken;
+    private String destinationId;
+    private String destinationType;
+    private String destinationToken;
+
+    public String getSourceId() {
+        return sourceId;
+    }
+
+    public void setSourceId(String sourceId) {
+        this.sourceId = sourceId;
+    }
+
+    public String getDestinationId() {
+        return destinationId;
+    }
+
+    public void setDestinationId(String destinationId) {
+        this.destinationId = destinationId;
+    }
+
+    public String getSourceType() {
+        return sourceType;
+    }
+
+    public void setSourceType(String sourceType) {
+        this.sourceType = sourceType;
+    }
+
+    public String getDestinationType() {
+        return destinationType;
+    }
+
+    public void setDestinationType(String destinationType) {
+        this.destinationType = destinationType;
+    }
+
+    public String getSourceToken() {
+        return sourceToken;
+    }
+
+    public void setSourceToken(String sourceToken) {
+        this.sourceToken = sourceToken;
+    }
+
+    public String getDestinationToken() {
+        return destinationToken;
+    }
+
+    public void setDestinationToken(String destinationToken) {
+        this.destinationToken = destinationToken;
+    }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
new file mode 100644
index 0000000..aef03d1
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -0,0 +1,58 @@
+package org.apache.airavata.mft.agent;
+
+import net.ladenthin.streambuffer.StreamBuffer;
+import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.TransferTask;
+import org.apache.airavata.mft.core.api.Connector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+public class TransportMediator {
+
+    private ExecutorService executor = Executors.newFixedThreadPool(10);
+
+    public void destroy() {
+        executor.shutdown();
+    }
+
+    public void transfer(Connector inConnector, Connector outConnector, ResourceMetadata metadata) throws Exception {
+
+        StreamBuffer streamBuffer = new StreamBuffer();
+        ConnectorContext context = new ConnectorContext();
+        context.setMetadata(metadata);
+        context.setStreamBuffer(streamBuffer);
+
+        TransferTask recvTask = new TransferTask(inConnector, context);
+        TransferTask sendTask = new TransferTask(outConnector, context);
+        List<Future<Integer>> futureList = new ArrayList<>();
+
+        ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
+
+        futureList.add(completionService.submit(recvTask));
+        futureList.add(completionService.submit(sendTask));
+
+        for (int i = 0; i < futureList.size(); i++) {
+            Future<Integer> ft = completionService.take();
+            futureList.remove(ft);
+            try {
+                ft.get();
+            } catch(InterruptedException e){
+                // Interrupted
+            } catch(ExecutionException e){
+                // Snap, something went wrong in the task! Abort! Abort! Abort!
+                System.out.println("One task failed with error: " + e.getMessage() );
+                e.printStackTrace();
+                for(Future<Integer> f : futureList){
+                    f.cancel(true);
+                }
+                futureList.clear();
+            }
+        }
+
+        //inConnector.destroy();
+        //outConnector.destroy();
+    }
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java b/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java
index 57532fb..d801956 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java
@@ -3,7 +3,7 @@ package org.apache.airavata.mft.core.api;
 import org.apache.airavata.mft.core.ConnectorContext;
 
 public interface Connector {
-    public void init(String resourceId, String credentialToken);
+    public void init(String resourceId, String credentialToken) throws Exception;
     public void destroy();
     void startStream(ConnectorContext context) throws Exception;
 }
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
index 091df74..3122436 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
@@ -14,7 +14,7 @@ public class SCPReceiver implements Connector {
     private Session session;
     private SSHResourceIdentifier sshResourceIdentifier;
 
-    public void init(String resourceId, String credentialToken) {
+    public void init(String resourceId, String credentialToken) throws Exception {
         this.sshResourceIdentifier = SCPTransportUtil.getSSHResourceIdentifier(resourceId);
         this.session = SCPTransportUtil.createSession(sshResourceIdentifier.getUser(), sshResourceIdentifier.getHost(),
                 sshResourceIdentifier.getPort(),
@@ -27,6 +27,11 @@ public class SCPReceiver implements Connector {
     }
 
     public void startStream(ConnectorContext context) throws Exception {
+        if (session == null) {
+            System.out.println("Session can not be null. Make sure that SCP Receiver is properly initialized");
+            throw new Exception("Session can not be null. Make sure that SCP Receiver is properly initialized");
+        }
+
         transferRemoteToStream(session, sshResourceIdentifier.getRemotePath(), context.getStreamBuffer());
     }
 
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
index 75413ca..9c7cea0 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
@@ -27,11 +27,15 @@ public class SCPSender implements Connector {
     }
 
     public void startStream(ConnectorContext context) throws Exception {
+        if (session == null) {
+            System.out.println("Session can not be null. Make sure that SCP Sender is properly initialized");
+            throw new Exception("Session can not be null. Make sure that SCP Sender is properly initialized");
+        }
         copyLocalToRemote(this.session, sshResourceIdentifier.getRemotePath(), context.getStreamBuffer(), context.getMetadata().getResourceSize());
     }
 
     private void copyLocalToRemote(Session session, String to, StreamBuffer streamBuffer, long fileSize) throws JSchException, IOException {
-
+        System.out.println("Starting scp send");
         InputStream inputStream = streamBuffer.getInputStream();
 
         boolean ptimestamp = true;
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/TransportMediator.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/TransportMediator.java
deleted file mode 100644
index 7d92ef0..0000000
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/TransportMediator.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package org.apache.airavata.mft.transport.scp;
-
-import net.ladenthin.streambuffer.StreamBuffer;
-import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.ResourceMetadata;
-import org.apache.airavata.mft.core.TransferTask;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-public class TransportMediator {
-
-    private ExecutorService executor = Executors.newFixedThreadPool(10);
-
-    public static void main(String args[]) throws Exception {
-        TransportMediator mediator = new TransportMediator();
-        SCPReceiver receiver = new SCPReceiver();
-        receiver.init("1", "");
-
-        SCPSender scpSender = new SCPSender();
-        scpSender.init("2", "");
-
-        SCPMetadataCollector metadataCollector = new SCPMetadataCollector();
-        ResourceMetadata metadata = metadataCollector.getGetResourceMetadata("1", "");
-        mediator.transfer(receiver, scpSender, metadata);
-    }
-
-    public void transfer(SCPReceiver receiver, SCPSender sender, ResourceMetadata metadata) throws Exception {
-
-        StreamBuffer streamBuffer = new StreamBuffer();
-        ConnectorContext context = new ConnectorContext();
-        context.setMetadata(metadata);
-        context.setStreamBuffer(streamBuffer);
-
-        TransferTask recvTask = new TransferTask(receiver, context);
-        TransferTask sendTask = new TransferTask(sender, context);
-
-        Future<Integer> recvFuture = executor.submit(recvTask);
-        Future<Integer> sendFuture = executor.submit(sendTask);
-
-        executor.shutdown();
-    }
-}