You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/01/02 14:43:26 UTC
[airavata-mft] branch master updated: Adding transfer states Admin
API
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
The following commit(s) were added to refs/heads/master by this push:
new 1f3e190 Adding transfer states Admin API
1f3e190 is described below
commit 1f3e1903bcbf5775124389644d94bd028fb04587
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Thu Jan 2 09:43:08 2020 -0500
Adding transfer states Admin API
---
.../org/apache/airavata/mft/admin/MFTAdmin.java | 34 +++++++++++++++
.../airavata/mft/admin/models/TransferState.java | 51 ++++++++++++++++++++++
.../org/apache/airavata/mft/agent/MFTAgent.java | 33 ++++++++++++--
.../airavata/mft/agent}/TransportMediator.java | 23 ++++++----
4 files changed, 129 insertions(+), 12 deletions(-)
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdmin.java b/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdmin.java
index 2eb9d66..bf0ae79 100644
--- a/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdmin.java
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdmin.java
@@ -25,6 +25,7 @@ import com.orbitz.consul.KeyValueClient;
import com.orbitz.consul.model.kv.Value;
import org.apache.airavata.mft.admin.models.AgentInfo;
import org.apache.airavata.mft.admin.models.TransferRequest;
+import org.apache.airavata.mft.admin.models.TransferState;
import java.io.IOException;
import java.util.*;
@@ -34,6 +35,7 @@ import java.util.stream.Collectors;
mft/agents/messages/{agent-id} -> message
mft/agent/info/{agent-id} -> agent infos
mft/agent/live/{agent-id} -> live agent
+ mft/transfer/state/{transfer-id} -> transfer state
*/
public class MFTAdmin {
@@ -44,7 +46,9 @@ public class MFTAdmin {
public String submitTransfer(String agentId, TransferRequest transferRequest) throws MFTAdminException {
try {
+
String transferId = UUID.randomUUID().toString();
+ updateTransferState(transferId, new TransferState().setState("INITIALIZING").setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()));
transferRequest.setTransferId(transferId);
String asString = mapper.writeValueAsString(transferRequest);
kvClient.putValue("mft/agents/messages/" + agentId + "/" + transferId, asString);
@@ -103,6 +107,36 @@ public class MFTAdmin {
}
}
+ public TransferState getTransferState(String transferId) throws MFTAdminException {
+ try {
+ Optional<Value> value = kvClient.getValue("mft/transfer/state/" + transferId);
+ if (value.isPresent()) {
+ if (value.get().getValueAsString().isPresent()) {
+ String asStr = value.get().getValueAsString().get();
+ return mapper.readValue(asStr, TransferState.class);
+ }
+ }
+ return new TransferState().setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()).setState("UNKNOWN");
+
+ } catch (ConsulException e) {
+ if (e.getCode() == 404) {
+ return new TransferState().setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()).setState("UNKNOWN");
+ }
+ throw new MFTAdminException("Error in fetching transfer status " + transferId, e);
+ } catch (Exception e) {
+ throw new MFTAdminException("Error in fetching transfer status " + transferId, e);
+ }
+ }
+
+ public void updateTransferState(String transferId, TransferState transferState) throws MFTAdminException {
+ try {
+ String asStr = mapper.writeValueAsString(transferState);
+ kvClient.putValue("mft/transfer/state/" + transferId, asStr);
+ } catch (JsonProcessingException e) {
+ throw new MFTAdminException("Error in serializing transfer status", e);
+ }
+ }
+
public List<AgentInfo> getLiveAgentInfos() throws MFTAdminException {
List<String> liveAgentIds = getLiveAgentIds();
return liveAgentIds.stream().map(id -> getAgentInfo(id).get()).collect(Collectors.toList());
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferState.java b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferState.java
new file mode 100644
index 0000000..8c9a194
--- /dev/null
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferState.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.admin.models;
+
+public class TransferState {
+ private String state;
+ private long updateTimeMils;
+ private double percentage;
+
+ public String getState() {
+ return state;
+ }
+
+ public TransferState setState(String state) {
+ this.state = state;
+ return this;
+ }
+
+ public long getUpdateTimeMils() {
+ return updateTimeMils;
+ }
+
+ public TransferState setUpdateTimeMils(long updateTimeMils) {
+ this.updateTimeMils = updateTimeMils;
+ return this;
+ }
+
+ public double getPercentage() {
+ return percentage;
+ }
+
+ public TransferState setPercentage(double percentage) {
+ this.percentage = percentage;
+ return this;
+ }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index c19906f..bc6f6ad 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -30,8 +30,8 @@ import org.apache.airavata.mft.admin.MFTAdmin;
import org.apache.airavata.mft.admin.MFTAdminException;
import org.apache.airavata.mft.admin.models.AgentInfo;
import org.apache.airavata.mft.admin.models.TransferRequest;
+import org.apache.airavata.mft.admin.models.TransferState;
import org.apache.airavata.mft.core.ResourceMetadata;
-import org.apache.airavata.mft.core.TransportMediator;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.core.api.MetadataCollector;
import org.apache.airavata.mft.transport.local.LocalMetadataCollector;
@@ -63,13 +63,17 @@ public class MFTAgent {
private long sessionRenewSeconds = 4;
private long sessionTTLSeconds = 10;
+ private MFTAdmin admin;
+
public void init() {
client = Consul.builder().build();
kvClient = client.keyValueClient();
messageCache = KVCache.newCache(kvClient, "mft/agents/messages/" + agentId );
+ admin = new MFTAdmin();
}
private void acceptRequests() {
+
cacheListener = newValues -> {
// Cache notifies all paths with "foo" the root path
// If you want to watch only "foo" value, you must filter other paths
@@ -81,9 +85,12 @@ public class MFTAgent {
decodedValue.ifPresent(v -> {
System.out.println(String.format("Value is: %s", v));
ObjectMapper mapper = new ObjectMapper();
+ TransferRequest request = null;
try {
- TransferRequest request = mapper.readValue(v, TransferRequest.class);
+ request = mapper.readValue(v, TransferRequest.class);
System.out.println("Received request " + request.getTransferId());
+ admin.updateTransferState(request.getTransferId(), new TransferState().setState("STARTING")
+ .setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()));
Connector inConnector = MFTAgent.this.resolveConnector(request.getSourceType(), "IN");
inConnector.init(request.getSourceId(), request.getSourceToken());
@@ -93,11 +100,30 @@ public class MFTAgent {
MetadataCollector metadataCollector = MFTAgent.this.resolveMetadataCollector(request.getSourceType());
ResourceMetadata metadata = metadataCollector.getGetResourceMetadata(request.getSourceId(), request.getSourceToken());
System.out.println("File size " + metadata.getResourceSize());
- String transferId = mediator.transfer(inConnector, outConnector, metadata);
+ admin.updateTransferState(request.getTransferId(), new TransferState().setState("STARTED")
+ .setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()));
+
+ String transferId = mediator.transfer(request.getTransferId(), inConnector, outConnector, metadata, (id, st) -> {
+ try {
+ admin.updateTransferState(id, st);
+ } catch (MFTAdminException e) {
+ e.printStackTrace();
+ }
+ });
+
System.out.println("Submitted transfer " + transferId);
} catch (Exception e) {
e.printStackTrace();
+ if (request != null) {
+ try {
+ admin.updateTransferState(request.getTransferId(), new TransferState().setState("FAILED")
+ .setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()));
+ } catch (MFTAdminException ex) {
+ ex.printStackTrace();
+ // Ignore
+ }
+ }
} finally {
System.out.println("Deleting key " + value.getKey());
kvClient.deleteKey(value.getKey()); // Due to bug in consul https://github.com/hashicorp/consul/issues/571
@@ -136,7 +162,6 @@ public class MFTAgent {
}
}
}, sessionRenewSeconds, sessionRenewSeconds, TimeUnit.SECONDS);
- MFTAdmin admin = new MFTAdmin();
admin.registerAgent(new AgentInfo()
.setId(agentId)
.setHost("localhost")
diff --git a/core/src/main/java/org/apache/airavata/mft/core/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
similarity index 77%
rename from core/src/main/java/org/apache/airavata/mft/core/TransportMediator.java
rename to agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
index 2644bb0..389ea8d 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/TransportMediator.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -15,14 +15,21 @@
* limitations under the License.
*/
-package org.apache.airavata.mft.core;
+package org.apache.airavata.mft.agent;
+import org.apache.airavata.mft.admin.models.TransferState;
+import org.apache.airavata.mft.core.CircularStreamingBuffer;
+import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.TransferTask;
import org.apache.airavata.mft.core.api.Connector;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
public class TransportMediator {
@@ -33,9 +40,8 @@ public class TransportMediator {
executor.shutdown();
}
- public String transfer(Connector inConnector, Connector outConnector, ResourceMetadata metadata) throws Exception {
-
- String transferId = UUID.randomUUID().toString();
+ public String transfer(String transferId, Connector inConnector, Connector outConnector, ResourceMetadata metadata,
+ BiConsumer<String, TransferState> onCallback) throws Exception {
CircularStreamingBuffer streamBuffer = new CircularStreamingBuffer();
ConnectorContext context = new ConnectorContext();
@@ -48,7 +54,7 @@ public class TransportMediator {
ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
- long startTime = System.currentTimeMillis();
+ long startTime = System.nanoTime();
futureList.add(completionService.submit(recvTask));
futureList.add(completionService.submit(sendTask));
@@ -70,6 +76,7 @@ public class TransportMediator {
// Snap, something went wrong in the task! Abort! Abort! Abort!
System.out.println("One task failed with error: " + e.getMessage());
e.printStackTrace();
+ onCallback.accept(transferId, new TransferState().setPercentage(0).setState("FAILED").setUpdateTimeMils(System.currentTimeMillis()));
for (Future<Integer> f : futureList) {
try {
Thread.sleep(1000);
@@ -82,10 +89,10 @@ public class TransportMediator {
}
}
- long endTime = System.currentTimeMillis();
-
- long time = (endTime - startTime) / 1000;
+ long endTime = System.nanoTime();
+ double time = (endTime - startTime) * 1.0 /1000000000;
+ onCallback.accept(transferId, new TransferState().setPercentage(100).setState("COMPLETED").setUpdateTimeMils(System.currentTimeMillis()));
System.out.println("Transfer Speed " + (metadata.getResourceSize() * 1.0 / time) / (1024 * 1024) + " MB/s");
System.out.println("Transfer " + transferId + " completed");
} catch (Exception e) {