You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/01/02 14:41:41 UTC
[airavata] 01/01: Initial integration of OutputDataStagingTask with MFT
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch mft-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 5133dbf819191d33b4f3ffeb47f01a52d885d0b7
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Thu Jan 2 09:41:18 2020 -0500
Initial integration of OutputDataStagingTask with MFT
---
modules/airavata-helix/helix-spectator/pom.xml | 43 ++++++++++++++++---
.../impl/task/staging/OutputDataStagingTask.java | 49 +++++++++++++++++++++-
.../apache/airavata/common/utils/Constants.java | 1 +
.../airavata/common/utils/ServerSettings.java | 4 ++
modules/ide-integration/pom.xml | 10 +++++
5 files changed, 100 insertions(+), 7 deletions(-)
diff --git a/modules/airavata-helix/helix-spectator/pom.xml b/modules/airavata-helix/helix-spectator/pom.xml
index aef6002..e69db65 100644
--- a/modules/airavata-helix/helix-spectator/pom.xml
+++ b/modules/airavata-helix/helix-spectator/pom.xml
@@ -35,6 +35,12 @@
<dependencies>
<dependency>
<groupId>org.apache.airavata</groupId>
+ <artifactId>mft-admin</artifactId>
+ <version>0.01-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
<artifactId>task-core</artifactId>
<version>0.19-SNAPSHOT</version>
<exclusions>
@@ -42,12 +48,26 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.airavata</groupId>
<artifactId>registry-api-service</artifactId>
<version>0.19-SNAPSHOT</version>
+ <exclusions>
+ <!-- groupId must not carry surrounding whitespace; it has to match com.google.guava exactly. -->
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.airavata</groupId>
@@ -63,6 +83,12 @@
<groupId>org.apache.airavata</groupId>
<artifactId>services-security</artifactId>
<version>0.19-SNAPSHOT</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.airavata</groupId>
@@ -96,11 +122,6 @@
</dependency>
<dependency>
<groupId>org.apache.airavata</groupId>
- <artifactId>services-security</artifactId>
- <version>0.19-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.airavata</groupId>
<artifactId>airavata-messaging-core</artifactId>
<version>0.19-SNAPSHOT</version>
</dependency>
@@ -118,11 +139,23 @@
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>5.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java</artifactId>
<version>3.0.14</version>
+ <exclusions>
+ <!-- groupId must not carry surrounding whitespace; it has to match com.google.guava exactly. -->
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
</project>
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
index a5d9d74..4c4f9cc 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
@@ -22,17 +22,21 @@ package org.apache.airavata.helix.impl.task.staging;
import org.apache.airavata.agents.api.AgentAdaptor;
import org.apache.airavata.agents.api.AgentException;
import org.apache.airavata.agents.api.StorageResourceAdaptor;
+import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.helix.impl.task.TaskContext;
import org.apache.airavata.helix.impl.task.TaskOnFailException;
import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.mft.admin.MFTAdmin;
+import org.apache.airavata.mft.admin.models.AgentInfo;
+import org.apache.airavata.mft.admin.models.TransferRequest;
+import org.apache.airavata.mft.admin.models.TransferState;
import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
import org.apache.airavata.model.application.io.DataType;
import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.task.DataStagingTaskModel;
import org.apache.helix.task.TaskResult;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -176,7 +180,48 @@ public class OutputDataStagingTask extends DataStagingTask {
} else {
// Uploading output file to the storage resource
assert processOutput != null;
- boolean transferred = transferFileToStorage(sourceURI.getPath(), destinationURI.getPath(), sourceFileName, adaptor, storageResourceAdaptor);
+ boolean transferred = false;
+ if (ServerSettings.isAgentTransferEnabled()) {
+ // Agent path: submit the copy to an MFT agent instead of calling transferFileToStorage() here.
+ // Ids are "<CLUSTER|STORAGE>:<path>:<gateway id>:<resource id>";
+ // tokens are "<credential token>:<login user name>:<gateway id>".
+ String sourceId = "CLUSTER:" + sourceURI.getPath() + ":" + getGatewayId() + ":" + getTaskContext().getComputeResourceId();
+ String sourceToken = getTaskContext().getComputeResourceCredentialToken() + ":" + getTaskContext().getComputeResourceLoginUserName() + ":" + getGatewayId();
+
+ String destId = "STORAGE:" + destinationURI.getPath() + ":" + getGatewayId() + ":" + getTaskContext().getStorageResourceId();
+ String destToken = getTaskContext().getStorageResourceCredentialToken() + ":" + getTaskContext().getStorageResourceLoginUserName() + ":" + getGatewayId();
+
+ TransferRequest request = new TransferRequest();
+ request.setSourceId(sourceId);
+ request.setSourceToken(sourceToken);
+ request.setSourceType("SCP");
+
+ request.setDestinationId(destId);
+ request.setDestinationToken(destToken);
+ request.setDestinationType("SCP");
+
+ MFTAdmin mftAdmin = new MFTAdmin();
+ List<AgentInfo> liveAgentInfos = mftAdmin.getLiveAgentInfos();
+ if (liveAgentInfos == null || liveAgentInfos.isEmpty()) {
+ throw new TaskOnFailException("No active agent available", false, null);
+ }
+
+ // The first live agent in the list is used; no selection among agents is attempted.
+ AgentInfo agent = liveAgentInfos.get(0);
+ String transferId = mftAdmin.submitTransfer(agent.getId(), request);
+ logger.info("Submitted to Agent {}. Transfer id {}", agent.getId(), transferId);
+
+ // Poll once per second until the transfer reports COMPLETED or FAILED.
+ // NOTE(review): polling has no deadline - a transfer that never reaches a terminal state
+ // keeps this task waiting. Consider a configurable timeout.
+ while (true) {
+ TransferState transferState = mftAdmin.getTransferState(transferId);
+ logger.info("Transfer status of {} is {}", transferId, transferState.getState());
+ if ("COMPLETED".equals(transferState.getState())) {
+ transferred = true;
+ break;
+ } else if ("FAILED".equals(transferState.getState())) {
+ throw new TaskOnFailException("Transfer " + transferId + " failed", false, null);
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag for callers further up the stack and stop polling.
+ Thread.currentThread().interrupt();
+ throw new TaskOnFailException("Interrupted while waiting for transfer " + transferId, false, e);
+ }
+ }
+
+ } else {
+ // Agent transfer disabled: copy through the adaptors directly.
+ transferred = transferFileToStorage(sourceURI.getPath(), destinationURI.getPath(), sourceFileName, adaptor, storageResourceAdaptor);
+ }
+
if (transferred) {
saveExperimentOutput(processOutput.getName(), destinationURI.toString());
} else {
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
index 63c99ba..205d9ac 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
@@ -71,4 +71,5 @@ public final class Constants {
public static final String NEWLINE = System.getProperty("line.separator");
public static final String ENABLE_STREAMING_TRANSFER = "enable.streaming.transfer";
+ public static final String ENABLE_AGENT_TRANSFER = "enable.agent.transfer";
}
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index e740cf5..6834e2b 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -506,4 +506,8 @@ public class ServerSettings extends ApplicationSettings {
public static Boolean isSteamingEnabled() {
return Boolean.valueOf(getSetting(Constants.ENABLE_STREAMING_TRANSFER, "True"));
}
+
+ public static Boolean isAgentTransferEnabled() {
+ return Boolean.valueOf(getSetting(Constants.ENABLE_AGENT_TRANSFER, "True"));
+ }
}
diff --git a/modules/ide-integration/pom.xml b/modules/ide-integration/pom.xml
index f9462ca..c58b790 100644
--- a/modules/ide-integration/pom.xml
+++ b/modules/ide-integration/pom.xml
@@ -12,6 +12,16 @@
<dependencies>
<dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>2.9.9</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>2.9.9</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.airavata</groupId>
<artifactId>db-event-manager</artifactId>
<version>0.19-SNAPSHOT</version>