You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2023/01/04 01:34:54 UTC
[airavata-mft] branch master updated: Reusing AWS agents and cleaning up after an idle time threshold
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
The following commit(s) were added to refs/heads/master by this push:
new d317836 Reusing AWS agents and cleaning up after an idle time threshold
d317836 is described below
commit d317836325f787516659b35b74f7c30524cb619b
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Tue Jan 3 20:34:36 2023 -0500
Reusing AWS agents and cleaning up after an idle time threshold
---
.../airavata/mft/api/handler/MFTApiHandler.java | 2 +-
.../apache/airavata/mft/admin/MFTConsulClient.java | 18 +-
.../mft/controller/AgentTransferDispatcher.java | 262 ------------------
.../apache/airavata/mft/controller/AppConfig.java | 4 +-
.../airavata/mft/controller/MFTController.java | 9 +-
.../mft/controller/TransferDispatcher.java | 160 +++++++++++
.../mft/controller/spawner/AgentOrchestrator.java | 294 +++++++++++++++++++++
.../{CloudAgentSpawner.java => AgentSpawner.java} | 4 +-
.../mft/controller/spawner/EC2AgentSpawner.java | 43 ++-
.../mft/controller/spawner/SpawnerSelector.java | 2 +-
10 files changed, 513 insertions(+), 285 deletions(-)
diff --git a/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java b/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
index cc00835..7230619 100644
--- a/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
+++ b/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
@@ -115,7 +115,7 @@ public class MFTApiHandler extends MFTTransferServiceGrpc.MFTTransferServiceImpl
String transferId = mftConsulClient.submitTransfer(request);
logger.info("Submitted the transfer request {}", transferId);
- mftConsulClient.saveTransferState(transferId, new TransferState()
+ mftConsulClient.saveTransferState(transferId, null, new TransferState()
.setUpdateTimeMils(System.currentTimeMillis())
.setState("RECEIVED").setPercentage(0)
.setPublisher("api")
diff --git a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
index 89239c1..bf4b7eb 100644
--- a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
+++ b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
@@ -92,7 +92,7 @@ public class MFTConsulClient {
5. Agent publishes the transfer state to controller
mft/controller/messages/states/{transferId}/{agentId}/{requestId}/{md5sum(sourcePath:destinationPath)}/{timeMils()} -> Transfer State
6. Controller saves the transfer state in
- mft/transfer/state/{transferId}/{UUID} -> Transfer State
+ mft/transfer/state/{transferId}/{requestId}/{UUID} -> Transfer State
*/
@@ -291,10 +291,14 @@ public class MFTConsulClient {
* @param transferState
* @throws MFTConsulClientException
*/
- public void saveTransferState(String transferId, TransferState transferState) throws MFTConsulClientException {
+ public void saveTransferState(String transferId, String agentRequestId, TransferState transferState) throws MFTConsulClientException {
try {
String asStr = mapper.writeValueAsString(transferState);
- kvClient.putValue(TRANSFER_STATE_PATH + transferId + "/" + UUID.randomUUID().toString(), asStr);
+ if (agentRequestId == null) {
+ kvClient.putValue(TRANSFER_STATE_PATH + transferId + "/" + UUID.randomUUID().toString(), asStr);
+ } else {
+ kvClient.putValue(TRANSFER_STATE_PATH + transferId + "/" + agentRequestId + "/" + UUID.randomUUID().toString(), asStr);
+ }
logger.info("Saved transfer status " + asStr);
} catch (Exception e) {
@@ -339,7 +343,11 @@ public class MFTConsulClient {
* @throws IOException
*/
public List<TransferState> getTransferStates(String transferId) throws IOException {
- List<String> keys = kvClient.getKeys(TRANSFER_STATE_PATH + transferId);
+ return getTransferStates(transferId, null);
+ }
+
+ public List<TransferState> getTransferStates(String transferId, String agentRequestId) throws IOException {
+ List<String> keys = kvClient.getKeys(TRANSFER_STATE_PATH + transferId + (agentRequestId == null? "" : "/" + agentRequestId));
List<TransferState> allStates = new ArrayList<>();
@@ -351,7 +359,7 @@ public class MFTConsulClient {
}
List<TransferState> sortedStates = allStates.stream().sorted((o1, o2) ->
(o1.getUpdateTimeMils() - o2.getUpdateTimeMils()) < 0 ? -1 :
- (o1.getUpdateTimeMils() - o2.getUpdateTimeMils()) == 0 ? 0 : 1).collect(Collectors.toList());
+ (o1.getUpdateTimeMils() - o2.getUpdateTimeMils()) == 0 ? 0 : 1).collect(Collectors.toList());
return sortedStates;
}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/AgentTransferDispatcher.java b/controller/src/main/java/org/apache/airavata/mft/controller/AgentTransferDispatcher.java
deleted file mode 100644
index 21cf35d..0000000
--- a/controller/src/main/java/org/apache/airavata/mft/controller/AgentTransferDispatcher.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.controller;
-
-import org.apache.airavata.mft.admin.MFTConsulClient;
-import org.apache.airavata.mft.admin.models.TransferState;
-import org.apache.airavata.mft.agent.stub.AgentTransferRequest;
-import org.apache.airavata.mft.api.service.EndpointPaths;
-import org.apache.airavata.mft.api.service.TransferApiRequest;
-import org.apache.airavata.mft.controller.spawner.CloudAgentSpawner;
-import org.apache.airavata.mft.controller.spawner.SpawnerSelector;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.stream.Collectors;
-
-public class AgentTransferDispatcher {
- private static final Logger logger = LoggerFactory.getLogger(AgentTransferDispatcher.class);
-
- //getId(transferRequest):Pair<TransferApiRequest, AgentTransferRequest.Builder>
-
- private final Map<String, Pair<TransferApiRequest, AgentTransferRequest.Builder>> pendingTransferRequests = new ConcurrentHashMap<>();
- private final Map<String, String> pendingTransferIds = new ConcurrentHashMap<>();
- //getId(transferRequest):consulKey
-
- private final Map<String, String> pendingTransferConsulKeys = new ConcurrentHashMap<>();
-
- //getId(transferRequest):CloudAgentSpawner
- private final Map<String, CloudAgentSpawner> pendingAgentSpawners = new ConcurrentHashMap<>();
-
- // getId(transferRequest):Set(TransferId)
- private final Map<String, Set<String>> runningAgentCache = new ConcurrentHashMap<>();
-
- // AgentID:Spawner - Use this to keep track of agent spawners. This is required to terminate agent
- private final Map<String, CloudAgentSpawner> agentSpawners = new ConcurrentHashMap<>();
-
- private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-
- // Temporarily store consul key until the optimizer spins up Agents. This will block the same pending transfer
- // being handled twice
- private final Set<String> optimizingConsulKeys = new ConcurrentSkipListSet<>();
-
- @Autowired
- private MFTConsulClient mftConsulClient;
-
- public void init() {
- scheduler.scheduleWithFixedDelay(() -> {
- pendingAgentSpawners.forEach((key, spawner) -> {
- if (spawner.getLaunchState().isDone()) {
- String transferId = pendingTransferIds.get(key);
- Pair<TransferApiRequest, AgentTransferRequest.Builder> transferRequests = pendingTransferRequests.get(key);
- String consulKey = pendingTransferConsulKeys.get(key);
-
- try {
- String agentId = spawner.getLaunchState().get();
- List<String> liveAgentIds = mftConsulClient.getLiveAgentIds();
- if (liveAgentIds.stream().noneMatch(id -> id.equals(agentId))) {
- throw new Exception("Agent was not registered even though the agent is up");
- }
-
- submitTransferToAgent(Collections.singletonList(agentId), transferId,
- transferRequests.getLeft(), transferRequests.getRight(), consulKey);
-
- // Use this to terminate agent in future
- agentSpawners.put(agentId, spawner);
-
- } catch (Exception e) {
- logger.error("Failed to launch agent for key {}", key, e);
- try {
- mftConsulClient.saveTransferState(transferId, new TransferState()
- .setUpdateTimeMils(System.currentTimeMillis())
- .setState("FAILED").setPercentage(0)
- .setPublisher("controller")
- .setDescription("Failed to launch the agent. " + ExceptionUtils.getRootCauseMessage(e)));
- } catch (Exception e2) {
- logger.error("Failed to submit transfer fail error for transfer id {}", transferId, e2);
- }
-
- logger.info("Removing consul key {}", consulKey);
- mftConsulClient.getKvClient().deleteKey(consulKey);
- logger.info("Terminating the spawner");
- spawner.terminate();
-
- } finally {
- pendingTransferIds.remove(key);
- pendingTransferRequests.remove(key);
- pendingAgentSpawners.remove(key);
- pendingTransferConsulKeys.remove(key);
- optimizingConsulKeys.remove(consulKey);
- }
- }
- });
- }, 3, 5, TimeUnit.SECONDS);
- }
-
-
- public void submitTransferToAgent(List<String> filteredAgents, String transferId,
- TransferApiRequest transferRequest,
- AgentTransferRequest.Builder agentTransferRequestTemplate, String consulKey)
- throws Exception {
-
- try {
- if (filteredAgents.isEmpty()) {
- mftConsulClient.saveTransferState(transferId, new TransferState()
- .setUpdateTimeMils(System.currentTimeMillis())
- .setState("FAILED").setPercentage(0)
- .setPublisher("controller")
- .setDescription("No qualifying agent was found to orchestrate the transfer"));
- return;
- }
-
- mftConsulClient.saveTransferState(transferId, new TransferState()
- .setState("STARTING")
- .setPercentage(0)
- .setUpdateTimeMils(System.currentTimeMillis())
- .setPublisher("controller")
- .setDescription("Initializing the transfer"));
-
- AgentTransferRequest.Builder agentTransferRequest = agentTransferRequestTemplate.clone();
-
- agentTransferRequest.setRequestId(UUID.randomUUID().toString());
- for (EndpointPaths ep : transferRequest.getEndpointPathsList()) {
- agentTransferRequest.addEndpointPaths(org.apache.airavata.mft.agent.stub.EndpointPaths.newBuilder()
- .setSourcePath(ep.getSourcePath())
- .setDestinationPath(ep.getDestinationPath()).buildPartial());
- }
-
- // TODO use a better way to select the right agent
- mftConsulClient.commandTransferToAgent(filteredAgents.get(0), transferId, agentTransferRequest.build());
- mftConsulClient.markTransferAsProcessed(transferId, transferRequest);
- logger.info("Marked transfer {} as processed", transferId);
- } finally {
- mftConsulClient.getKvClient().deleteKey(consulKey);
- }
- }
-
- public void handleTransferRequest(String transferId,
- TransferApiRequest transferRequest,
- AgentTransferRequest.Builder agentTransferRequestTemplate,
- String consulKey) throws Exception{
-
- if (optimizingConsulKeys.contains(consulKey)) {
- logger.info("Ignoring handling transfer id {} as it is already in optimizing stage", transferId);
- return;
- }
-
- logger.info("Handling transfer id {} with consul key {}", transferId, consulKey);
- List<String> liveAgentIds = mftConsulClient.getLiveAgentIds();
-
- Map<String, Integer> targetAgentsMap = transferRequest.getTargetAgentsMap();
- List<String> userProvidedAgents = liveAgentIds.stream().filter(targetAgentsMap::containsKey).collect(Collectors.toList());
- List<String> optimizedAgents = new ArrayList<>();
-
- if (transferRequest.getOptimizeTransferPath()) {
-
- Set<String> sourceAgents = runningAgentCache.get(getId(transferRequest, true));
- if (sourceAgents != null) {
- optimizedAgents.addAll(liveAgentIds.stream().filter(sourceAgents::contains).collect(Collectors.toList()));
- }
-
- Set<String> destAgents = runningAgentCache.get(getId(transferRequest, false));
- if (destAgents != null) {
- optimizedAgents.addAll(liveAgentIds.stream().filter(destAgents::contains).collect(Collectors.toList()));
- }
-
- if (optimizedAgents.isEmpty()) {
- Optional<CloudAgentSpawner> sourceSpawner = SpawnerSelector.selectSpawner(
- agentTransferRequestTemplate.getSourceStorage(),
- agentTransferRequestTemplate.getSourceSecret());
-
- Optional<CloudAgentSpawner> destSpawner = SpawnerSelector.selectSpawner(
- agentTransferRequestTemplate.getDestinationStorage(),
- agentTransferRequestTemplate.getDestinationSecret());
-
- if (sourceSpawner.isPresent()) {
- logger.info("Launching {} spawner in source side for transfer {}",
- sourceSpawner.get().getClass().getName(), transferId);
-
- sourceSpawner.get().launch();
- pendingAgentSpawners.put(getId(transferRequest, true), sourceSpawner.get());
- pendingTransferRequests.put(getId(transferRequest, true),
- Pair.of(transferRequest, agentTransferRequestTemplate));
- pendingTransferIds.put(getId(transferRequest, true), transferId);
- pendingTransferConsulKeys.put(getId(transferRequest, true), consulKey);
- optimizingConsulKeys.add(consulKey);
- return;
- } else if (destSpawner.isPresent()) {
- logger.info("Launching {} spawner in destination side for transfer {}",
- destSpawner.get().getClass().getName(), transferId);
-
- destSpawner.get().launch();
- pendingAgentSpawners.put(getId(transferRequest, false), destSpawner.get());
- pendingTransferRequests.put(getId(transferRequest, false),
- Pair.of(transferRequest, agentTransferRequestTemplate));
- pendingTransferIds.put(getId(transferRequest, false), transferId);
- pendingTransferConsulKeys.put(getId(transferRequest, false), consulKey);
- optimizingConsulKeys.add(consulKey);
- return;
- } else {
- logger.warn("No optimizing path is available. Moving user provided agents");
- submitTransferToAgent(userProvidedAgents, transferId,
- transferRequest,
- agentTransferRequestTemplate,
- consulKey);
- }
- } else {
- logger.info("Using optimized agents for transfer {}", transferId);
- submitTransferToAgent(optimizedAgents, transferId,
- transferRequest,
- agentTransferRequestTemplate,
- consulKey);
- }
- }
-
- if (userProvidedAgents.isEmpty()) {
- if (liveAgentIds.isEmpty()) {
- logger.warn("No live agent available to perform the transfer.");
- return;
- }
- logger.info("No agent selection criteria was provided. Going with the local agent");
- // TODO select the local agent
- submitTransferToAgent(liveAgentIds, transferId,
- transferRequest,
- agentTransferRequestTemplate,
- consulKey);
-
- } else {
- submitTransferToAgent(userProvidedAgents, transferId,
- transferRequest,
- agentTransferRequestTemplate,
- consulKey);
- }
- }
-
- private String getId(TransferApiRequest transferRequest, boolean isSource) {
- if (isSource) {
- return transferRequest.getSourceStorageId() + transferRequest.getSourceSecretId();
- } else {
- return transferRequest.getDestinationStorageId() + transferRequest.getDestinationStorageId();
- }
- }
-}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/AppConfig.java b/controller/src/main/java/org/apache/airavata/mft/controller/AppConfig.java
index ca31060..73fbe0f 100644
--- a/controller/src/main/java/org/apache/airavata/mft/controller/AppConfig.java
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/AppConfig.java
@@ -66,8 +66,8 @@ public class AppConfig {
}
@Bean
- public AgentTransferDispatcher pathOptimizer() {
- AgentTransferDispatcher agentTransferDispatcher = new AgentTransferDispatcher();
+ public TransferDispatcher pathOptimizer() {
+ TransferDispatcher agentTransferDispatcher = new TransferDispatcher();
agentTransferDispatcher.init();
return agentTransferDispatcher;
}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java b/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
index a4986ab..2870571 100644
--- a/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
@@ -24,8 +24,6 @@ import com.orbitz.consul.cache.ConsulCache;
import com.orbitz.consul.cache.KVCache;
import com.orbitz.consul.model.kv.Value;
import org.apache.airavata.mft.admin.MFTConsulClient;
-import org.apache.airavata.mft.admin.MFTConsulClientException;
-import org.apache.airavata.mft.admin.models.AgentInfo;
import org.apache.airavata.mft.admin.models.TransferState;
import org.apache.airavata.mft.agent.stub.AgentTransferRequest;
import org.apache.airavata.mft.api.service.TransferApiRequest;
@@ -40,15 +38,12 @@ import org.springframework.context.annotation.ComponentScan;
import javax.annotation.PreDestroy;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
-import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
@SpringBootApplication()
@ComponentScan(basePackages = {"org.apache.airavata.mft"})
@@ -72,7 +67,7 @@ public class MFTController implements CommandLineRunner {
private MFTConsulClient mftConsulClient;
@Autowired
- private AgentTransferDispatcher pathOptimizer;
+ private TransferDispatcher pathOptimizer;
public void init() {
logger.info("Initializing the Controller");
@@ -155,7 +150,7 @@ public class MFTController implements CommandLineRunner {
String time = parts[4];
TransferState transferState = mapper.readValue(valAsStr, TransferState.class);
- mftConsulClient.saveTransferState(transferId, transferState.setChildId(pathHash));
+ mftConsulClient.saveTransferState(transferId, agentRequestId, transferState.setChildId(pathHash));
}
} catch (Exception e) {
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/TransferDispatcher.java b/controller/src/main/java/org/apache/airavata/mft/controller/TransferDispatcher.java
new file mode 100644
index 0000000..bf3b153
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/TransferDispatcher.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller;
+
+import org.apache.airavata.mft.admin.MFTConsulClient;
+import org.apache.airavata.mft.admin.models.TransferState;
+import org.apache.airavata.mft.agent.stub.AgentTransferRequest;
+import org.apache.airavata.mft.api.service.EndpointPaths;
+import org.apache.airavata.mft.api.service.TransferApiRequest;
+import org.apache.airavata.mft.controller.spawner.AgentOrchestrator;
+import org.apache.airavata.mft.controller.spawner.AgentSpawner;
+import org.apache.airavata.mft.controller.spawner.SpawnerSelector;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
+
+/**
+ * Routes transfer requests picked up by the controller either to an already running agent or, when the
+ * request asks for an optimized transfer path, to an agent launched through {@link AgentOrchestrator}.
+ */
+public class TransferDispatcher {
+    private static final Logger logger = LoggerFactory.getLogger(TransferDispatcher.class);
+
+    private AgentOrchestrator agentOrchestrator;
+
+    @Autowired
+    private MFTConsulClient mftConsulClient;
+
+    /**
+     * Creates the {@link AgentOrchestrator} used for optimized transfers and starts it. Must be called once
+     * before {@link #handleTransferRequest} is used, since that method dereferences the orchestrator.
+     */
+    public void init() {
+        agentOrchestrator = new AgentOrchestrator(this);
+        agentOrchestrator.init();
+    }
+
+    /**
+     * Hands a transfer to the first agent of {@code filteredAgents} and records its progress in Consul.
+     *
+     * <p>Failures while submitting are logged and recorded as a FAILED transfer state instead of being
+     * thrown. The pending-transfer Consul key is deleted in every case.
+     *
+     * @param filteredAgents       candidate agent ids; only the first one is used. An empty list fails the transfer
+     * @param transferId           id of the transfer being dispatched
+     * @param transferRequest      the original API request, marked as processed once the agent is commanded
+     * @param agentTransferRequest the fully built request sent to the agent
+     * @param consulKey            Consul key of the pending transfer request, deleted once handled
+     */
+    public void submitTransferToAgent(List<String> filteredAgents, String transferId,
+                                      TransferApiRequest transferRequest,
+                                      AgentTransferRequest agentTransferRequest, String consulKey) {
+
+        try {
+            if (filteredAgents.isEmpty()) {
+                mftConsulClient.saveTransferState(transferId, null, controllerState("FAILED",
+                        "No qualifying agent was found to orchestrate the transfer"));
+                return;
+            }
+
+            mftConsulClient.saveTransferState(transferId, null, controllerState("STARTING",
+                    "Initializing the transfer"));
+
+            // TODO use a better way to select the right agent
+            mftConsulClient.commandTransferToAgent(filteredAgents.get(0), transferId, agentTransferRequest);
+            mftConsulClient.markTransferAsProcessed(transferId, transferRequest);
+            logger.info("Marked transfer {} as processed", transferId);
+
+        } catch (Exception e) {
+
+            logger.error("Failed to submit the transfer {} to agent", transferId, e);
+
+            try {
+                mftConsulClient.saveTransferState(transferId, null, controllerState("FAILED",
+                        "Failed to submit the transfer to agent. Error: " + ExceptionUtils.getRootCauseMessage(e)));
+            } catch (Exception e2) {
+                // Best effort: the original submission failure was already logged above. Report e2, the
+                // exception that actually prevented the state update.
+                logger.warn("Failed to update the failed transfer state for transfer id {}", transferId, e2);
+            }
+        } finally {
+            mftConsulClient.getKvClient().deleteKey(consulKey);
+        }
+    }
+
+    /**
+     * Decides where a pending transfer runs and dispatches it.
+     *
+     * <p>Optimized requests first try to launch (or reuse) an agent through the {@link AgentOrchestrator}.
+     * Everything else, including optimized requests with no launchable agent, goes to the live user
+     * provided agents, or to any live agent when none of those is available.
+     *
+     * @param transferId                   id of the transfer
+     * @param transferRequest              the API request as submitted by the user
+     * @param agentTransferRequestTemplate template cloned into the request sent to the agent; not modified
+     * @param consulKey                    Consul key of the pending transfer request
+     * @throws Exception if the live agents cannot be listed
+     */
+    public void handleTransferRequest(String transferId,
+                                      TransferApiRequest transferRequest,
+                                      AgentTransferRequest.Builder agentTransferRequestTemplate,
+                                      String consulKey) throws Exception {
+
+        if (this.agentOrchestrator.isAnAgentDeploying(consulKey)) {
+            logger.info("Ignoring handling transfer id {} as it is already in optimizing stage", transferId);
+            return;
+        }
+
+        logger.info("Handling transfer id {} with consul key {}", transferId, consulKey);
+        List<String> liveAgentIds = mftConsulClient.getLiveAgentIds();
+
+        Map<String, Integer> targetAgentsMap = transferRequest.getTargetAgentsMap();
+        List<String> userProvidedAgents = liveAgentIds.stream()
+                .filter(targetAgentsMap::containsKey)
+                .collect(Collectors.toList());
+
+        AgentTransferRequest agentTransferRequest =
+                buildAgentTransferRequest(transferRequest, agentTransferRequestTemplate);
+
+        if (transferRequest.getOptimizeTransferPath()) {
+            boolean agentLaunching = agentOrchestrator.tryLaunchingAgent(
+                    transferId, transferRequest,
+                    agentTransferRequest,
+                    consulKey);
+
+            if (agentLaunching) {
+                return;
+            }
+            // Path optimization is best effort. Fall back to the same selection a non-optimized request gets
+            // instead of failing the transfer just because no user provided agent is live.
+            logger.warn("No optimizing path is available. Falling back to user provided or live agents");
+        }
+
+        submitToAvailableAgents(transferId, transferRequest, agentTransferRequest, consulKey,
+                liveAgentIds, userProvidedAgents);
+    }
+
+    public MFTConsulClient getMftConsulClient() {
+        return mftConsulClient;
+    }
+
+    // Submits to the live user provided agents, else to any live agent. With no live agent at all, the
+    // transfer is left untouched and its Consul key is not deleted.
+    private void submitToAvailableAgents(String transferId, TransferApiRequest transferRequest,
+                                         AgentTransferRequest agentTransferRequest, String consulKey,
+                                         List<String> liveAgentIds, List<String> userProvidedAgents) {
+        if (!userProvidedAgents.isEmpty()) {
+            submitTransferToAgent(userProvidedAgents, transferId, transferRequest, agentTransferRequest, consulKey);
+            return;
+        }
+
+        if (liveAgentIds.isEmpty()) {
+            logger.warn("No live agent available to perform the transfer.");
+            return;
+        }
+
+        logger.info("No agent selection criteria was provided. Going with the local agent");
+        // TODO select the local agent
+        submitTransferToAgent(liveAgentIds, transferId, transferRequest, agentTransferRequest, consulKey);
+    }
+
+    // Clones the template, assigns a fresh request id and copies the API request's endpoint paths into it
+    private static AgentTransferRequest buildAgentTransferRequest(TransferApiRequest transferRequest,
+                                                                  AgentTransferRequest.Builder template) {
+        AgentTransferRequest.Builder builder = template.clone();
+        builder.setRequestId(UUID.randomUUID().toString());
+        for (EndpointPaths ep : transferRequest.getEndpointPathsList()) {
+            builder.addEndpointPaths(org.apache.airavata.mft.agent.stub.EndpointPaths.newBuilder()
+                    .setSourcePath(ep.getSourcePath())
+                    .setDestinationPath(ep.getDestinationPath()).buildPartial());
+        }
+        return builder.build();
+    }
+
+    // Builds a controller-published TransferState stamped with the current time and 0% progress
+    private static TransferState controllerState(String state, String description) {
+        return new TransferState()
+                .setUpdateTimeMils(System.currentTimeMillis())
+                .setState(state)
+                .setPercentage(0)
+                .setPublisher("controller")
+                .setDescription(description);
+    }
+}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/spawner/AgentOrchestrator.java b/controller/src/main/java/org/apache/airavata/mft/controller/spawner/AgentOrchestrator.java
new file mode 100644
index 0000000..0ccec2e
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/spawner/AgentOrchestrator.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller.spawner;
+
+import org.apache.airavata.mft.admin.models.TransferState;
+import org.apache.airavata.mft.agent.stub.AgentTransferRequest;
+import org.apache.airavata.mft.api.service.TransferApiRequest;
+import org.apache.airavata.mft.controller.TransferDispatcher;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+public class AgentOrchestrator {
+
+ private static final Logger logger = LoggerFactory.getLogger(AgentOrchestrator.class);
+
+    /** Seconds a launched agent may stay without new dispatches, with all its files finished, before it is terminated. */
+    private static final int SPAWNER_MAX_IDLE_SECONDS = 30;
+
+    /** A transfer routed to a spawner-managed agent, with everything needed to submit or fail it later. */
+    private static class TransferInfo {
+        private final String transferId;
+        private final AgentTransferRequest agentTransferRequest;
+        private final TransferApiRequest transferApiRequest;
+
+        // Temporarily store consul key until the optimizer spins up Agents. This will block the same pending transfer
+        // being handled twice
+        private final String consulKey;
+
+        public TransferInfo(String transferId, AgentTransferRequest agentTransferRequest, TransferApiRequest transferApiRequest, String consulKey) {
+            this.transferId = transferId;
+            this.agentTransferRequest = agentTransferRequest;
+            this.transferApiRequest = transferApiRequest;
+            this.consulKey = consulKey;
+        }
+
+        public String getTransferId() {
+            return transferId;
+        }
+
+        public AgentTransferRequest getAgentTransferRequest() {
+            return agentTransferRequest;
+        }
+
+        public TransferApiRequest getTransferApiRequest() {
+            return transferApiRequest;
+        }
+
+        public String getConsulKey() {
+            return consulKey;
+        }
+    }
+
+    /** Book-keeping for one launched spawner and the transfers routed to it; ordered by creation time. */
+    private static class LaunchedSpawnerMetadata implements Comparable<LaunchedSpawnerMetadata> {
+
+        private final AgentSpawner spawner;
+
+        private final long createdTime = System.currentTimeMillis();
+
+        // Last time a transfer was handed to the agent (creation time until then); drives the idle timeout
+        private long lastScannedTime = System.currentTimeMillis();
+
+        //AgentTransferRequestId:TransferInfo - transfers still waiting for the agent to come up
+        private final Map<String, TransferInfo> transferInfos = new ConcurrentHashMap<>();
+
+        //AgentTransferRequestId:TransferInfo - transfers already submitted to the agent. They are kept so the
+        // idle check can tell whether the agent still has unfinished work.
+        private final Map<String, TransferInfo> dispatchedTransferInfos = new ConcurrentHashMap<>();
+
+        public LaunchedSpawnerMetadata(AgentSpawner spawner) {
+            this.spawner = spawner;
+        }
+
+        public AgentSpawner getSpawner() {
+            return spawner;
+        }
+
+        public Map<String, TransferInfo> getTransferInfos() {
+            return transferInfos;
+        }
+
+        public Map<String, TransferInfo> getDispatchedTransferInfos() {
+            return dispatchedTransferInfos;
+        }
+
+        @Override
+        public int compareTo(LaunchedSpawnerMetadata o) {
+            return Long.compare(createdTime, o.createdTime);
+        }
+    }
+
+    // Key produced by getId(transferRequest, isSource):metadata of the spawner launched for that side
+    private final Map<String, LaunchedSpawnerMetadata> launchedSpawnersMap = new ConcurrentHashMap<>();
+
+    private final TransferDispatcher transferDispatcher;
+
+    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+
+    public AgentOrchestrator(TransferDispatcher transferDispatcher) {
+        this.transferDispatcher = transferDispatcher;
+    }
+
+    /**
+     * Starts a background task, run every 5 seconds after an initial 3 second delay. For each launched
+     * spawner it submits waiting transfers once the agent is up, and terminates agents that have been idle
+     * for {@link #SPAWNER_MAX_IDLE_SECONDS} with all their files completed or failed.
+     */
+    public void init() {
+        scheduler.scheduleWithFixedDelay(() -> {
+            try {
+                launchedSpawnersMap.forEach(this::processSpawner);
+            } catch (Exception e) {
+                // Just to keep the thread running: an escaping exception would cancel all future runs
+                logger.error("Some error occurred while processing spawners map", e);
+            }
+        }, 3, 5, TimeUnit.SECONDS);
+    }
+
+    // One scheduler pass over a single spawner; failures are contained so other spawners are still processed
+    private void processSpawner(String key, LaunchedSpawnerMetadata metadata) {
+        try {
+            if (metadata.spawner.getLaunchState().isDone() && !dispatchPendingTransfers(key, metadata)) {
+                return; // launch failed and the spawner has already been torn down
+            }
+            terminateIfIdle(key, metadata);
+        } catch (Exception e) {
+            logger.error("Some error occurred while processing spawner with key {}", key, e);
+        }
+    }
+
+    // Submits every waiting transfer to the launched agent. Returns false if the agent is unusable, in which
+    // case the waiting transfers are failed and the spawner is torn down.
+    private boolean dispatchPendingTransfers(String key, LaunchedSpawnerMetadata metadata) {
+        if (metadata.transferInfos.isEmpty()) {
+            return true;
+        }
+
+        final String agentId;
+        try {
+            agentId = resolveRegisteredAgentId(metadata);
+        } catch (Exception e) {
+            failPendingTransfers(key, metadata, e);
+            return false;
+        }
+
+        for (Map.Entry<String, TransferInfo> entry : metadata.transferInfos.entrySet()) {
+            TransferInfo transferInfo = entry.getValue();
+            // NOTE(review): submitTransferToAgent reports its own failures as FAILED states saved without the
+            // agent request id, so such a request never looks finished to terminateIfIdle. Consider having
+            // it return a success flag.
+            this.transferDispatcher.submitTransferToAgent(Collections.singletonList(agentId),
+                    transferInfo.transferId,
+                    transferInfo.transferApiRequest,
+                    transferInfo.agentTransferRequest,
+                    transferInfo.consulKey);
+
+            // Move (not drop) the transfer: the idle check needs it to know when the agent's work is done.
+            // It leaves the waiting map only after submission, which has already deleted the consul key.
+            metadata.dispatchedTransferInfos.put(entry.getKey(), transferInfo);
+            metadata.transferInfos.remove(entry.getKey());
+            metadata.lastScannedTime = System.currentTimeMillis();
+        }
+        return true;
+    }
+
+    // Returns the launched agent's id, failing if the launch failed or the agent is not registered as live
+    private String resolveRegisteredAgentId(LaunchedSpawnerMetadata metadata) throws Exception {
+        String agentId = metadata.spawner.getLaunchState().get();
+        List<String> liveAgentIds = transferDispatcher.getMftConsulClient().getLiveAgentIds();
+        if (liveAgentIds.stream().noneMatch(id -> id.equals(agentId))) {
+            throw new IllegalStateException("Agent was not registered even though the agent marked as up");
+        }
+        return agentId;
+    }
+
+    // Marks every waiting transfer FAILED, deletes their consul keys, then drops and terminates the spawner once
+    private void failPendingTransfers(String key, LaunchedSpawnerMetadata metadata, Exception cause) {
+        metadata.transferInfos.forEach((agentTransferId, transferInfo) -> {
+            logger.error("Failed to launch agent for agent transfer id {} and transfer {}",
+                    agentTransferId, transferInfo.transferId, cause);
+            try {
+                transferDispatcher.getMftConsulClient().saveTransferState(transferInfo.transferId, null, new TransferState()
+                        .setUpdateTimeMils(System.currentTimeMillis())
+                        .setState("FAILED").setPercentage(0)
+                        .setPublisher("controller")
+                        .setDescription("Failed to launch the agent. " + ExceptionUtils.getRootCauseMessage(cause)));
+            } catch (Exception e2) {
+                logger.error("Failed to submit transfer fail error for transfer id {}", transferInfo.transferId, e2);
+            }
+
+            try {
+                logger.info("Removing consul key {}", transferInfo.consulKey);
+                transferDispatcher.getMftConsulClient().getKvClient().deleteKey(transferInfo.consulKey);
+            } catch (Exception e3) {
+                logger.error("Failed to remove consul key {}", transferInfo.consulKey, e3);
+            } finally {
+                metadata.transferInfos.remove(agentTransferId);
+            }
+        });
+
+        // Drop the dead spawner so no later transfer is routed to it, and terminate it exactly once
+        launchedSpawnersMap.remove(key, metadata);
+        logger.info("Terminating the spawner");
+        metadata.spawner.terminate();
+    }
+
+    // Terminates the spawner once it has been idle long enough and every dispatched file is completed or failed
+    private void terminateIfIdle(String key, LaunchedSpawnerMetadata metadata) {
+        // Never reap a spawner that is still launching or still has transfers waiting to be submitted
+        if (!metadata.spawner.getLaunchState().isDone() || !metadata.transferInfos.isEmpty()) {
+            return;
+        }
+        if ((System.currentTimeMillis() - metadata.lastScannedTime) <= SPAWNER_MAX_IDLE_SECONDS * 1000L) {
+            return;
+        }
+
+        long totalFiles = 0;
+        long completedOrFailedFiles = 0;
+
+        for (Map.Entry<String, TransferInfo> entry : metadata.dispatchedTransferInfos.entrySet()) {
+            String agentTransferId = entry.getKey();
+            TransferInfo transferInfo = entry.getValue();
+
+            totalFiles += transferInfo.agentTransferRequest.getEndpointPathsCount();
+            try {
+                List<TransferState> transferStates = this.transferDispatcher.getMftConsulClient()
+                        .getTransferStates(transferInfo.transferId, agentTransferId);
+
+                completedOrFailedFiles += transferStates.stream()
+                        .filter(transferState -> "COMPLETED".equals(transferState.getState()) ||
+                                "FAILED".equals(transferState.getState()))
+                        .count();
+            } catch (Exception e) {
+                // The files stay counted in totalFiles, so a failed lookup keeps the agent alive
+                logger.error("Failed to fetch transfer states for agent transfer id {}", agentTransferId, e);
+            }
+        }
+
+        logger.info("Spawner with key {} has total {} files to be transferred and {} were completed or failed",
+                key, totalFiles, completedOrFailedFiles);
+
+        if (totalFiles == completedOrFailedFiles) {
+            // TODO create a write lock with reusing agent logic
+
+            logger.info("Killing spawner with key {} as all files were transferred and inactive for {} seconds",
+                    key, SPAWNER_MAX_IDLE_SECONDS);
+            // Unregister before terminating to shrink the window in which a new transfer could reuse it
+            launchedSpawnersMap.remove(key, metadata);
+            metadata.spawner.terminate();
+        }
+    }
+ /**
+ * Makes sure an optimized agent will serve the given agent transfer and registers the transfer with it.
+ * <p>
+ * If a spawner is already tracked for the source (checked first) or destination storage/secret pair, the
+ * transfer is attached to that spawner. Otherwise a spawner is selected via {@link SpawnerSelector}, preferring
+ * the source side over the destination side, launched, and tracked under that side's key.
+ *
+ * @param transferId id of the parent transfer
+ * @param transferRequest original transfer API request; used to derive the spawner keys
+ * @param agentTransferRequest per-agent request; its request id is the key the transfer is registered under
+ * @param consulKey consul key recorded with the registered transfer
+ * @return true if the transfer was attached to a running or newly launched spawner, false if no spawner
+ * could be selected for either side
+ */
+ public boolean tryLaunchingAgent(String transferId,
+ TransferApiRequest transferRequest,
+ AgentTransferRequest agentTransferRequest,
+ String consulKey) {
+
+ String sourceKey = getId(transferRequest, true);
+ String destKey = getId(transferRequest, false);
+
+ // Prefer a spawner already running on the source side, then one on the destination side
+ LaunchedSpawnerMetadata runningSpawner = launchedSpawnersMap.get(sourceKey);
+ if (runningSpawner == null) {
+ runningSpawner = launchedSpawnersMap.get(destKey);
+ }
+
+ if (runningSpawner != null) {
+ logger.info("Reusing already running optimized agents for transfer {}", transferId);
+
+ // TODO select the spawner having the least transfers. Make this thread safe as, in some cases, the spawner
+ // might be initiating the termination
+ runningSpawner.transferInfos.put(agentTransferRequest.getRequestId(),
+ new TransferInfo(transferId, agentTransferRequest, transferRequest, consulKey));
+ return true;
+ }
+
+ Optional<AgentSpawner> sourceSpawner = SpawnerSelector.selectSpawner(
+ agentTransferRequest.getSourceStorage(),
+ agentTransferRequest.getSourceSecret());
+
+ Optional<AgentSpawner> destSpawner = SpawnerSelector.selectSpawner(
+ agentTransferRequest.getDestinationStorage(),
+ agentTransferRequest.getDestinationSecret());
+
+ if (sourceSpawner.isPresent()) {
+ launchAndRegister(sourceSpawner.get(), "source", sourceKey,
+ transferId, transferRequest, agentTransferRequest, consulKey);
+ return true;
+ }
+
+ if (destSpawner.isPresent()) {
+ launchAndRegister(destSpawner.get(), "destination", destKey,
+ transferId, transferRequest, agentTransferRequest, consulKey);
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Launches {@code spawner}, registers the agent transfer with it and tracks it under {@code spawnerKey}.
+ *
+ * @param spawner spawner to launch
+ * @param side "source" or "destination"; used only in the log message
+ * @param spawnerKey key under which the launched spawner is stored in the launched-spawners map
+ */
+ private void launchAndRegister(AgentSpawner spawner, String side, String spawnerKey,
+ String transferId,
+ TransferApiRequest transferRequest,
+ AgentTransferRequest agentTransferRequest,
+ String consulKey) {
+ logger.info("Launching {} spawner in {} side for transfer {}",
+ spawner.getClass().getName(), side, transferId);
+
+ spawner.launch();
+ LaunchedSpawnerMetadata lsm = new LaunchedSpawnerMetadata(spawner);
+ lsm.transferInfos.put(agentTransferRequest.getRequestId(),
+ new TransferInfo(transferId, agentTransferRequest, transferRequest, consulKey));
+
+ launchedSpawnersMap.put(spawnerKey, lsm);
+ }
+
+ /**
+ * Checks whether any tracked spawner holds a transfer registered with the given consul key.
+ *
+ * @param consulKey consul key to look for
+ * @return true if some launched spawner has a transfer whose consul key equals {@code consulKey}
+ */
+ public boolean isAnAgentDeploying(String consulKey) {
+ for (LaunchedSpawnerMetadata spawnerMetadata : this.launchedSpawnersMap.values()) {
+ for (TransferInfo info : spawnerMetadata.transferInfos.values()) {
+ if (info.consulKey.equals(consulKey)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Builds the key under which a launched spawner is tracked for one side of a transfer.
+ *
+ * @param transferRequest transfer request supplying the storage and secret ids
+ * @param isSource true to key on the source storage/secret, false for the destination storage/secret
+ * @return the selected side's storage id and secret id joined by ':'
+ */
+ private String getId(TransferApiRequest transferRequest, boolean isSource) {
+ // The separator keeps distinct (storageId, secretId) pairs from concatenating into the same key
+ if (isSource) {
+ return transferRequest.getSourceStorageId() + ":" + transferRequest.getSourceSecretId();
+ }
+ return transferRequest.getDestinationStorageId() + ":" + transferRequest.getDestinationSecretId();
+ }
+}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/spawner/CloudAgentSpawner.java b/controller/src/main/java/org/apache/airavata/mft/controller/spawner/AgentSpawner.java
similarity index 91%
rename from controller/src/main/java/org/apache/airavata/mft/controller/spawner/CloudAgentSpawner.java
rename to controller/src/main/java/org/apache/airavata/mft/controller/spawner/AgentSpawner.java
index 8b435ab..a8a733a 100644
--- a/controller/src/main/java/org/apache/airavata/mft/controller/spawner/CloudAgentSpawner.java
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/spawner/AgentSpawner.java
@@ -22,11 +22,11 @@ import org.apache.airavata.mft.agent.stub.StorageWrapper;
import java.util.concurrent.Future;
-public abstract class CloudAgentSpawner {
+public abstract class AgentSpawner {
protected StorageWrapper storageWrapper;
protected SecretWrapper secretWrapper;
- public CloudAgentSpawner(StorageWrapper storageWrapper, SecretWrapper secretWrapper) {
+ public AgentSpawner(StorageWrapper storageWrapper, SecretWrapper secretWrapper) {
this.secretWrapper = secretWrapper;
this.storageWrapper = storageWrapper;
}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/spawner/EC2AgentSpawner.java b/controller/src/main/java/org/apache/airavata/mft/controller/spawner/EC2AgentSpawner.java
index 890e395..a097f92 100644
--- a/controller/src/main/java/org/apache/airavata/mft/controller/spawner/EC2AgentSpawner.java
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/spawner/EC2AgentSpawner.java
@@ -40,7 +40,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Stream;
-public class EC2AgentSpawner extends CloudAgentSpawner {
+public class EC2AgentSpawner extends AgentSpawner {
private static final Logger logger = LoggerFactory.getLogger(EC2AgentSpawner.class);
@@ -48,18 +48,49 @@ public class EC2AgentSpawner extends CloudAgentSpawner {
private String instanceId;
private CountDownLatch portForwardLock;
- Future<String> launchFuture;
- Future<Boolean> terminateFuture;
+ private Future<String> launchFuture;
+ private Future<Boolean> terminateFuture;
+ private final Map<String, String> amiMap;
+ /**
+ * Creates a spawner for the given storage and secret and loads the region-to-AMI lookup table.
+ */
public EC2AgentSpawner(StorageWrapper storageWrapper, SecretWrapper secretWrapper) {
super(storageWrapper, secretWrapper);
+ // {region, AMI id} pairs consulted by getAmi() when launching an agent
+ String[][] regionAmiPairs = {
+ {"af-south-1", "ami-0a9b2225722484002"},
+ {"ap-east-1", "ami-056129cdd30574a9d"},
+ {"ap-northeast-1", "ami-0e2bf1ada70fd3f33"},
+ {"ap-south-1", "ami-0f69bc5520884278e"},
+ {"ap-southeast-1", "ami-029562ad87fe1185c"},
+ {"ca-central-1", "ami-09bbc74353976b63f"},
+ {"eu-central-1", "ami-0039da1f3917fa8e3"},
+ {"eu-north-1", "ami-03df6dea56f8aa618"},
+ {"eu-south-1", "ami-0bac8f6f7c5943df6"},
+ {"eu-west-1", "ami-026e72e4e468afa7b"},
+ {"me-central-1", "ami-0747a6d5ab9621d5e"},
+ {"me-south-1", "ami-000dfd0dafa71a443"},
+ {"sa-east-1", "ami-07ac54771249f286c"},
+ {"us-east-1", "ami-06878d265978313ca"},
+ {"us-west-1", "ami-06bb3ee01d992f30d"},
+ {"us-gov-east-1", "ami-0efd49eddc5639cc5"},
+ {"us-gov-west-1", "ami-061efa908c09c5409"},
+ {"ap-northeast-2", "ami-0f0646a5f59758444"},
+ {"ap-south-2", "ami-021aeec757e935219"},
+ {"ap-southeast-2", "ami-006fd15ab56f0fbe6"},
+ {"eu-central-2", "ami-0d34b3fbb942249d5"},
+ {"eu-south-2", "ami-0762ef22684c93e5c"},
+ {"eu-west-2", "ami-01b8d743224353ffe"},
+ {"us-east-2", "ami-0ff39345bd62c82a5"},
+ {"us-west-2", "ami-03f8756d29f0b5f21"},
+ {"ap-northeast-3", "ami-0d7b1258d728f42e3"},
+ {"ap-southeast-3", "ami-0796a4cfd3b7bec87"},
+ {"eu-west-3", "ami-03c476a1ca8e3ebdc"}
+ };
+ amiMap = new HashMap<>();
+ for (String[] pair : regionAmiPairs) {
+ amiMap.put(pair[0], pair[1]);
+ }
}
@Override
public void launch() {
launchFuture = executor.submit( () -> {
- String imageId = "ami-0ecc74eca1d66d8a6"; // Ubuntu base image
+ String region = storageWrapper.getS3().getRegion();
+ String imageId = getAmi(region); // Ubuntu base image
String keyNamePrefix = "mft-aws-agent-key-";
String secGroupName = "MFTAgentSecurityGroup";
String agentId = UUID.randomUUID().toString();
@@ -68,7 +99,6 @@ public class EC2AgentSpawner extends CloudAgentSpawner {
String mftKeyDir = System.getProperty("user.home") + File.separator + ".mft" + File.separator + "keys";
String accessKey = secretWrapper.getS3().getAccessKey();
String secretKey = secretWrapper.getS3().getSecretKey();
- String region = storageWrapper.getS3().getRegion();
try {
BasicAWSCredentials awsCreds = new BasicAWSCredentials(accessKey, secretKey);
@@ -272,6 +302,9 @@ public class EC2AgentSpawner extends CloudAgentSpawner {
});
}
+ /**
+ * Resolves the agent AMI id for an AWS region.
+ *
+ * @param region AWS region code of the storage, e.g. {@code us-east-1}
+ * @return the AMI id configured for the region
+ * @throws IllegalArgumentException if no AMI is configured for the region
+ */
+ private String getAmi(String region) {
+ String ami = amiMap.get(region);
+ if (ami == null) {
+ // Fail fast with a clear message instead of passing a null image id on to the launch logic
+ throw new IllegalArgumentException("No MFT agent AMI is configured for AWS region " + region);
+ }
+ return ami;
+ }
@Override
public Future<String> getLaunchState() {
return launchFuture;
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/spawner/SpawnerSelector.java b/controller/src/main/java/org/apache/airavata/mft/controller/spawner/SpawnerSelector.java
index 1c86446..54d2fa4 100644
--- a/controller/src/main/java/org/apache/airavata/mft/controller/spawner/SpawnerSelector.java
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/spawner/SpawnerSelector.java
@@ -24,7 +24,7 @@ import java.util.Optional;
public class SpawnerSelector {
- public static Optional<CloudAgentSpawner> selectSpawner(StorageWrapper storageWrapper, SecretWrapper secretWrapper) {
+ public static Optional<AgentSpawner> selectSpawner(StorageWrapper storageWrapper, SecretWrapper secretWrapper) {
switch (storageWrapper.getStorageCase()) {
case S3:
if (storageWrapper.getS3().getEndpoint().endsWith("amazonaws.com")) {