You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/01/02 04:12:38 UTC

[airavata-mft] branch master updated: Admin client, live agent monitor and agent register

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e11193  Admin client, live agent monitor and agent register
2e11193 is described below

commit 2e111938139e5d76ec56840dcde08bfcb9a415bf
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Wed Jan 1 23:12:25 2020 -0500

    Admin client, live agent monitor and agent register
---
 admin/pom.xml                                      |  26 +++++
 .../org/apache/airavata/mft/admin/MFTAdmin.java    | 110 +++++++++++++++++++++
 .../airavata/mft/admin/MFTAdminException.java      |  40 ++++++++
 .../airavata/mft/admin/models/AgentInfo.java       |  73 ++++++++++++++
 .../mft/admin/models}/TransferRequest.java         |   2 +-
 agent/pom.xml                                      |   6 ++
 .../apache/airavata/mft/agent/ConsulTester.java    |   2 +
 .../org/apache/airavata/mft/agent/MFTAgent.java    |  95 +++++++++++++++---
 .../airavata/mft/core}/TransportMediator.java      |   6 +-
 pom.xml                                            |   1 +
 10 files changed, 343 insertions(+), 18 deletions(-)

diff --git a/admin/pom.xml b/admin/pom.xml
new file mode 100644
index 0000000..f1dd35b
--- /dev/null
+++ b/admin/pom.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>airavata-mft</artifactId>
+        <groupId>org.apache.airavata</groupId>
+        <version>0.01-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>mft-admin</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>mft-core</artifactId>
+            <version>0.01-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>com.orbitz.consul</groupId>
+            <artifactId>consul-client</artifactId>
+            <version>${consul.client}</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdmin.java b/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdmin.java
new file mode 100644
index 0000000..27098a1
--- /dev/null
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdmin.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.admin;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.ConsulException;
+import com.orbitz.consul.KeyValueClient;
+import com.orbitz.consul.model.kv.Value;
+import org.apache.airavata.mft.admin.models.AgentInfo;
+import org.apache.airavata.mft.admin.models.TransferRequest;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/*
+ mft/agents/messages/{agent-id} -> message
+ mft/agent/info/{agent-id} -> agent infos
+ mft/agent/live/{agent-id} -> live agent
+ */
+
+public class MFTAdmin {
+
+    private Consul client = Consul.builder().build();
+    private KeyValueClient kvClient = client.keyValueClient();
+    private ObjectMapper mapper = new ObjectMapper();
+
+    public void submitTransfer(String agentId, TransferRequest transferRequest) throws MFTAdminException {
+        try {
+            String asString = mapper.writeValueAsString(transferRequest);
+            kvClient.putValue("mft/agents/messages/"  + agentId + "/" + transferRequest.getTransferId(), asString);
+        } catch (JsonProcessingException e) {
+            throw new MFTAdminException("Error in serializing transfer request", e);
+        }
+    }
+
+    public List<AgentInfo> listAgents() {
+        List<AgentInfo> agents = new ArrayList<>();
+        List<String> keys = kvClient.getKeys("mft/agents/info");
+        for (String key : keys) {
+            Optional<AgentInfo> agentInfo = getAgentInfo(key.substring(key.lastIndexOf("/") + 1));
+            agentInfo.ifPresent(agents::add);
+        }
+        return agents;
+    }
+
+    public Optional<AgentInfo> getAgentInfo(String agentId) {
+        Optional<Value> value = kvClient.getValue("mft/agents/info/" + agentId);
+        if (value.isPresent()) {
+            Value absVal = value.get();
+            if (absVal.getValue().isPresent()) {
+                String asStr = absVal.getValue().get();
+                try {
+                    return Optional.of(mapper.readValue(asStr, AgentInfo.class));
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    public void registerAgent(AgentInfo agentInfo) throws MFTAdminException {
+        try {
+            String asString = mapper.writeValueAsString(agentInfo);
+            kvClient.putValue("mft/agents/info/" + agentInfo.getId(), asString);
+        } catch (JsonProcessingException e) {
+            throw new MFTAdminException("Error in serializing agent information", e);
+        }
+    }
+
+    public List<String> getLiveAgentIds() throws MFTAdminException {
+        try {
+            List<String> keys = kvClient.getKeys("mft/agent/live/");
+            return keys.stream().map(key -> key.substring(key.lastIndexOf("/") + 1)).collect(Collectors.toList());
+        } catch (ConsulException e) {
+            if (e.getCode() == 404) {
+                return Collections.emptyList();
+            }
+            throw new MFTAdminException("Error in fetching live agents", e);
+        } catch (Exception e) {
+            throw new MFTAdminException("Error in fetching live agents", e);
+        }
+    }
+
+    public List<AgentInfo> getLiveAgentInfos() throws MFTAdminException {
+        List<String> liveAgentIds = getLiveAgentIds();
+        return liveAgentIds.stream().map(id -> getAgentInfo(id).get()).collect(Collectors.toList());
+    }
+}
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdminException.java b/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdminException.java
new file mode 100644
index 0000000..f9fb2d9
--- /dev/null
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdminException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.admin;
+
+public class MFTAdminException extends Exception {
+    public MFTAdminException() {
+        super();
+    }
+
+    public MFTAdminException(String message) {
+        super(message);
+    }
+
+    public MFTAdminException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public MFTAdminException(Throwable cause) {
+        super(cause);
+    }
+
+    protected MFTAdminException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java b/admin/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java
new file mode 100644
index 0000000..9bba816
--- /dev/null
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java
@@ -0,0 +1,73 @@
+package org.apache.airavata.mft.admin.models;
+
+import java.util.List;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class AgentInfo {
+    private String id;
+    private String host;
+    private String user;
+    private boolean sudo;
+    private List<String> supportedProtocols;
+
+    public String getId() {
+        return id;
+    }
+
+    public AgentInfo setId(String id) {
+        this.id = id;
+        return this;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public AgentInfo setHost(String host) {
+        this.host = host;
+        return this;
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public AgentInfo setUser(String user) {
+        this.user = user;
+        return this;
+    }
+
+    public boolean isSudo() {
+        return sudo;
+    }
+
+    public AgentInfo setSudo(boolean sudo) {
+        this.sudo = sudo;
+        return this;
+    }
+
+    public List<String> getSupportedProtocols() {
+        return supportedProtocols;
+    }
+
+    public AgentInfo setSupportedProtocols(List<String> supportedProtocols) {
+        this.supportedProtocols = supportedProtocols;
+        return this;
+    }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransferRequest.java b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java
similarity index 98%
rename from agent/src/main/java/org/apache/airavata/mft/agent/TransferRequest.java
rename to admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java
index 14ed4c9..5183a1c 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransferRequest.java
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.airavata.mft.agent;
+package org.apache.airavata.mft.admin.models;
 
 import java.util.List;
 
diff --git a/agent/pom.xml b/agent/pom.xml
index 24dfbfd..77b2679 100644
--- a/agent/pom.xml
+++ b/agent/pom.xml
@@ -44,9 +44,15 @@
             <version>0.01-SNAPSHOT</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>mft-admin</artifactId>
+            <version>0.01-SNAPSHOT</version>
+        </dependency>
+        <dependency>
             <groupId>com.orbitz.consul</groupId>
             <artifactId>consul-client</artifactId>
             <version>${consul.client}</version>
         </dependency>
+
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/ConsulTester.java b/agent/src/main/java/org/apache/airavata/mft/agent/ConsulTester.java
index 0a0f1c8..499e603 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/ConsulTester.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/ConsulTester.java
@@ -20,6 +20,8 @@ package org.apache.airavata.mft.agent;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.orbitz.consul.Consul;
 import com.orbitz.consul.KeyValueClient;
+import org.apache.airavata.mft.admin.models.TransferRequest;
+
 import java.util.Collections;
 
 public class ConsulTester {
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index 92e851b..e52dd3d 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -19,9 +19,16 @@ package org.apache.airavata.mft.agent;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.orbitz.consul.Consul;
+import com.orbitz.consul.ConsulException;
 import com.orbitz.consul.KeyValueClient;
+import com.orbitz.consul.cache.ConsulCache;
 import com.orbitz.consul.cache.KVCache;
+import com.orbitz.consul.model.kv.Value;
+import com.orbitz.consul.model.session.ImmutableSession;
+import com.orbitz.consul.model.session.SessionCreatedResponse;
+import org.apache.airavata.mft.admin.models.TransferRequest;
 import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.TransportMediator;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.airavata.mft.transport.local.LocalMetadataCollector;
@@ -32,20 +39,34 @@ import org.apache.airavata.mft.transport.scp.SCPReceiver;
 import org.apache.airavata.mft.transport.scp.SCPSender;
 
 import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 public class MFTAgent {
 
-    private TransportMediator mediator = new TransportMediator();
+    private final TransportMediator mediator = new TransportMediator();
     private String agentId = "agent0";
-    private Semaphore mainHold = new Semaphore(0);
+    private final Semaphore mainHold = new Semaphore(0);
 
-    private void acceptRequests() {
-        Consul client = Consul.builder().build();
-        KeyValueClient kvClient = client.keyValueClient();
+    private Consul client;
+    private KeyValueClient kvClient;
+    private KVCache messageCache;
+    private ConsulCache.Listener<String, Value> cacheListener;
+
+    private final ScheduledExecutorService sessionRenewPool = Executors.newSingleThreadScheduledExecutor();
+    private long sessionRenewSeconds = 4;
+    private long sessionTTLSeconds = 10;
 
-        KVCache messageCache = KVCache.newCache(kvClient, agentId + "/messages");
-        messageCache.addListener(newValues -> {
+    public void init() {
+        client = Consul.builder().build();
+        kvClient = client.keyValueClient();
+        messageCache = KVCache.newCache(kvClient, "mft/agents/messages/" + agentId );
+    }
+
+    private void acceptRequests() {
+        cacheListener = newValues -> {
             // Cache notifies all paths with "foo" the root path
             // If you want to watch only "foo" value, you must filter other paths
 
@@ -60,12 +81,12 @@ public class MFTAgent {
                         TransferRequest request = mapper.readValue(v, TransferRequest.class);
                         System.out.println("Received request " + request.getTransferId());
 
-                        Connector inConnector = resolveConnector(request.getSourceType(), "IN");
+                        Connector inConnector = MFTAgent.this.resolveConnector(request.getSourceType(), "IN");
                         inConnector.init(request.getSourceId(), request.getSourceToken());
-                        Connector outConnector = resolveConnector(request.getDestinationType(), "OUT");
+                        Connector outConnector = MFTAgent.this.resolveConnector(request.getDestinationType(), "OUT");
                         outConnector.init(request.getDestinationId(), request.getDestinationToken());
 
-                        MetadataCollector metadataCollector = resolveMetadataCollector(request.getSourceType());
+                        MetadataCollector metadataCollector = MFTAgent.this.resolveMetadataCollector(request.getSourceType());
                         ResourceMetadata metadata = metadataCollector.getGetResourceMetadata(request.getSourceId(), request.getSourceToken());
                         System.out.println("File size " + metadata.getResourceSize());
                         String transferId = mediator.transfer(inConnector, outConnector, metadata);
@@ -80,15 +101,65 @@ public class MFTAgent {
                 });
 
             });
-        });
+        };
+        messageCache.addListener(cacheListener);
         messageCache.start();
     }
 
+    public void connectAgent() {
+        ImmutableSession session = ImmutableSession.builder().name(agentId).behavior("delete").ttl(sessionTTLSeconds + "s").build();
+        SessionCreatedResponse sessResp = client.sessionClient().createSession(session);
+        String lockPath = "mft/agent/live/" + agentId;
+        boolean acquired = kvClient.acquireLock(lockPath, sessResp.getId());
+
+        if (acquired) {
+            sessionRenewPool.scheduleAtFixedRate(() -> {
+                try {
+                    client.sessionClient().renewSession(sessResp.getId());
+                } catch (ConsulException e) {
+                    if (e.getCode() == 404) {
+                        stop();
+                    }
+                    e.printStackTrace();
+                } catch (Exception e) {
+                    try {
+                        boolean status = kvClient.acquireLock(lockPath, sessResp.getId());
+                        if (!status) {
+                            stop();
+                        }
+                    } catch (Exception ex) {
+                        stop();
+                    }
+                }
+        }, sessionRenewSeconds, sessionRenewSeconds, TimeUnit.SECONDS);
+        }
+
+        System.out.println("Lock status " + acquired);
+    }
+
+    public void disconnectAgent() {
+        sessionRenewPool.shutdown();
+        if (cacheListener != null) {
+            messageCache.removeListener(cacheListener);
+        }
+    }
+
+    public void stop() {
+        disconnectAgent();
+        mainHold.release();
+    }
+
+    public void start() {
+        init();
+        connectAgent();
+        acceptRequests();
+    }
 
     public static void main(String args[]) throws InterruptedException {
         MFTAgent agent = new MFTAgent();
-        agent.acceptRequests();
+        agent.start();
         agent.mainHold.acquire();
+        System.out.println("Shutting down agent");
     }
 
     // TODO load from reflection to avoid dependencies
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/core/src/main/java/org/apache/airavata/mft/core/TransportMediator.java
similarity index 94%
rename from agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
rename to core/src/main/java/org/apache/airavata/mft/core/TransportMediator.java
index 04f75a7..2644bb0 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/TransportMediator.java
@@ -15,12 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.airavata.mft.agent;
+package org.apache.airavata.mft.core;
 
-import org.apache.airavata.mft.core.CircularStreamingBuffer;
-import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.ResourceMetadata;
-import org.apache.airavata.mft.core.TransferTask;
 import org.apache.airavata.mft.core.api.Connector;
 
 import java.util.ArrayList;
diff --git a/pom.xml b/pom.xml
index 3ade4e2..9127fb9 100755
--- a/pom.xml
+++ b/pom.xml
@@ -45,6 +45,7 @@
         <module>transport</module>
         <module>agent</module>
         <module>services</module>
+        <module>admin</module>
     </modules>
 
     <url>http://airavata.apache.org/</url>