You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/01/07 08:11:10 UTC

[airavata] branch mft-integration updated: Migrating MFTAdmin -> MFTApiClient

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch mft-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/mft-integration by this push:
     new d54fdd7  Migrating MFTAdmin -> MFTApiClient
d54fdd7 is described below

commit d54fdd77ea5186e623d627ea5117eeaea1c95489
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Tue Jan 7 03:10:53 2020 -0500

    Migrating MFTAdmin -> MFTApiClient
---
 modules/airavata-helix/helix-spectator/pom.xml     | 26 ++++++++-
 .../impl/task/staging/OutputDataStagingTask.java   | 62 ++++++++++++----------
 modules/ide-integration/pom.xml                    |  5 ++
 3 files changed, 64 insertions(+), 29 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/pom.xml b/modules/airavata-helix/helix-spectator/pom.xml
index e69db65..188c801 100644
--- a/modules/airavata-helix/helix-spectator/pom.xml
+++ b/modules/airavata-helix/helix-spectator/pom.xml
@@ -35,7 +35,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.airavata</groupId>
-            <artifactId>mft-admin</artifactId>
+            <artifactId>mft-api-client</artifactId>
             <version>0.01-SNAPSHOT</version>
         </dependency>
 
@@ -155,6 +155,30 @@
                     <groupId> com.google.guava</groupId>
                     <artifactId>guava</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty-codec-http</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty-handler</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty-handler-proxy</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty-transport-native-epoll</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty-transport-native-kqueue</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty-common</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
     </dependencies>
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
index 4c4f9cc..902ebab 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
@@ -19,6 +19,7 @@
  */
 package org.apache.airavata.helix.impl.task.staging;
 
+import io.grpc.StatusRuntimeException;
 import org.apache.airavata.agents.api.AgentAdaptor;
 import org.apache.airavata.agents.api.AgentException;
 import org.apache.airavata.agents.api.StorageResourceAdaptor;
@@ -27,10 +28,8 @@ import org.apache.airavata.helix.impl.task.TaskContext;
 import org.apache.airavata.helix.impl.task.TaskOnFailException;
 import org.apache.airavata.helix.task.api.TaskHelper;
 import org.apache.airavata.helix.task.api.annotation.TaskDef;
-import org.apache.airavata.mft.admin.MFTAdmin;
-import org.apache.airavata.mft.admin.models.AgentInfo;
-import org.apache.airavata.mft.admin.models.TransferRequest;
-import org.apache.airavata.mft.admin.models.TransferState;
+import org.apache.airavata.mft.api.client.MFTApiClient;
+import org.apache.airavata.mft.api.service.*;
 import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
 import org.apache.airavata.model.application.io.DataType;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
@@ -188,32 +187,39 @@ public class OutputDataStagingTask extends DataStagingTask {
                     String destId = "STORAGE:" + destinationURI.getPath() + ":" + getGatewayId() + ":" + getTaskContext().getStorageResourceId();
                     String destToken = getTaskContext().getStorageResourceCredentialToken() + ":" + getTaskContext().getStorageResourceLoginUserName() + ":" + getGatewayId();
 
-                    TransferRequest request = new TransferRequest();
-                    request.setSourceId(sourceId);
-                    request.setSourceToken(sourceToken);
-                    request.setSourceType("SCP");
-
-                    request.setDestinationId(destId);
-                    request.setDestinationToken(destToken);
-                    request.setDestinationType("SCP");
-
-                    MFTAdmin mftAdmin = new MFTAdmin();
-                    List<AgentInfo> liveAgentInfos = mftAdmin.getLiveAgentInfos();
-                    if (liveAgentInfos.size() == 0) {
-                        throw new TaskOnFailException("No active agent available", false, null);
-                    }
-
-                    String transferId = mftAdmin.submitTransfer(liveAgentInfos.get(0).getId(), request);
-                    logger.info("Submitted to Agent " + liveAgentInfos.get(0).getId() + ". Transfer id " + transferId);
+                    TransferApiRequest request = TransferApiRequest.newBuilder()
+                            .setSourceId(sourceId)
+                            .setSourceToken(sourceToken)
+                            .setSourceType("SCP")
+                            .setDestinationId(destId)
+                            .setDestinationToken(destToken)
+                            .setDestinationType("SCP")
+                            .setSourceResourceBackend("AIRAVATA")
+                            .setSourceCredentialBackend("AIRAVATA")
+                            .setDestResourceBackend("AIRAVATA")
+                            .setDestCredentialBackend("AIRAVATA")
+                            .setAffinityTransfer(false).build();
+
+                    MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = MFTApiClient.buildClient(
+                            ServerSettings.getSetting("mft.server.host"),
+                            Integer.parseInt(ServerSettings.getSetting("mft.server.port")));
+
+                    TransferApiResponse response = mftClient.submitTransfer(request);
+                    logger.info("Submitted file transfer with id " + response.getTransferId());
 
                     while (true) {
-                        TransferState transferState = mftAdmin.getTransferState(transferId);
-                        logger.info("Transfer status of " + transferId + " is " + transferState.getState());
-                        if ("COMPLETED".equals(transferState.getState())) {
-                            transferred = true;
-                            break;
-                        } else if ("FAILED".equals(transferState.getState())) {
-                            throw new TaskOnFailException("Transfer " + transferId + " failed", false, null);
+                        try {
+                            TransferStateApiResponse transferState = mftClient.getTransferState(
+                                    TransferStateApiRequest.newBuilder().setTransferId(response.getTransferId()).build());
+                            logger.info("Transfer status of " + response.getTransferId() + " is " + transferState.getState());
+                            if ("COMPLETED".equals(transferState.getState())) {
+                                transferred = true;
+                                break;
+                            } else if ("FAILED".equals(transferState.getState())) {
+                                throw new TaskOnFailException("Transfer " + response.getTransferId() + " failed", false, null);
+                            }
+                        } catch (StatusRuntimeException e) {
+                            logger.info("No status for transfer " + response.getTransferId());
                         }
                         Thread.sleep(1000);
                     }
diff --git a/modules/ide-integration/pom.xml b/modules/ide-integration/pom.xml
index c58b790..0e7c58b 100644
--- a/modules/ide-integration/pom.xml
+++ b/modules/ide-integration/pom.xml
@@ -17,6 +17,11 @@
             <version>2.9.9</version>
         </dependency>
         <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-common</artifactId>
+            <version>4.1.42.Final</version>
+        </dependency>
+        <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
             <version>2.9.9</version>