You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/01/07 08:11:10 UTC
[airavata] branch mft-integration updated: Migrating MFTAdmin ->
MFTApiClient
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch mft-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/mft-integration by this push:
new d54fdd7 Migrating MFTAdmin -> MFTApiClient
d54fdd7 is described below
commit d54fdd77ea5186e623d627ea5117eeaea1c95489
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Tue Jan 7 03:10:53 2020 -0500
Migrating MFTAdmin -> MFTApiClient
---
modules/airavata-helix/helix-spectator/pom.xml | 26 ++++++++-
.../impl/task/staging/OutputDataStagingTask.java | 62 ++++++++++++----------
modules/ide-integration/pom.xml | 5 ++
3 files changed, 64 insertions(+), 29 deletions(-)
diff --git a/modules/airavata-helix/helix-spectator/pom.xml b/modules/airavata-helix/helix-spectator/pom.xml
index e69db65..188c801 100644
--- a/modules/airavata-helix/helix-spectator/pom.xml
+++ b/modules/airavata-helix/helix-spectator/pom.xml
@@ -35,7 +35,7 @@
<dependencies>
<dependency>
<groupId>org.apache.airavata</groupId>
- <artifactId>mft-admin</artifactId>
+ <artifactId>mft-api-client</artifactId>
<version>0.01-SNAPSHOT</version>
</dependency>
@@ -155,6 +155,30 @@
<groupId> com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler-proxy</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-epoll</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-kqueue</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ </exclusion>
</exclusions>
</dependency>
</dependencies>
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
index 4c4f9cc..902ebab 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
@@ -19,6 +19,7 @@
*/
package org.apache.airavata.helix.impl.task.staging;
+import io.grpc.StatusRuntimeException;
import org.apache.airavata.agents.api.AgentAdaptor;
import org.apache.airavata.agents.api.AgentException;
import org.apache.airavata.agents.api.StorageResourceAdaptor;
@@ -27,10 +28,8 @@ import org.apache.airavata.helix.impl.task.TaskContext;
import org.apache.airavata.helix.impl.task.TaskOnFailException;
import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
-import org.apache.airavata.mft.admin.MFTAdmin;
-import org.apache.airavata.mft.admin.models.AgentInfo;
-import org.apache.airavata.mft.admin.models.TransferRequest;
-import org.apache.airavata.mft.admin.models.TransferState;
+import org.apache.airavata.mft.api.client.MFTApiClient;
+import org.apache.airavata.mft.api.service.*;
import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
import org.apache.airavata.model.application.io.DataType;
import org.apache.airavata.model.application.io.OutputDataObjectType;
@@ -188,32 +187,39 @@ public class OutputDataStagingTask extends DataStagingTask {
String destId = "STORAGE:" + destinationURI.getPath() + ":" + getGatewayId() + ":" + getTaskContext().getStorageResourceId();
String destToken = getTaskContext().getStorageResourceCredentialToken() + ":" + getTaskContext().getStorageResourceLoginUserName() + ":" + getGatewayId();
- TransferRequest request = new TransferRequest();
- request.setSourceId(sourceId);
- request.setSourceToken(sourceToken);
- request.setSourceType("SCP");
-
- request.setDestinationId(destId);
- request.setDestinationToken(destToken);
- request.setDestinationType("SCP");
-
- MFTAdmin mftAdmin = new MFTAdmin();
- List<AgentInfo> liveAgentInfos = mftAdmin.getLiveAgentInfos();
- if (liveAgentInfos.size() == 0) {
- throw new TaskOnFailException("No active agent available", false, null);
- }
-
- String transferId = mftAdmin.submitTransfer(liveAgentInfos.get(0).getId(), request);
- logger.info("Submitted to Agent " + liveAgentInfos.get(0).getId() + ". Transfer id " + transferId);
+ TransferApiRequest request = TransferApiRequest.newBuilder()
+ .setSourceId(sourceId)
+ .setSourceToken(sourceToken)
+ .setSourceType("SCP")
+ .setDestinationId(destId)
+ .setDestinationToken(destToken)
+ .setDestinationType("SCP")
+ .setSourceResourceBackend("AIRAVATA")
+ .setSourceCredentialBackend("AIRAVATA")
+ .setDestResourceBackend("AIRAVATA")
+ .setDestCredentialBackend("AIRAVATA")
+ .setAffinityTransfer(false).build();
+
+ MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = MFTApiClient.buildClient(
+ ServerSettings.getSetting("mft.server.host"),
+ Integer.parseInt(ServerSettings.getSetting("mft.server.port")));
+
+ TransferApiResponse response = mftClient.submitTransfer(request);
+ logger.info("Submitted file transfer with id " + response.getTransferId());
while (true) {
- TransferState transferState = mftAdmin.getTransferState(transferId);
- logger.info("Transfer status of " + transferId + " is " + transferState.getState());
- if ("COMPLETED".equals(transferState.getState())) {
- transferred = true;
- break;
- } else if ("FAILED".equals(transferState.getState())) {
- throw new TaskOnFailException("Transfer " + transferId + " failed", false, null);
+ try {
+ TransferStateApiResponse transferState = mftClient.getTransferState(
+ TransferStateApiRequest.newBuilder().setTransferId(response.getTransferId()).build());
+ logger.info("Transfer status of " + response.getTransferId() + " is " + transferState.getState());
+ if ("COMPLETED".equals(transferState.getState())) {
+ transferred = true;
+ break;
+ } else if ("FAILED".equals(transferState.getState())) {
+ throw new TaskOnFailException("Transfer " + response.getTransferId() + " failed", false, null);
+ }
+ } catch (StatusRuntimeException e) {
+ logger.info("No status for transfer " + response.getTransferId());
}
Thread.sleep(1000);
}
diff --git a/modules/ide-integration/pom.xml b/modules/ide-integration/pom.xml
index c58b790..0e7c58b 100644
--- a/modules/ide-integration/pom.xml
+++ b/modules/ide-integration/pom.xml
@@ -17,6 +17,11 @@
<version>2.9.9</version>
</dependency>
<dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ <version>4.1.42.Final</version>
+ </dependency>
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.9</version>