You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/01/02 14:41:40 UTC

[airavata] branch mft-integration created (now 5133dbf)

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a change to branch mft-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git.


      at 5133dbf  Initial integration of OutputDataStagingTask with MFT

This branch includes the following new commits:

     new 5133dbf  Initial integration of OutputDataStagingTask with MFT

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[airavata] 01/01: Initial integration of OutputDataStagingTask with MFT

Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch mft-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 5133dbf819191d33b4f3ffeb47f01a52d885d0b7
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Thu Jan 2 09:41:18 2020 -0500

    Initial integration of OutputDataStagingTask with MFT
---
 modules/airavata-helix/helix-spectator/pom.xml     | 43 ++++++++++++++++---
 .../impl/task/staging/OutputDataStagingTask.java   | 49 +++++++++++++++++++++-
 .../apache/airavata/common/utils/Constants.java    |  1 +
 .../airavata/common/utils/ServerSettings.java      |  4 ++
 modules/ide-integration/pom.xml                    | 10 +++++
 5 files changed, 100 insertions(+), 7 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/pom.xml b/modules/airavata-helix/helix-spectator/pom.xml
index aef6002..e69db65 100644
--- a/modules/airavata-helix/helix-spectator/pom.xml
+++ b/modules/airavata-helix/helix-spectator/pom.xml
@@ -35,6 +35,12 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.airavata</groupId>
+            <artifactId>mft-admin</artifactId>
+            <version>0.01-SNAPSHOT</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
             <artifactId>task-core</artifactId>
             <version>0.19-SNAPSHOT</version>
             <exclusions>
@@ -42,12 +48,26 @@
                     <groupId>commons-io</groupId>
                     <artifactId>commons-io</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
             <artifactId>registry-api-service</artifactId>
             <version>0.19-SNAPSHOT</version>
+            <exclusions>
+                <exclusion>
+                    <groupId> com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
@@ -63,6 +83,12 @@
             <groupId>org.apache.airavata</groupId>
             <artifactId>services-security</artifactId>
             <version>0.19-SNAPSHOT</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
@@ -96,11 +122,6 @@
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
-            <artifactId>services-security</artifactId>
-            <version>0.19-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
             <artifactId>airavata-messaging-core</artifactId>
             <version>0.19-SNAPSHOT</version>
         </dependency>
@@ -118,11 +139,23 @@
             <groupId>net.logstash.logback</groupId>
             <artifactId>logstash-logback-encoder</artifactId>
             <version>5.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.github.docker-java</groupId>
             <artifactId>docker-java</artifactId>
             <version>3.0.14</version>
+            <exclusions>
+                <exclusion>
+                    <groupId> com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
     </dependencies>
 </project>
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
index a5d9d74..4c4f9cc 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
@@ -22,17 +22,21 @@ package org.apache.airavata.helix.impl.task.staging;
 import org.apache.airavata.agents.api.AgentAdaptor;
 import org.apache.airavata.agents.api.AgentException;
 import org.apache.airavata.agents.api.StorageResourceAdaptor;
+import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.helix.impl.task.TaskContext;
 import org.apache.airavata.helix.impl.task.TaskOnFailException;
 import org.apache.airavata.helix.task.api.TaskHelper;
 import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.mft.admin.MFTAdmin;
+import org.apache.airavata.mft.admin.models.AgentInfo;
+import org.apache.airavata.mft.admin.models.TransferRequest;
+import org.apache.airavata.mft.admin.models.TransferState;
 import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
 import org.apache.airavata.model.application.io.DataType;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.task.DataStagingTaskModel;
 import org.apache.helix.task.TaskResult;
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -176,7 +180,48 @@ public class OutputDataStagingTask extends DataStagingTask {
             } else {
                 // Uploading output file to the storage resource
                 assert processOutput != null;
-                boolean transferred = transferFileToStorage(sourceURI.getPath(), destinationURI.getPath(), sourceFileName, adaptor, storageResourceAdaptor);
+                boolean transferred = false;
+                if (ServerSettings.isAgentTransferEnabled()) {
+                    String sourceId = "CLUSTER:" + sourceURI.getPath() + ":" + getGatewayId() + ":" + getTaskContext().getComputeResourceId();
+                    String sourceToken = getTaskContext().getComputeResourceCredentialToken() + ":" + getTaskContext().getComputeResourceLoginUserName() + ":" + getGatewayId();
+
+                    String destId = "STORAGE:" + destinationURI.getPath() + ":" + getGatewayId() + ":" + getTaskContext().getStorageResourceId();
+                    String destToken = getTaskContext().getStorageResourceCredentialToken() + ":" + getTaskContext().getStorageResourceLoginUserName() + ":" + getGatewayId();
+
+                    TransferRequest request = new TransferRequest();
+                    request.setSourceId(sourceId);
+                    request.setSourceToken(sourceToken);
+                    request.setSourceType("SCP");
+
+                    request.setDestinationId(destId);
+                    request.setDestinationToken(destToken);
+                    request.setDestinationType("SCP");
+
+                    MFTAdmin mftAdmin = new MFTAdmin();
+                    List<AgentInfo> liveAgentInfos = mftAdmin.getLiveAgentInfos();
+                    if (liveAgentInfos.size() == 0) {
+                        throw new TaskOnFailException("No active agent available", false, null);
+                    }
+
+                    String transferId = mftAdmin.submitTransfer(liveAgentInfos.get(0).getId(), request);
+                    logger.info("Submitted to Agent " + liveAgentInfos.get(0).getId() + ". Transfer id " + transferId);
+
+                    while (true) {
+                        TransferState transferState = mftAdmin.getTransferState(transferId);
+                        logger.info("Transfer status of " + transferId + " is " + transferState.getState());
+                        if ("COMPLETED".equals(transferState.getState())) {
+                            transferred = true;
+                            break;
+                        } else if ("FAILED".equals(transferState.getState())) {
+                            throw new TaskOnFailException("Transfer " + transferId + " failed", false, null);
+                        }
+                        Thread.sleep(1000);
+                    }
+
+                } else {
+                    transferred = transferFileToStorage(sourceURI.getPath(), destinationURI.getPath(), sourceFileName, adaptor, storageResourceAdaptor);
+                }
+
                 if (transferred) {
                     saveExperimentOutput(processOutput.getName(), destinationURI.toString());
                 } else {
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
index 63c99ba..205d9ac 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
@@ -71,4 +71,5 @@ public final class Constants {
     public static final String NEWLINE = System.getProperty("line.separator");
 
     public static final String ENABLE_STREAMING_TRANSFER = "enable.streaming.transfer";
+    public static final String ENABLE_AGENT_TRANSFER = "enable.agent.transfer";
 }
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index e740cf5..6834e2b 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -506,4 +506,8 @@ public class ServerSettings extends ApplicationSettings {
     public static Boolean isSteamingEnabled() {
         return Boolean.valueOf(getSetting(Constants.ENABLE_STREAMING_TRANSFER, "True"));
     }
+
+    public static Boolean isAgentTransferEnabled() {
+        return Boolean.valueOf(getSetting(Constants.ENABLE_AGENT_TRANSFER, "True"));
+    }
 }
diff --git a/modules/ide-integration/pom.xml b/modules/ide-integration/pom.xml
index f9462ca..c58b790 100644
--- a/modules/ide-integration/pom.xml
+++ b/modules/ide-integration/pom.xml
@@ -12,6 +12,16 @@
 
     <dependencies>
         <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <version>2.9.9</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>2.9.9</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.airavata</groupId>
             <artifactId>db-event-manager</artifactId>
             <version>0.19-SNAPSHOT</version>