You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/01/02 14:43:26 UTC

[airavata-mft] branch master updated: Adding transfer states Admin API

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f3e190  Adding transfer states Admin API
1f3e190 is described below

commit 1f3e1903bcbf5775124389644d94bd028fb04587
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Thu Jan 2 09:43:08 2020 -0500

    Adding transfer states Admin API
---
 .../org/apache/airavata/mft/admin/MFTAdmin.java    | 34 +++++++++++++++
 .../airavata/mft/admin/models/TransferState.java   | 51 ++++++++++++++++++++++
 .../org/apache/airavata/mft/agent/MFTAgent.java    | 33 ++++++++++++--
 .../airavata/mft/agent}/TransportMediator.java     | 23 ++++++----
 4 files changed, 129 insertions(+), 12 deletions(-)

diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdmin.java b/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdmin.java
index 2eb9d66..bf0ae79 100644
--- a/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdmin.java
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdmin.java
@@ -25,6 +25,7 @@ import com.orbitz.consul.KeyValueClient;
 import com.orbitz.consul.model.kv.Value;
 import org.apache.airavata.mft.admin.models.AgentInfo;
 import org.apache.airavata.mft.admin.models.TransferRequest;
+import org.apache.airavata.mft.admin.models.TransferState;
 
 import java.io.IOException;
 import java.util.*;
@@ -34,6 +35,7 @@ import java.util.stream.Collectors;
  mft/agents/messages/{agent-id} -> message
  mft/agent/info/{agent-id} -> agent infos
  mft/agent/live/{agent-id} -> live agent
