You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/01/02 14:41:41 UTC

[airavata] 01/01: Initial integration of OutputDataStagingTask with MFT

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch mft-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 5133dbf819191d33b4f3ffeb47f01a52d885d0b7
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Thu Jan 2 09:41:18 2020 -0500

    Initial integration of OutputDataStagingTask with MFT
---
 modules/airavata-helix/helix-spectator/pom.xml     | 43 ++++++++++++++++---
 .../impl/task/staging/OutputDataStagingTask.java   | 49 +++++++++++++++++++++-
 .../apache/airavata/common/utils/Constants.java    |  1 +
 .../airavata/common/utils/ServerSettings.java      |  4 ++
 modules/ide-integration/pom.xml                    | 10 +++++
 5 files changed, 100 insertions(+), 7 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/pom.xml b/modules/airavata-helix/helix-spectator/pom.xml
index aef6002..e69db65 100644
--- a/modules/airavata-helix/helix-spectator/pom.xml
+++ b/modules/airavata-helix/helix-spectator/pom.xml
@@ -35,6 +35,12 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.airavata</groupId>
+            <artifactId>mft-admin</artifactId>
+            <version>0.01-SNAPSHOT</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
             <artifactId>task-core</artifactId>
             <version>0.19-SNAPSHOT</version>
             <exclusions>
@@ -42,12 +48,26 @@
                     <groupId>commons-io</groupId>
                     <artifactId>commons-io</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
             <artifactId>registry-api-service</artifactId>
             <version>0.19-SNAPSHOT</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
@@ -63,6 +83,12 @@
             <groupId>org.apache.airavata</groupId>
             <artifactId>services-security</artifactId>
             <version>0.19-SNAPSHOT</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
@@ -96,11 +122,6 @@
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
-            <artifactId>services-security</artifactId>
-            <version>0.19-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
             <artifactId>airavata-messaging-core</artifactId>
             <version>0.19-SNAPSHOT</version>
         </dependency>
@@ -118,11 +139,23 @@
             <groupId>net.logstash.logback</groupId>
             <artifactId>logstash-logback-encoder</artifactId>
             <version>5.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.github.docker-java</groupId>
             <artifactId>docker-java</artifactId>
             <version>3.0.14</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
     </dependencies>
 </project>
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
index a5d9d74..4c4f9cc 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
@@ -22,17 +22,21 @@ package org.apache.airavata.helix.impl.task.staging;
 import org.apache.airavata.agents.api.AgentAdaptor;
 import org.apache.airavata.agents.api.AgentException;
 import org.apache.airavata.agents.api.StorageResourceAdaptor;
+import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.helix.impl.task.TaskContext;
 import org.apache.airavata.helix.impl.task.TaskOnFailException;
 import org.apache.airavata.helix.task.api.TaskHelper;
 import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.mft.admin.MFTAdmin;
+import org.apache.airavata.mft.admin.models.AgentInfo;
+import org.apache.airavata.mft.admin.models.TransferRequest;
+import org.apache.airavata.mft.admin.models.TransferState;
 import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
 import org.apache.airavata.model.application.io.DataType;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.task.DataStagingTaskModel;
 import org.apache.helix.task.TaskResult;
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -176,7 +180,48 @@ public class OutputDataStagingTask extends DataStagingTask {
             } else {
                 // Uploading output file to the storage resource
                 assert processOutput != null;
-                boolean transferred = transferFileToStorage(sourceURI.getPath(), destinationURI.getPath(), sourceFileName, adaptor, storageResourceAdaptor);
+                boolean transferred = false;
+                if (ServerSettings.isAgentTransferEnabled()) {
+                    String sourceId = "CLUSTER:" + sourceURI.getPath() + ":" + getGatewayId() + ":" + getTaskContext().getComputeResourceId();
+                    String sourceToken = getTaskContext().getComputeResourceCredentialToken() + ":" + getTaskContext().getComputeResourceLoginUserName() + ":" + getGatewayId();
+
+                    String destId = "STORAGE:" + destinationURI.getPath() + ":" + getGatewayId() + ":" + getTaskContext().getStorageResourceId();
+                    String destToken = getTaskContext().getStorageResourceCredentialToken() + ":" + getTaskContext().getStorageResourceLoginUserName() + ":" + getGatewayId();
+
+                    TransferRequest request = new TransferRequest();
+                    request.setSourceId(sourceId);
+                    request.setSourceToken(sourceToken);
+                    request.setSourceType("SCP");
+
+                    request.setDestinationId(destId);
+                    request.setDestinationToken(destToken);
+                    request.setDestinationType("SCP");
+
+                    MFTAdmin mftAdmin = new MFTAdmin();
+                    List<AgentInfo> liveAgentInfos = mftAdmin.getLiveAgentInfos();
+                    if (liveAgentInfos.size() == 0) {
+                        throw new TaskOnFailException("No active agent available", false, null);
+                    }
+
+                    String transferId = mftAdmin.submitTransfer(liveAgentInfos.get(0).getId(), request);
+                    logger.info("Submitted to Agent " + liveAgentInfos.get(0).getId() + ". Transfer id " + transferId);
+
+                    while (true) {
+                        TransferState transferState = mftAdmin.getTransferState(transferId);
+                        logger.info("Transfer status of " + transferId + " is " + transferState.getState());
+                        if ("COMPLETED".equals(transferState.getState())) {
+                            transferred = true;
+                            break;
+                        } else if ("FAILED".equals(transferState.getState())) {
+                            throw new TaskOnFailException("Transfer " + transferId + " failed", false, null);
+                        }
+                        Thread.sleep(1000);
+                    }
+
+                } else {
+                    transferred = transferFileToStorage(sourceURI.getPath(), destinationURI.getPath(), sourceFileName, adaptor, storageResourceAdaptor);
+                }
+
                 if (transferred) {
                     saveExperimentOutput(processOutput.getName(), destinationURI.toString());
                 } else {
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
index 63c99ba..205d9ac 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
@@ -71,4 +71,5 @@ public final class Constants {
     public static final String NEWLINE = System.getProperty("line.separator");
 
     public static final String ENABLE_STREAMING_TRANSFER = "enable.streaming.transfer";
+    public static final String ENABLE_AGENT_TRANSFER = "enable.agent.transfer";
 }
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index e740cf5..6834e2b 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -506,4 +506,8 @@ public class ServerSettings extends ApplicationSettings {
     public static Boolean isSteamingEnabled() {
         return Boolean.valueOf(getSetting(Constants.ENABLE_STREAMING_TRANSFER, "True"));
     }
+
+    public static Boolean isAgentTransferEnabled() {
+        return Boolean.valueOf(getSetting(Constants.ENABLE_AGENT_TRANSFER, "True"));
+    }
 }
diff --git a/modules/ide-integration/pom.xml b/modules/ide-integration/pom.xml
index f9462ca..c58b790 100644
--- a/modules/ide-integration/pom.xml
+++ b/modules/ide-integration/pom.xml
@@ -12,6 +12,16 @@
 
     <dependencies>
         <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <version>2.9.9</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>2.9.9</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.airavata</groupId>
             <artifactId>db-event-manager</artifactId>
             <version>0.19-SNAPSHOT</version>