You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/01/02 04:12:38 UTC
[airavata-mft] branch master updated: Admin client,
live agent monitor and agent register
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
The following commit(s) were added to refs/heads/master by this push:
new 2e11193 Admin client, live agent monitor and agent register
2e11193 is described below
commit 2e111938139e5d76ec56840dcde08bfcb9a415bf
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Wed Jan 1 23:12:25 2020 -0500
Admin client, live agent monitor and agent register
---
admin/pom.xml | 26 +++++
.../org/apache/airavata/mft/admin/MFTAdmin.java | 110 +++++++++++++++++++++
.../airavata/mft/admin/MFTAdminException.java | 40 ++++++++
.../airavata/mft/admin/models/AgentInfo.java | 73 ++++++++++++++
.../mft/admin/models}/TransferRequest.java | 2 +-
agent/pom.xml | 6 ++
.../apache/airavata/mft/agent/ConsulTester.java | 2 +
.../org/apache/airavata/mft/agent/MFTAgent.java | 95 +++++++++++++++---
.../airavata/mft/core}/TransportMediator.java | 6 +-
pom.xml | 1 +
10 files changed, 343 insertions(+), 18 deletions(-)
diff --git a/admin/pom.xml b/admin/pom.xml
new file mode 100644
index 0000000..f1dd35b
--- /dev/null
+++ b/admin/pom.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>airavata-mft</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <version>0.01-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>mft-admin</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>mft-core</artifactId>
+ <version>0.01-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>com.orbitz.consul</groupId>
+ <artifactId>consul-client</artifactId>
+ <version>${consul.client}</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdmin.java b/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdmin.java
new file mode 100644
index 0000000..27098a1
--- /dev/null
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdmin.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.admin;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.ConsulException;
+import com.orbitz.consul.KeyValueClient;
+import com.orbitz.consul.model.kv.Value;
+import org.apache.airavata.mft.admin.models.AgentInfo;
+import org.apache.airavata.mft.admin.models.TransferRequest;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/*
+ mft/agents/messages/{agent-id} -> message
+ mft/agent/info/{agent-id} -> agent infos
+ mft/agent/live/{agent-id} -> live agent
+ */
+
+public class MFTAdmin {
+
+ private Consul client = Consul.builder().build();
+ private KeyValueClient kvClient = client.keyValueClient();
+ private ObjectMapper mapper = new ObjectMapper();
+
+ public void submitTransfer(String agentId, TransferRequest transferRequest) throws MFTAdminException {
+ try {
+ String asString = mapper.writeValueAsString(transferRequest);
+ kvClient.putValue("mft/agents/messages/" + agentId + "/" + transferRequest.getTransferId(), asString);
+ } catch (JsonProcessingException e) {
+ throw new MFTAdminException("Error in serializing transfer request", e);
+ }
+ }
+
+ public List<AgentInfo> listAgents() {
+ List<AgentInfo> agents = new ArrayList<>();
+ List<String> keys = kvClient.getKeys("mft/agents/info");
+ for (String key : keys) {
+ Optional<AgentInfo> agentInfo = getAgentInfo(key.substring(key.lastIndexOf("/") + 1));
+ agentInfo.ifPresent(agents::add);
+ }
+ return agents;
+ }
+
+ public Optional<AgentInfo> getAgentInfo(String agentId) {
+ Optional<Value> value = kvClient.getValue("mft/agents/info/" + agentId);
+ if (value.isPresent()) {
+ Value absVal = value.get();
+ if (absVal.getValue().isPresent()) {
+ String asStr = absVal.getValue().get();
+ try {
+ return Optional.of(mapper.readValue(asStr, AgentInfo.class));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ return Optional.empty();
+ }
+
+ public void registerAgent(AgentInfo agentInfo) throws MFTAdminException {
+ try {
+ String asString = mapper.writeValueAsString(agentInfo);
+ kvClient.putValue("mft/agents/info/" + agentInfo.getId(), asString);
+ } catch (JsonProcessingException e) {
+ throw new MFTAdminException("Error in serializing agent information", e);
+ }
+ }
+
+ public List<String> getLiveAgentIds() throws MFTAdminException {
+ try {
+ List<String> keys = kvClient.getKeys("mft/agent/live/");
+ return keys.stream().map(key -> key.substring(key.lastIndexOf("/") + 1)).collect(Collectors.toList());
+ } catch (ConsulException e) {
+ if (e.getCode() == 404) {
+ return Collections.emptyList();
+ }
+ throw new MFTAdminException("Error in fetching live agents", e);
+ } catch (Exception e) {
+ throw new MFTAdminException("Error in fetching live agents", e);
+ }
+ }
+
+ public List<AgentInfo> getLiveAgentInfos() throws MFTAdminException {
+ List<String> liveAgentIds = getLiveAgentIds();
+ return liveAgentIds.stream().map(id -> getAgentInfo(id).get()).collect(Collectors.toList());
+ }
+}
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdminException.java b/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdminException.java
new file mode 100644
index 0000000..f9fb2d9
--- /dev/null
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdminException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.admin;
+
+public class MFTAdminException extends Exception {
+ public MFTAdminException() {
+ super();
+ }
+
+ public MFTAdminException(String message) {
+ super(message);
+ }
+
+ public MFTAdminException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public MFTAdminException(Throwable cause) {
+ super(cause);
+ }
+
+ protected MFTAdminException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java b/admin/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java
new file mode 100644
index 0000000..9bba816
--- /dev/null
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java
@@ -0,0 +1,73 @@
+package org.apache.airavata.mft.admin.models;
+
+import java.util.List;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class AgentInfo {
+ private String id;
+ private String host;
+ private String user;
+ private boolean sudo;
+ private List<String> supportedProtocols;
+
+ public String getId() {
+ return id;
+ }
+
+ public AgentInfo setId(String id) {
+ this.id = id;
+ return this;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public AgentInfo setHost(String host) {
+ this.host = host;
+ return this;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public AgentInfo setUser(String user) {
+ this.user = user;
+ return this;
+ }
+
+ public boolean isSudo() {
+ return sudo;
+ }
+
+ public AgentInfo setSudo(boolean sudo) {
+ this.sudo = sudo;
+ return this;
+ }
+
+ public List<String> getSupportedProtocols() {
+ return supportedProtocols;
+ }
+
+ public AgentInfo setSupportedProtocols(List<String> supportedProtocols) {
+ this.supportedProtocols = supportedProtocols;
+ return this;
+ }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransferRequest.java b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java
similarity index 98%
rename from agent/src/main/java/org/apache/airavata/mft/agent/TransferRequest.java
rename to admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java
index 14ed4c9..5183a1c 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransferRequest.java
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.airavata.mft.agent;
+package org.apache.airavata.mft.admin.models;
import java.util.List;
diff --git a/agent/pom.xml b/agent/pom.xml
index 24dfbfd..77b2679 100644
--- a/agent/pom.xml
+++ b/agent/pom.xml
@@ -44,9 +44,15 @@
<version>0.01-SNAPSHOT</version>
</dependency>
<dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>mft-admin</artifactId>
+ <version>0.01-SNAPSHOT</version>
+ </dependency>
+ <dependency>
<groupId>com.orbitz.consul</groupId>
<artifactId>consul-client</artifactId>
<version>${consul.client}</version>
</dependency>
+
</dependencies>
</project>
\ No newline at end of file
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/ConsulTester.java b/agent/src/main/java/org/apache/airavata/mft/agent/ConsulTester.java
index 0a0f1c8..499e603 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/ConsulTester.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/ConsulTester.java
@@ -20,6 +20,8 @@ package org.apache.airavata.mft.agent;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.orbitz.consul.Consul;
import com.orbitz.consul.KeyValueClient;
+import org.apache.airavata.mft.admin.models.TransferRequest;
+
import java.util.Collections;
public class ConsulTester {
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index 92e851b..e52dd3d 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -19,9 +19,16 @@ package org.apache.airavata.mft.agent;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.orbitz.consul.Consul;
+import com.orbitz.consul.ConsulException;
import com.orbitz.consul.KeyValueClient;
+import com.orbitz.consul.cache.ConsulCache;
import com.orbitz.consul.cache.KVCache;
+import com.orbitz.consul.model.kv.Value;
+import com.orbitz.consul.model.session.ImmutableSession;
+import com.orbitz.consul.model.session.SessionCreatedResponse;
+import org.apache.airavata.mft.admin.models.TransferRequest;
import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.TransportMediator;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.core.api.MetadataCollector;
import org.apache.airavata.mft.transport.local.LocalMetadataCollector;
@@ -32,20 +39,34 @@ import org.apache.airavata.mft.transport.scp.SCPReceiver;
import org.apache.airavata.mft.transport.scp.SCPSender;
import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
public class MFTAgent {
- private TransportMediator mediator = new TransportMediator();
+ private final TransportMediator mediator = new TransportMediator();
private String agentId = "agent0";
- private Semaphore mainHold = new Semaphore(0);
+ private final Semaphore mainHold = new Semaphore(0);
- private void acceptRequests() {
- Consul client = Consul.builder().build();
- KeyValueClient kvClient = client.keyValueClient();
+ private Consul client;
+ private KeyValueClient kvClient;
+ private KVCache messageCache;
+ private ConsulCache.Listener<String, Value> cacheListener;
+
+ private final ScheduledExecutorService sessionRenewPool = Executors.newSingleThreadScheduledExecutor();
+ private long sessionRenewSeconds = 4;
+ private long sessionTTLSeconds = 10;
- KVCache messageCache = KVCache.newCache(kvClient, agentId + "/messages");
- messageCache.addListener(newValues -> {
+ public void init() {
+ client = Consul.builder().build();
+ kvClient = client.keyValueClient();
+ messageCache = KVCache.newCache(kvClient, "mft/agents/messages/" + agentId );
+ }
+
+ private void acceptRequests() {
+ cacheListener = newValues -> {
// Cache notifies all paths with "foo" the root path
// If you want to watch only "foo" value, you must filter other paths
@@ -60,12 +81,12 @@ public class MFTAgent {
TransferRequest request = mapper.readValue(v, TransferRequest.class);
System.out.println("Received request " + request.getTransferId());
- Connector inConnector = resolveConnector(request.getSourceType(), "IN");
+ Connector inConnector = MFTAgent.this.resolveConnector(request.getSourceType(), "IN");
inConnector.init(request.getSourceId(), request.getSourceToken());
- Connector outConnector = resolveConnector(request.getDestinationType(), "OUT");
+ Connector outConnector = MFTAgent.this.resolveConnector(request.getDestinationType(), "OUT");
outConnector.init(request.getDestinationId(), request.getDestinationToken());
- MetadataCollector metadataCollector = resolveMetadataCollector(request.getSourceType());
+ MetadataCollector metadataCollector = MFTAgent.this.resolveMetadataCollector(request.getSourceType());
ResourceMetadata metadata = metadataCollector.getGetResourceMetadata(request.getSourceId(), request.getSourceToken());
System.out.println("File size " + metadata.getResourceSize());
String transferId = mediator.transfer(inConnector, outConnector, metadata);
@@ -80,15 +101,65 @@ public class MFTAgent {
});
});
- });
+ };
+ messageCache.addListener(cacheListener);
messageCache.start();
}
+ public void connectAgent() {
+ ImmutableSession session = ImmutableSession.builder().name(agentId).behavior("delete").ttl(sessionTTLSeconds + "s").build();
+ SessionCreatedResponse sessResp = client.sessionClient().createSession(session);
+ String lockPath = "mft/agent/live/" + agentId;
+ boolean acquired = kvClient.acquireLock(lockPath, sessResp.getId());
+
+ if (acquired) {
+ sessionRenewPool.scheduleAtFixedRate(() -> {
+ try {
+ client.sessionClient().renewSession(sessResp.getId());
+ } catch (ConsulException e) {
+ if (e.getCode() == 404) {
+ stop();
+ }
+ e.printStackTrace();
+ } catch (Exception e) {
+ try {
+ boolean status = kvClient.acquireLock(lockPath, sessResp.getId());
+ if (!status) {
+ stop();
+ }
+ } catch (Exception ex) {
+ stop();
+ }
+ }
+ }, sessionRenewSeconds, sessionRenewSeconds, TimeUnit.SECONDS);
+ }
+
+ System.out.println("Lock status " + acquired);
+ }
+
+ public void disconnectAgent() {
+ sessionRenewPool.shutdown();
+ if (cacheListener != null) {
+ messageCache.removeListener(cacheListener);
+ }
+ }
+
+ public void stop() {
+ disconnectAgent();
+ mainHold.release();
+ }
+
+ public void start() {
+ init();
+ connectAgent();
+ acceptRequests();
+ }
public static void main(String args[]) throws InterruptedException {
MFTAgent agent = new MFTAgent();
- agent.acceptRequests();
+ agent.start();
agent.mainHold.acquire();
+ System.out.println("Shutting down agent");
}
// TODO load from reflection to avoid dependencies
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/core/src/main/java/org/apache/airavata/mft/core/TransportMediator.java
similarity index 94%
rename from agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
rename to core/src/main/java/org/apache/airavata/mft/core/TransportMediator.java
index 04f75a7..2644bb0 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/TransportMediator.java
@@ -15,12 +15,8 @@
* limitations under the License.
*/
-package org.apache.airavata.mft.agent;
+package org.apache.airavata.mft.core;
-import org.apache.airavata.mft.core.CircularStreamingBuffer;
-import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.ResourceMetadata;
-import org.apache.airavata.mft.core.TransferTask;
import org.apache.airavata.mft.core.api.Connector;
import java.util.ArrayList;
diff --git a/pom.xml b/pom.xml
index 3ade4e2..9127fb9 100755
--- a/pom.xml
+++ b/pom.xml
@@ -45,6 +45,7 @@
<module>transport</module>
<module>agent</module>
<module>services</module>
+ <module>admin</module>
</modules>
<url>http://airavata.apache.org/</url>