+ mft/transfer/state/{transfer-id} -> transfer state
  */
 
 public class MFTAdmin {
@@ -44,7 +46,9 @@ public class MFTAdmin {
 
     public String submitTransfer(String agentId, TransferRequest transferRequest) throws MFTAdminException {
         try {
+
             String transferId = UUID.randomUUID().toString();
+            updateTransferState(transferId, new TransferState().setState("INITIALIZING").setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()));
             transferRequest.setTransferId(transferId);
             String asString = mapper.writeValueAsString(transferRequest);
             kvClient.putValue("mft/agents/messages/"  + agentId + "/" + transferId, asString);
@@ -103,6 +107,36 @@ public class MFTAdmin {
         }
     }
 
+    public TransferState getTransferState(String transferId) throws MFTAdminException { // Fetches the state stored for transferId; falls back to a synthetic UNKNOWN state.
+        try {
+            Optional<Value> value = kvClient.getValue("mft/transfer/state/" + transferId); // key layout: mft/transfer/state/{transfer-id}
+            if (value.isPresent()) {
+                if (value.get().getValueAsString().isPresent()) {
+                    String asStr = value.get().getValueAsString().get();
+                    return mapper.readValue(asStr, TransferState.class); // stored string is deserialized into a TransferState
+                }
+            } // key absent, or present without a string value: fall through to the UNKNOWN default below
+            return new TransferState().setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()).setState("UNKNOWN");
+
+        } catch (ConsulException e) {
+            if (e.getCode() == 404) { // a 404 from the lookup is answered with the same UNKNOWN default as a missing key
+                return new TransferState().setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()).setState("UNKNOWN");
+            }
+            throw new MFTAdminException("Error in fetching transfer status " + transferId, e); // other ConsulException codes are wrapped with the cause attached
+        } catch (Exception e) { // anything else raised inside the try block, e.g. a failure in mapper.readValue
+            throw new MFTAdminException("Error in fetching transfer status " + transferId, e);
+        }
+    }
+    /** Serializes transferState with mapper and writes the result to the key mft/transfer/state/{transferId}. */
+    public void updateTransferState(String transferId, TransferState transferState) throws MFTAdminException {
+        try {
+            String asStr = mapper.writeValueAsString(transferState);
+            kvClient.putValue("mft/transfer/state/" + transferId, asStr); // same key that getTransferState reads
+        } catch (JsonProcessingException e) { // only serialization failures are translated here; nothing else is caught by this method
+            throw new MFTAdminException("Error in serializing transfer status", e);
+        }
+    }
+
     public List<AgentInfo> getLiveAgentInfos() throws MFTAdminException {
         List<String> liveAgentIds = getLiveAgentIds();
         return liveAgentIds.stream().map(id -> getAgentInfo(id).get()).collect(Collectors.toList());
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferState.java b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferState.java
new file mode 100644
index 0000000..8c9a194
--- /dev/null
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferState.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.admin.models;
+/** Mutable holder for a transfer's state label, update timestamp and percentage; every setter returns this so calls can be chained. */
+public class TransferState {
+    private String state; // free-form label; values written in this commit: INITIALIZING, STARTING, STARTED, COMPLETED, FAILED, UNKNOWN
+    private long updateTimeMils; // callers in this commit pass System.currentTimeMillis()
+    private double percentage; // callers in this commit only pass 0 or 100; intended range is presumably 0-100 (TODO confirm)
+    /** Returns the state label, or null if setState has not been called. */
+    public String getState() {
+        return state;
+    }
+    /** Stores the state label and returns this instance for chaining. */
+    public TransferState setState(String state) {
+        this.state = state;
+        return this;
+    }
+    /** Returns the stored update time, or 0 if it has not been set. */
+    public long getUpdateTimeMils() {
+        return updateTimeMils;
+    }
+    /** Stores the update time and returns this instance for chaining. */
+    public TransferState setUpdateTimeMils(long updateTimeMils) {
+        this.updateTimeMils = updateTimeMils;
+        return this;
+    }
+    /** Returns the stored percentage, or 0.0 if it has not been set. */
+    public double getPercentage() {
+        return percentage;
+    }
+    /** Stores the percentage and returns this instance for chaining. */
+    public TransferState setPercentage(double percentage) {
+        this.percentage = percentage;
+        return this;
+    }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index c19906f..bc6f6ad 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -30,8 +30,8 @@ import org.apache.airavata.mft.admin.MFTAdmin;
 import org.apache.airavata.mft.admin.MFTAdminException;
 import org.apache.airavata.mft.admin.models.AgentInfo;
 import org.apache.airavata.mft.admin.models.TransferRequest;
+import org.apache.airavata.mft.admin.models.TransferState;
 import org.apache.airavata.mft.core.ResourceMetadata;
-import org.apache.airavata.mft.core.TransportMediator;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.airavata.mft.transport.local.LocalMetadataCollector;
@@ -63,13 +63,17 @@ public class MFTAgent {
     private long sessionRenewSeconds = 4;
     private long sessionTTLSeconds = 10;
 
+    private MFTAdmin admin;
+
     public void init() {
         client = Consul.builder().build();
         kvClient = client.keyValueClient();
         messageCache = KVCache.newCache(kvClient, "mft/agents/messages/" + agentId );
+        admin = new MFTAdmin();
     }
 
     private void acceptRequests() {
+
         cacheListener = newValues -> {
             // Cache notifies all paths with "foo" the root path
             // If you want to watch only "foo" value, you must filter other paths
@@ -81,9 +85,12 @@ public class MFTAgent {
                 decodedValue.ifPresent(v -> {
                     System.out.println(String.format("Value is: %s", v));
                     ObjectMapper mapper = new ObjectMapper();
+                    TransferRequest request = null;
                     try {
-                        TransferRequest request = mapper.readValue(v, TransferRequest.class);
+                        request = mapper.readValue(v, TransferRequest.class);
                         System.out.println("Received request " + request.getTransferId());
+                        admin.updateTransferState(request.getTransferId(), new TransferState().setState("STARTING")
+                                .setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()));
 
                         Connector inConnector = MFTAgent.this.resolveConnector(request.getSourceType(), "IN");
                         inConnector.init(request.getSourceId(), request.getSourceToken());
@@ -93,11 +100,30 @@ public class MFTAgent {
                         MetadataCollector metadataCollector = MFTAgent.this.resolveMetadataCollector(request.getSourceType());
                         ResourceMetadata metadata = metadataCollector.getGetResourceMetadata(request.getSourceId(), request.getSourceToken());
                         System.out.println("File size " + metadata.getResourceSize());
-                        String transferId = mediator.transfer(inConnector, outConnector, metadata);
+                        admin.updateTransferState(request.getTransferId(), new TransferState().setState("STARTED")
+                                .setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()));
+
+                        String transferId = mediator.transfer(request.getTransferId(), inConnector, outConnector, metadata, (id, st) -> {
+                            try {
+                                admin.updateTransferState(id, st);
+                            } catch (MFTAdminException e) {
+                                e.printStackTrace();
+                            }
+                        });
+
                         System.out.println("Submitted transfer " + transferId);
 
                     } catch (Exception e) {
                         e.printStackTrace();
+                        if (request != null) {
+                            try {
+                                admin.updateTransferState(request.getTransferId(), new TransferState().setState("FAILED")
+                                        .setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()));
+                            } catch (MFTAdminException ex) {
+                                ex.printStackTrace();
+                                // Ignore
+                            }
+                        }
                     } finally {
                         System.out.println("Deleting key " + value.getKey());
                         kvClient.deleteKey(value.getKey()); // Due to bug in consul https://github.com/hashicorp/consul/issues/571
@@ -136,7 +162,6 @@ public class MFTAgent {
                     }
                 }
             }, sessionRenewSeconds, sessionRenewSeconds, TimeUnit.SECONDS);
-            MFTAdmin admin = new MFTAdmin();
             admin.registerAgent(new AgentInfo()
                     .setId(agentId)
                     .setHost("localhost")
diff --git a/core/src/main/java/org/apache/airavata/mft/core/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
similarity index 77%
rename from core/src/main/java/org/apache/airavata/mft/core/TransportMediator.java
rename to agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
index 2644bb0..389ea8d 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/TransportMediator.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -15,14 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.airavata.mft.core;
+package org.apache.airavata.mft.agent;
 
+import org.apache.airavata.mft.admin.models.TransferState;
+import org.apache.airavata.mft.core.CircularStreamingBuffer;
+import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.TransferTask;
 import org.apache.airavata.mft.core.api.Connector;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.*;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 
 public class TransportMediator {
 
@@ -33,9 +40,8 @@ public class TransportMediator {
         executor.shutdown();
     }
 
-    public String transfer(Connector inConnector, Connector outConnector, ResourceMetadata metadata) throws Exception {
-
-        String transferId = UUID.randomUUID().toString();
+    public String transfer(String transferId, Connector inConnector, Connector outConnector, ResourceMetadata metadata,
+                           BiConsumer<String, TransferState> onCallback) throws Exception {
 
         CircularStreamingBuffer streamBuffer = new CircularStreamingBuffer();
         ConnectorContext context = new ConnectorContext();
@@ -48,7 +54,7 @@ public class TransportMediator {
 
         ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
 
-        long startTime = System.currentTimeMillis();
+        long startTime = System.nanoTime();
 
         futureList.add(completionService.submit(recvTask));
         futureList.add(completionService.submit(sendTask));
@@ -70,6 +76,7 @@ public class TransportMediator {
                             // Snap, something went wrong in the task! Abort! Abort! Abort!
                             System.out.println("One task failed with error: " + e.getMessage());
                             e.printStackTrace();
+                            onCallback.accept(transferId, new TransferState().setPercentage(0).setState("FAILED").setUpdateTimeMils(System.currentTimeMillis()));
                             for (Future<Integer> f : futureList) {
                                 try {
                                     Thread.sleep(1000);
@@ -82,10 +89,10 @@ public class TransportMediator {
                         }
                     }
 
-                    long endTime = System.currentTimeMillis();
-
-                    long time = (endTime - startTime) / 1000;
+                    long endTime = System.nanoTime();
 
+                    double time = (endTime - startTime) * 1.0 /1000000000;
+                    onCallback.accept(transferId, new TransferState().setPercentage(100).setState("COMPLETED").setUpdateTimeMils(System.currentTimeMillis()));
                     System.out.println("Transfer Speed " + (metadata.getResourceSize() * 1.0 / time) / (1024 * 1024) + " MB/s");
                     System.out.println("Transfer " + transferId + " completed");
                 } catch (Exception e) {