You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2023/01/04 01:34:54 UTC

[airavata-mft] branch master updated: Reusing AWS agents and cleaning up after a idle time threshold

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git


The following commit(s) were added to refs/heads/master by this push:
     new d317836  Reusing AWS agents and cleaning up after a idle time threshold
d317836 is described below

commit d317836325f787516659b35b74f7c30524cb619b
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Tue Jan 3 20:34:36 2023 -0500

    Reusing AWS agents and cleaning up after a idle time threshold
---
 .../airavata/mft/api/handler/MFTApiHandler.java    |   2 +-
 .../apache/airavata/mft/admin/MFTConsulClient.java |  18 +-
 .../mft/controller/AgentTransferDispatcher.java    | 262 ------------------
 .../apache/airavata/mft/controller/AppConfig.java  |   4 +-
 .../airavata/mft/controller/MFTController.java     |   9 +-
 .../mft/controller/TransferDispatcher.java         | 160 +++++++++++
 .../mft/controller/spawner/AgentOrchestrator.java  | 294 +++++++++++++++++++++
 .../{CloudAgentSpawner.java => AgentSpawner.java}  |   4 +-
 .../mft/controller/spawner/EC2AgentSpawner.java    |  43 ++-
 .../mft/controller/spawner/SpawnerSelector.java    |   2 +-
 10 files changed, 513 insertions(+), 285 deletions(-)

diff --git a/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java b/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
index cc00835..7230619 100644
--- a/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
+++ b/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
@@ -115,7 +115,7 @@ public class MFTApiHandler extends MFTTransferServiceGrpc.MFTTransferServiceImpl
             String transferId = mftConsulClient.submitTransfer(request);
             logger.info("Submitted the transfer request {}", transferId);
 
-            mftConsulClient.saveTransferState(transferId, new TransferState()
+            mftConsulClient.saveTransferState(transferId, null, new TransferState()
                     .setUpdateTimeMils(System.currentTimeMillis())
                     .setState("RECEIVED").setPercentage(0)
                     .setPublisher("api")
diff --git a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
index 89239c1..bf4b7eb 100644
--- a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
+++ b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
@@ -92,7 +92,7 @@ public class MFTConsulClient {
          5. Agent publishes the transfer state to controller
                 mft/controller/messages/states/{transferId}/{agentId}/{requestId}/{md5sum(sourcePath:destinationPath)}/{timeMils()} -> Transfer State
          6. Controller saves the transfer state in
-                mft/transfer/state/{transferId}/{UUID} -> Transfer State
+                mft/transfer/state/{transferId}/{requestId}/{UUID} -> Transfer State
 
      */
 
@@ -291,10 +291,14 @@ public class MFTConsulClient {
      * @param transferState
      * @throws MFTConsulClientException
      */
-    public void saveTransferState(String transferId, TransferState transferState) throws MFTConsulClientException {
+    public void saveTransferState(String transferId, String agentRequestId, TransferState transferState) throws MFTConsulClientException {
         try {
             String asStr = mapper.writeValueAsString(transferState);
-            kvClient.putValue(TRANSFER_STATE_PATH + transferId + "/" + UUID.randomUUID().toString(), asStr);
+            if (agentRequestId == null) {
+                kvClient.putValue(TRANSFER_STATE_PATH + transferId + "/" + UUID.randomUUID().toString(), asStr);
+            } else {
+                kvClient.putValue(TRANSFER_STATE_PATH + transferId + "/" + agentRequestId + "/" + UUID.randomUUID().toString(), asStr);
+            }
             logger.info("Saved transfer status " + asStr);
 
         } catch (Exception e) {
@@ -339,7 +343,11 @@ public class MFTConsulClient {
      * @throws IOException
      */
     public List<TransferState> getTransferStates(String transferId) throws IOException {
-        List<String> keys = kvClient.getKeys(TRANSFER_STATE_PATH + transferId);
+        return getTransferStates(transferId, null);
+    }
+
+    public List<TransferState> getTransferStates(String transferId, String agentRequestId) throws IOException {
+        List<String> keys = kvClient.getKeys(TRANSFER_STATE_PATH + transferId + (agentRequestId == null? "" : "/" + agentRequestId));
 
         List<TransferState> allStates = new ArrayList<>();
 
@@ -351,7 +359,7 @@ public class MFTConsulClient {
         }
         List<TransferState> sortedStates = allStates.stream().sorted((o1, o2) ->
                 (o1.getUpdateTimeMils() - o2.getUpdateTimeMils()) < 0 ? -1 :
-                (o1.getUpdateTimeMils() - o2.getUpdateTimeMils()) == 0 ? 0 : 1).collect(Collectors.toList());
+                        (o1.getUpdateTimeMils() - o2.getUpdateTimeMils()) == 0 ? 0 : 1).collect(Collectors.toList());
         return sortedStates;
     }
 
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/AgentTransferDispatcher.java b/controller/src/main/java/org/apache/airavata/mft/controller/AgentTransferDispatcher.java
deleted file mode 100644
index 21cf35d..0000000
--- a/controller/src/main/java/org/apache/airavata/mft/controller/AgentTransferDispatcher.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.controller;
-
-import org.apache.airavata.mft.admin.MFTConsulClient;
-import org.apache.airavata.mft.admin.models.TransferState;
-import org.apache.airavata.mft.agent.stub.AgentTransferRequest;
-import org.apache.airavata.mft.api.service.EndpointPaths;
-import org.apache.airavata.mft.api.service.TransferApiRequest;
-import org.apache.airavata.mft.controller.spawner.CloudAgentSpawner;
-import org.apache.airavata.mft.controller.spawner.SpawnerSelector;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.stream.Collectors;
-
-public class AgentTransferDispatcher {
-    private static final Logger logger = LoggerFactory.getLogger(AgentTransferDispatcher.class);
-
-    //getId(transferRequest):Pair<TransferApiRequest, AgentTransferRequest.Builder>
-
-    private final Map<String, Pair<TransferApiRequest, AgentTransferRequest.Builder>> pendingTransferRequests = new ConcurrentHashMap<>();
-    private final Map<String, String> pendingTransferIds = new ConcurrentHashMap<>();
-    //getId(transferRequest):consulKey
-
-    private final Map<String, String> pendingTransferConsulKeys = new ConcurrentHashMap<>();
-
-    //getId(transferRequest):CloudAgentSpawner
-    private final Map<String, CloudAgentSpawner> pendingAgentSpawners = new ConcurrentHashMap<>();
-
-    // getId(transferRequest):Set(TransferId)
-    private final Map<String, Set<String>> runningAgentCache = new ConcurrentHashMap<>();
-
-    // AgentID:Spawner - Use this to keep track of agent spawners. This is required to terminate agent
-    private final Map<String, CloudAgentSpawner> agentSpawners = new ConcurrentHashMap<>();
-
-    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-
-    // Temporarily store consul key until the optimizer spins up Agents. This will block the same pending transfer
-    // being handled twice
-    private final Set<String> optimizingConsulKeys = new ConcurrentSkipListSet<>();
-
-    @Autowired
-    private MFTConsulClient mftConsulClient;
-
-    public void init() {
-        scheduler.scheduleWithFixedDelay(() -> {
-            pendingAgentSpawners.forEach((key, spawner) -> {
-                if (spawner.getLaunchState().isDone()) {
-                    String transferId = pendingTransferIds.get(key);
-                    Pair<TransferApiRequest, AgentTransferRequest.Builder> transferRequests = pendingTransferRequests.get(key);
-                    String consulKey = pendingTransferConsulKeys.get(key);
-
-                    try {
-                        String agentId = spawner.getLaunchState().get();
-                        List<String> liveAgentIds = mftConsulClient.getLiveAgentIds();
-                        if (liveAgentIds.stream().noneMatch(id -> id.equals(agentId))) {
-                            throw new Exception("Agent was not registered even though the agent is up");
-                        }
-
-                        submitTransferToAgent(Collections.singletonList(agentId), transferId,
-                                transferRequests.getLeft(), transferRequests.getRight(), consulKey);
-
-                        // Use this to terminate agent in future
-                        agentSpawners.put(agentId, spawner);
-
-                    } catch (Exception e) {
-                        logger.error("Failed to launch agent for key {}", key, e);
-                        try {
-                            mftConsulClient.saveTransferState(transferId, new TransferState()
-                                    .setUpdateTimeMils(System.currentTimeMillis())
-                                    .setState("FAILED").setPercentage(0)
-                                    .setPublisher("controller")
-                                    .setDescription("Failed to launch the agent. " + ExceptionUtils.getRootCauseMessage(e)));
-                        } catch (Exception e2) {
-                            logger.error("Failed to submit transfer fail error for transfer id {}", transferId, e2);
-                        }
-
-                        logger.info("Removing consul key {}", consulKey);
-                        mftConsulClient.getKvClient().deleteKey(consulKey);
-                        logger.info("Terminating the spawner");
-                        spawner.terminate();
-
-                    } finally {
-                        pendingTransferIds.remove(key);
-                        pendingTransferRequests.remove(key);
-                        pendingAgentSpawners.remove(key);
-                        pendingTransferConsulKeys.remove(key);
-                        optimizingConsulKeys.remove(consulKey);
-                    }
-                }
-            });
-        }, 3, 5, TimeUnit.SECONDS);
-    }
-
-
-    public void submitTransferToAgent(List<String> filteredAgents, String transferId,
-                                      TransferApiRequest transferRequest,
-                                      AgentTransferRequest.Builder agentTransferRequestTemplate, String consulKey)
-            throws Exception {
-
-        try {
-            if (filteredAgents.isEmpty()) {
-                mftConsulClient.saveTransferState(transferId, new TransferState()
-                        .setUpdateTimeMils(System.currentTimeMillis())
-                        .setState("FAILED").setPercentage(0)
-                        .setPublisher("controller")
-                        .setDescription("No qualifying agent was found to orchestrate the transfer"));
-                return;
-            }
-
-            mftConsulClient.saveTransferState(transferId, new TransferState()
-                    .setState("STARTING")
-                    .setPercentage(0)
-                    .setUpdateTimeMils(System.currentTimeMillis())
-                    .setPublisher("controller")
-                    .setDescription("Initializing the transfer"));
-
-            AgentTransferRequest.Builder agentTransferRequest = agentTransferRequestTemplate.clone();
-
-            agentTransferRequest.setRequestId(UUID.randomUUID().toString());
-            for (EndpointPaths ep : transferRequest.getEndpointPathsList()) {
-                agentTransferRequest.addEndpointPaths(org.apache.airavata.mft.agent.stub.EndpointPaths.newBuilder()
-                        .setSourcePath(ep.getSourcePath())
-                        .setDestinationPath(ep.getDestinationPath()).buildPartial());
-            }
-
-            // TODO use a better way to select the right agent
-            mftConsulClient.commandTransferToAgent(filteredAgents.get(0), transferId, agentTransferRequest.build());
-            mftConsulClient.markTransferAsProcessed(transferId, transferRequest);
-            logger.info("Marked transfer {} as processed", transferId);
-        } finally {
-            mftConsulClient.getKvClient().deleteKey(consulKey);
-        }
-    }
-
-    public void handleTransferRequest(String transferId,
-                                      TransferApiRequest transferRequest,
-                                      AgentTransferRequest.Builder agentTransferRequestTemplate,
-                                      String consulKey) throws Exception{
-
-        if (optimizingConsulKeys.contains(consulKey)) {
-            logger.info("Ignoring handling transfer id {} as it is already in optimizing stage", transferId);
-            return;
-        }
-
-        logger.info("Handling transfer id {} with consul key {}", transferId, consulKey);
-        List<String> liveAgentIds = mftConsulClient.getLiveAgentIds();
-
-        Map<String, Integer> targetAgentsMap = transferRequest.getTargetAgentsMap();
-        List<String> userProvidedAgents = liveAgentIds.stream().filter(targetAgentsMap::containsKey).collect(Collectors.toList());
-        List<String> optimizedAgents = new ArrayList<>();
-
-        if (transferRequest.getOptimizeTransferPath()) {
-
-            Set<String> sourceAgents = runningAgentCache.get(getId(transferRequest, true));
-            if (sourceAgents != null) {
-                optimizedAgents.addAll(liveAgentIds.stream().filter(sourceAgents::contains).collect(Collectors.toList()));
-            }
-
-            Set<String> destAgents = runningAgentCache.get(getId(transferRequest, false));
-            if (destAgents != null) {
-                optimizedAgents.addAll(liveAgentIds.stream().filter(destAgents::contains).collect(Collectors.toList()));
-            }
-
-            if (optimizedAgents.isEmpty()) {
-                Optional<CloudAgentSpawner> sourceSpawner = SpawnerSelector.selectSpawner(
-                        agentTransferRequestTemplate.getSourceStorage(),
-                        agentTransferRequestTemplate.getSourceSecret());
-
-                Optional<CloudAgentSpawner> destSpawner = SpawnerSelector.selectSpawner(
-                        agentTransferRequestTemplate.getDestinationStorage(),
-                        agentTransferRequestTemplate.getDestinationSecret());
-
-                if (sourceSpawner.isPresent()) {
-                    logger.info("Launching {} spawner in source side for transfer {}",
-                            sourceSpawner.get().getClass().getName(), transferId);
-
-                    sourceSpawner.get().launch();
-                    pendingAgentSpawners.put(getId(transferRequest, true), sourceSpawner.get());
-                    pendingTransferRequests.put(getId(transferRequest, true),
-                            Pair.of(transferRequest, agentTransferRequestTemplate));
-                    pendingTransferIds.put(getId(transferRequest, true), transferId);
-                    pendingTransferConsulKeys.put(getId(transferRequest, true), consulKey);
-                    optimizingConsulKeys.add(consulKey);
-                    return;
-                } else if (destSpawner.isPresent()) {
-                    logger.info("Launching {} spawner in destination side for transfer {}",
-                            destSpawner.get().getClass().getName(), transferId);
-
-                    destSpawner.get().launch();
-                    pendingAgentSpawners.put(getId(transferRequest, false), destSpawner.get());
-                    pendingTransferRequests.put(getId(transferRequest, false),
-                            Pair.of(transferRequest, agentTransferRequestTemplate));
-                    pendingTransferIds.put(getId(transferRequest, false), transferId);
-                    pendingTransferConsulKeys.put(getId(transferRequest, false), consulKey);
-                    optimizingConsulKeys.add(consulKey);
-                    return;
-                } else {
-                    logger.warn("No optimizing path is available. Moving user provided agents");
-                    submitTransferToAgent(userProvidedAgents, transferId,
-                            transferRequest,
-                            agentTransferRequestTemplate,
-                            consulKey);
-                }
-            } else {
-                logger.info("Using optimized agents for transfer {}", transferId);
-                submitTransferToAgent(optimizedAgents, transferId,
-                        transferRequest,
-                        agentTransferRequestTemplate,
-                        consulKey);
-            }
-        }
-
-        if (userProvidedAgents.isEmpty()) {
-            if (liveAgentIds.isEmpty()) {
-                logger.warn("No live agent available to perform the transfer.");
-                return;
-            }
-            logger.info("No agent selection criteria was provided. Going with the local agent");
-            // TODO select the local agent
-            submitTransferToAgent(liveAgentIds, transferId,
-                    transferRequest,
-                    agentTransferRequestTemplate,
-                    consulKey);
-
-        } else {
-            submitTransferToAgent(userProvidedAgents, transferId,
-                    transferRequest,
-                    agentTransferRequestTemplate,
-                    consulKey);
-        }
-    }
-
-    private String getId(TransferApiRequest transferRequest, boolean isSource) {
-        if (isSource) {
-            return transferRequest.getSourceStorageId() + transferRequest.getSourceSecretId();
-        } else {
-            return transferRequest.getDestinationStorageId() + transferRequest.getDestinationStorageId();
-        }
-    }
-}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/AppConfig.java b/controller/src/main/java/org/apache/airavata/mft/controller/AppConfig.java
index ca31060..73fbe0f 100644
--- a/controller/src/main/java/org/apache/airavata/mft/controller/AppConfig.java
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/AppConfig.java
@@ -66,8 +66,8 @@ public class AppConfig {
     }
 
     @Bean
-    public AgentTransferDispatcher pathOptimizer() {
-        AgentTransferDispatcher agentTransferDispatcher = new AgentTransferDispatcher();
+    public TransferDispatcher pathOptimizer() {
+        TransferDispatcher agentTransferDispatcher = new TransferDispatcher();
         agentTransferDispatcher.init();
         return agentTransferDispatcher;
     }
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java b/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
index a4986ab..2870571 100644
--- a/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
@@ -24,8 +24,6 @@ import com.orbitz.consul.cache.ConsulCache;
 import com.orbitz.consul.cache.KVCache;
 import com.orbitz.consul.model.kv.Value;
 import org.apache.airavata.mft.admin.MFTConsulClient;
-import org.apache.airavata.mft.admin.MFTConsulClientException;
-import org.apache.airavata.mft.admin.models.AgentInfo;
 import org.apache.airavata.mft.admin.models.TransferState;
 import org.apache.airavata.mft.agent.stub.AgentTransferRequest;
 import org.apache.airavata.mft.api.service.TransferApiRequest;
@@ -40,15 +38,12 @@ import org.springframework.context.annotation.ComponentScan;
 
 import javax.annotation.PreDestroy;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
-import java.util.Random;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 @SpringBootApplication()
 @ComponentScan(basePackages = {"org.apache.airavata.mft"})
@@ -72,7 +67,7 @@ public class MFTController implements CommandLineRunner {
     private MFTConsulClient mftConsulClient;
 
     @Autowired
-    private AgentTransferDispatcher pathOptimizer;
+    private TransferDispatcher pathOptimizer;
 
     public void init() {
         logger.info("Initializing the Controller");
@@ -155,7 +150,7 @@ public class MFTController implements CommandLineRunner {
                     String time = parts[4];
 
                     TransferState transferState = mapper.readValue(valAsStr, TransferState.class);
-                    mftConsulClient.saveTransferState(transferId, transferState.setChildId(pathHash));
+                    mftConsulClient.saveTransferState(transferId, agentRequestId, transferState.setChildId(pathHash));
 
                 }
             } catch (Exception e) {
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/TransferDispatcher.java b/controller/src/main/java/org/apache/airavata/mft/controller/TransferDispatcher.java
new file mode 100644
index 0000000..bf3b153
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/TransferDispatcher.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller;
+
+import org.apache.airavata.mft.admin.MFTConsulClient;
+import org.apache.airavata.mft.admin.models.TransferState;
+import org.apache.airavata.mft.agent.stub.AgentTransferRequest;
+import org.apache.airavata.mft.api.service.EndpointPaths;
+import org.apache.airavata.mft.api.service.TransferApiRequest;
+import org.apache.airavata.mft.controller.spawner.AgentOrchestrator;
+import org.apache.airavata.mft.controller.spawner.AgentSpawner;
+import org.apache.airavata.mft.controller.spawner.SpawnerSelector;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
+
+public class TransferDispatcher {
+    private static final Logger logger = LoggerFactory.getLogger(TransferDispatcher.class);
+
+    private AgentOrchestrator agentOrchestrator;
+
+    @Autowired
+    private MFTConsulClient mftConsulClient;
+
+    public void init() {
+        agentOrchestrator = new AgentOrchestrator(this);
+        agentOrchestrator.init();
+    }
+
+    public void submitTransferToAgent(List<String> filteredAgents, String transferId,
+                                      TransferApiRequest transferRequest,
+                                      AgentTransferRequest agentTransferRequest, String consulKey) {
+
+        try {
+            if (filteredAgents.isEmpty()) {
+                mftConsulClient.saveTransferState(transferId, null, new TransferState()
+                        .setUpdateTimeMils(System.currentTimeMillis())
+                        .setState("FAILED").setPercentage(0)
+                        .setPublisher("controller")
+                        .setDescription("No qualifying agent was found to orchestrate the transfer"));
+                return;
+            }
+
+            mftConsulClient.saveTransferState(transferId,null, new TransferState()
+                    .setState("STARTING")
+                    .setPercentage(0)
+                    .setUpdateTimeMils(System.currentTimeMillis())
+                    .setPublisher("controller")
+                    .setDescription("Initializing the transfer"));
+
+            // TODO use a better way to select the right agent
+            mftConsulClient.commandTransferToAgent(filteredAgents.get(0), transferId, agentTransferRequest);
+            mftConsulClient.markTransferAsProcessed(transferId, transferRequest);
+            logger.info("Marked transfer {} as processed", transferId);
+
+        } catch (Exception e) {
+
+            logger.error("Failed to submit the transfer {} to agent", transferId, e);
+
+            try {
+                mftConsulClient.saveTransferState(transferId, null, new TransferState()
+                        .setUpdateTimeMils(System.currentTimeMillis())
+                        .setState("FAILED").setPercentage(0)
+                        .setPublisher("controller")
+                        .setDescription("Failed to submit the transfer to agent. Error: " + ExceptionUtils.getRootCauseMessage(e)));
+            } catch (Exception e2) {
+                // Ignore
+                logger.warn("Failed to update the failed transfer state for transfer id {}", transferId, e);
+            }
+        } finally {
+            mftConsulClient.getKvClient().deleteKey(consulKey);
+        }
+    }
+
+    public void handleTransferRequest(String transferId,
+                                      TransferApiRequest transferRequest,
+                                      AgentTransferRequest.Builder agentTransferRequestTemplate,
+                                      String consulKey) throws Exception{
+
+        if (this.agentOrchestrator.isAnAgentDeploying(consulKey)) {
+            logger.info("Ignoring handling transfer id {} as it is already in optimizing stage", transferId);
+            return;
+        }
+
+        logger.info("Handling transfer id {} with consul key {}", transferId, consulKey);
+        List<String> liveAgentIds = mftConsulClient.getLiveAgentIds();
+
+        Map<String, Integer> targetAgentsMap = transferRequest.getTargetAgentsMap();
+        List<String> userProvidedAgents = liveAgentIds.stream().filter(targetAgentsMap::containsKey).collect(Collectors.toList());
+
+        AgentTransferRequest.Builder agentTransferRequestBuilder = agentTransferRequestTemplate.clone();
+
+        agentTransferRequestBuilder.setRequestId(UUID.randomUUID().toString());
+        for (EndpointPaths ep : transferRequest.getEndpointPathsList()) {
+            agentTransferRequestBuilder.addEndpointPaths(org.apache.airavata.mft.agent.stub.EndpointPaths.newBuilder()
+                    .setSourcePath(ep.getSourcePath())
+                    .setDestinationPath(ep.getDestinationPath()).buildPartial());
+        }
+
+        AgentTransferRequest agentTransferRequest = agentTransferRequestBuilder.build();
+
+        if (transferRequest.getOptimizeTransferPath()) {
+            boolean agentLaunching = agentOrchestrator.tryLaunchingAgent(
+                    transferId, transferRequest,
+                    agentTransferRequest,
+                    consulKey);
+
+            if (!agentLaunching) {
+                logger.warn("No optimizing path is available. Moving user provided agents");
+                submitTransferToAgent(userProvidedAgents, transferId,
+                        transferRequest,
+                        agentTransferRequest,
+                        consulKey);
+            }
+
+        } else if (userProvidedAgents.isEmpty()) {
+            if (liveAgentIds.isEmpty()) {
+                logger.warn("No live agent available to perform the transfer.");
+                return;
+            }
+            logger.info("No agent selection criteria was provided. Going with the local agent");
+            // TODO select the local agent
+            submitTransferToAgent(liveAgentIds, transferId,
+                    transferRequest,
+                    agentTransferRequest,
+                    consulKey);
+
+        } else {
+            submitTransferToAgent(userProvidedAgents, transferId,
+                    transferRequest,
+                    agentTransferRequest,
+                    consulKey);
+        }
+    }
+
+    public MFTConsulClient getMftConsulClient() {
+        return mftConsulClient;
+    }
+}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/spawner/AgentOrchestrator.java b/controller/src/main/java/org/apache/airavata/mft/controller/spawner/AgentOrchestrator.java
new file mode 100644
index 0000000..0ccec2e
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/spawner/AgentOrchestrator.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller.spawner;
+
+import org.apache.airavata.mft.admin.models.TransferState;
+import org.apache.airavata.mft.agent.stub.AgentTransferRequest;
+import org.apache.airavata.mft.api.service.TransferApiRequest;
+import org.apache.airavata.mft.controller.TransferDispatcher;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+public class AgentOrchestrator {
+
+    private static final Logger logger = LoggerFactory.getLogger(AgentOrchestrator.class);
+
+    private final int SPAWNER_MAX_IDLE_SECONDS = 30;
+
+    private class TransferInfo {
+        private final String transferId;
+        private final AgentTransferRequest agentTransferRequest;
+        private final TransferApiRequest transferApiRequest;
+
+        // Temporarily store consul key until the optimizer spins up Agents. This will block the same pending transfer
+        // being handled twice
+        private final String consulKey;
+
+        public TransferInfo(String transferId, AgentTransferRequest agentTransferRequest, TransferApiRequest transferApiRequest, String consulKey) {
+            this.transferId = transferId;
+            this.agentTransferRequest = agentTransferRequest;
+            this.transferApiRequest = transferApiRequest;
+            this.consulKey = consulKey;
+        }
+
+        public String getTransferId() {
+            return transferId;
+        }
+
+        public AgentTransferRequest getAgentTransferRequest() {
+            return agentTransferRequest;
+        }
+
+        public TransferApiRequest getTransferApiRequest() {
+            return transferApiRequest;
+        }
+
+        public String getConsulKey() {
+            return consulKey;
+        }
+    }
+
+    private class LaunchedSpawnerMetadata implements Comparable<LaunchedSpawnerMetadata> {
+
+        private final AgentSpawner spawner;
+
+        private final long createdTime = System.currentTimeMillis();
+        private long lastScannedTime = System.currentTimeMillis();
+
+        //AgentTransferRequestId:TransferInfo
+        private final Map<String, TransferInfo> transferInfos = new ConcurrentHashMap<>();
+
+        public LaunchedSpawnerMetadata(AgentSpawner spawner) {
+            this.spawner = spawner;
+        }
+
+        public AgentSpawner getSpawner() {
+            return spawner;
+        }
+
+        public Map<String, TransferInfo> getTransferInfos() {
+            return transferInfos;
+        }
+
+        @Override
+        public int compareTo(LaunchedSpawnerMetadata o) {
+            if (createdTime == o.createdTime)
+                return 0;
+            return o.createdTime < createdTime? 1 : -1;
+        }
+    }
+
+    private final Map<String, LaunchedSpawnerMetadata> launchedSpawnersMap = new ConcurrentHashMap<>();
+
+    private final TransferDispatcher transferDispatcher;
+
+    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+
+    public AgentOrchestrator(TransferDispatcher transferDispatcher) {
+        this.transferDispatcher = transferDispatcher;
+    }
+
+    public void init() {
+        scheduler.scheduleWithFixedDelay(() -> {
+
+            try {
+                launchedSpawnersMap.forEach((key, metadata) -> {
+                    if (metadata.spawner.getLaunchState().isDone()) {
+                        metadata.transferInfos.forEach((agentTransferId, transferInfo) -> {
+                            try {
+                                String agentId = metadata.spawner.getLaunchState().get();
+                                List<String> liveAgentIds = transferDispatcher.getMftConsulClient().getLiveAgentIds();
+
+                                if (liveAgentIds.stream().noneMatch(id -> id.equals(agentId))) {
+                                    throw new Exception("Agent was not registered even though the agent maked as up");
+                                }
+
+                                this.transferDispatcher.submitTransferToAgent(Collections.singletonList(agentId),
+                                        transferInfo.transferId,
+                                        transferInfo.transferApiRequest,
+                                        transferInfo.agentTransferRequest,
+                                        transferInfo.consulKey);
+
+                                metadata.lastScannedTime = System.currentTimeMillis();
+                            } catch (Exception e) {
+                                logger.error("Failed to launch agent for agent transfer id {} and transfer {}",
+                                        agentTransferId, transferInfo.transferId, e);
+                                try {
+                                    transferDispatcher.getMftConsulClient().saveTransferState(transferInfo.transferId, null, new TransferState()
+                                            .setUpdateTimeMils(System.currentTimeMillis())
+                                            .setState("FAILED").setPercentage(0)
+                                            .setPublisher("controller")
+                                            .setDescription("Failed to launch the agent. " + ExceptionUtils.getRootCauseMessage(e)));
+                                } catch (Exception e2) {
+                                    logger.error("Failed to submit transfer fail error for transfer id {}", transferInfo.transferId, e2);
+                                }
+
+                                logger.info("Removing consul key {}", transferInfo.consulKey);
+                                transferDispatcher.getMftConsulClient().getKvClient().deleteKey(transferInfo.consulKey);
+                                logger.info("Terminating the spawner");
+                                metadata.spawner.terminate();
+                            } finally {
+                                metadata.transferInfos.remove(agentTransferId);
+                            }
+                        });
+                    }
+
+                    if ((System.currentTimeMillis() - metadata.lastScannedTime) >  SPAWNER_MAX_IDLE_SECONDS * 1000) {
+
+                        long totalFiles = 0;
+                        long completedOrFailedFiles = 0;
+
+                        Map<String, TransferInfo> transferInfos = metadata.transferInfos;
+
+                        for (String agentTransferId: transferInfos.keySet()) {
+                            TransferInfo transferInfo = transferInfos.get(agentTransferId);
+
+                            try {
+                                totalFiles += transferInfo.agentTransferRequest.getEndpointPathsCount();
+
+                                List<TransferState> transferStates = this.transferDispatcher.getMftConsulClient()
+                                        .getTransferStates(transferInfo.transferId, agentTransferId);
+
+                                completedOrFailedFiles += transferStates.stream()
+                                        .filter(transferState -> transferState.getState().equals("COMPLETED") ||
+                                                transferState.getState().equals("FAILED")).count();
+
+
+
+                            } catch (Exception e) {
+                                logger.error("Failed to fetch transfer states for agent transfer id {}", agentTransferId, e);
+                            }
+                        }
+
+                        logger.info("Spawner with key {} has total {} files to be transferred and {} were completed or failed",
+                                key, totalFiles, completedOrFailedFiles);
+
+                        if (totalFiles == completedOrFailedFiles) {
+                            // TODO create a write lock with reusing agent logic
+
+                            logger.info("Killing spawner with key {} as all files were transferred and inactive for {} seconds",
+                                    key, SPAWNER_MAX_IDLE_SECONDS);
+                            metadata.spawner.terminate();
+                            launchedSpawnersMap.remove(key);
+                        }
+                    }
+                });
+
+            } catch (Exception e) {
+                // Just to keep the thread running
+                logger.error("Some error occurred while processing spawners map", e);
+            }
+
+        }, 3, 5, TimeUnit.SECONDS);
+    }
+    public boolean tryLaunchingAgent(String transferId,
+                                     TransferApiRequest transferRequest,
+                                     AgentTransferRequest agentTransferRequest,
+                                     String consulKey) {
+
+
+        List<LaunchedSpawnerMetadata> selectedSpawnerMetadata = new ArrayList<>();
+
+        LaunchedSpawnerMetadata sourceSpawnerMetadata = launchedSpawnersMap.get(getId(transferRequest, true));
+        if (sourceSpawnerMetadata != null) {
+            selectedSpawnerMetadata.add(sourceSpawnerMetadata);
+        }
+
+        LaunchedSpawnerMetadata destSpawnerMetadata = launchedSpawnersMap.get(getId(transferRequest, false));
+        if (destSpawnerMetadata != null) {
+            selectedSpawnerMetadata.add(destSpawnerMetadata);
+        }
+
+        if (selectedSpawnerMetadata.isEmpty()) {
+            Optional<AgentSpawner> sourceSpawner = SpawnerSelector.selectSpawner(
+                    agentTransferRequest.getSourceStorage(),
+                    agentTransferRequest.getSourceSecret());
+
+            Optional<AgentSpawner> destSpawner = SpawnerSelector.selectSpawner(
+                    agentTransferRequest.getDestinationStorage(),
+                    agentTransferRequest.getDestinationSecret());
+
+            if (sourceSpawner.isPresent()) {
+                logger.info("Launching {} spawner in source side for transfer {}",
+                        sourceSpawner.get().getClass().getName(), transferId);
+
+                sourceSpawner.get().launch();
+                LaunchedSpawnerMetadata lsm = new LaunchedSpawnerMetadata(sourceSpawner.get());
+                lsm.transferInfos.put(agentTransferRequest.getRequestId(),
+                        new TransferInfo(
+                                transferId,
+                                agentTransferRequest,
+                                transferRequest,
+                                consulKey));
+
+                launchedSpawnersMap.put(getId(transferRequest, true), lsm);
+                return true;
+
+            } else if (destSpawner.isPresent()) {
+                logger.info("Launching {} spawner in destination side for transfer {}",
+                        destSpawner.get().getClass().getName(), transferId);
+
+                destSpawner.get().launch();
+                LaunchedSpawnerMetadata lsm = new LaunchedSpawnerMetadata(destSpawner.get());
+                lsm.transferInfos.put(agentTransferRequest.getRequestId(),
+                        new TransferInfo(
+                                transferId,
+                                agentTransferRequest,
+                                transferRequest,
+                                consulKey));
+
+                launchedSpawnersMap.put(getId(transferRequest, false), lsm);
+                return true;
+
+            } else {
+                return false;
+            }
+        } else {
+            logger.info("Reusing already running optimized agents for transfer {}", transferId);
+
+            // Todo select the spawner having least stransfers. Make this thread safe as some case, the spawner might be
+            // initiating the termination
+            selectedSpawnerMetadata.get(0).transferInfos.put(agentTransferRequest.getRequestId(),
+                    new TransferInfo(
+                            transferId,
+                            agentTransferRequest,
+                            transferRequest,
+                            consulKey));
+            return true;
+        }
+    }
+
+    public boolean isAnAgentDeploying(String consulKey) {
+        return this.launchedSpawnersMap.values().stream()
+                .anyMatch(launchedSpawnerMetadata ->
+                        launchedSpawnerMetadata.transferInfos.values().stream().anyMatch(
+                                tinf-> tinf.consulKey.equals(consulKey)));
+    }
+
+    private String getId(TransferApiRequest transferRequest, boolean isSource) {
+        if (isSource) {
+            return transferRequest.getSourceStorageId() + transferRequest.getSourceSecretId();
+        } else {
+            return transferRequest.getDestinationStorageId() + transferRequest.getDestinationStorageId();
+        }
+    }
+}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/spawner/CloudAgentSpawner.java b/controller/src/main/java/org/apache/airavata/mft/controller/spawner/AgentSpawner.java
similarity index 91%
rename from controller/src/main/java/org/apache/airavata/mft/controller/spawner/CloudAgentSpawner.java
rename to controller/src/main/java/org/apache/airavata/mft/controller/spawner/AgentSpawner.java
index 8b435ab..a8a733a 100644
--- a/controller/src/main/java/org/apache/airavata/mft/controller/spawner/CloudAgentSpawner.java
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/spawner/AgentSpawner.java
@@ -22,11 +22,11 @@ import org.apache.airavata.mft.agent.stub.StorageWrapper;
 
 import java.util.concurrent.Future;
 
-public abstract class CloudAgentSpawner {
+public abstract class AgentSpawner {
 
     protected StorageWrapper storageWrapper;
     protected SecretWrapper secretWrapper;
-    public CloudAgentSpawner(StorageWrapper storageWrapper, SecretWrapper secretWrapper) {
+    public AgentSpawner(StorageWrapper storageWrapper, SecretWrapper secretWrapper) {
         this.secretWrapper = secretWrapper;
         this.storageWrapper = storageWrapper;
     }
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/spawner/EC2AgentSpawner.java b/controller/src/main/java/org/apache/airavata/mft/controller/spawner/EC2AgentSpawner.java
index 890e395..a097f92 100644
--- a/controller/src/main/java/org/apache/airavata/mft/controller/spawner/EC2AgentSpawner.java
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/spawner/EC2AgentSpawner.java
@@ -40,7 +40,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.stream.Stream;
 
-public class EC2AgentSpawner extends CloudAgentSpawner {
+public class EC2AgentSpawner extends AgentSpawner {
 
     private static final Logger logger = LoggerFactory.getLogger(EC2AgentSpawner.class);
 
@@ -48,18 +48,49 @@ public class EC2AgentSpawner extends CloudAgentSpawner {
 
     private String instanceId;
     private CountDownLatch portForwardLock;
-    Future<String> launchFuture;
-    Future<Boolean> terminateFuture;
+    private Future<String> launchFuture;
+    private Future<Boolean> terminateFuture;
+    private final Map<String, String> amiMap;
 
     public EC2AgentSpawner(StorageWrapper storageWrapper, SecretWrapper secretWrapper) {
         super(storageWrapper, secretWrapper);
+        amiMap = new HashMap<>();
+        amiMap.put("af-south-1","ami-0a9b2225722484002");
+        amiMap.put("ap-east-1","ami-056129cdd30574a9d");
+        amiMap.put("ap-northeast-1","ami-0e2bf1ada70fd3f33");
+        amiMap.put("ap-south-1","ami-0f69bc5520884278e");
+        amiMap.put("ap-southeast-1","ami-029562ad87fe1185c");
+        amiMap.put("ca-central-1","ami-09bbc74353976b63f");
+        amiMap.put("eu-central-1","ami-0039da1f3917fa8e3");
+        amiMap.put("eu-north-1","ami-03df6dea56f8aa618");
+        amiMap.put("eu-south-1","ami-0bac8f6f7c5943df6");
+        amiMap.put("eu-west-1","ami-026e72e4e468afa7b");
+        amiMap.put("me-central-1","ami-0747a6d5ab9621d5e");
+        amiMap.put("me-south-1","ami-000dfd0dafa71a443");
+        amiMap.put("sa-east-1","ami-07ac54771249f286c");
+        amiMap.put("us-east-1","ami-06878d265978313ca");
+        amiMap.put("us-west-1","ami-06bb3ee01d992f30d");
+        amiMap.put("us-gov-east-1","ami-0efd49eddc5639cc5");
+        amiMap.put("us-gov-west-1","ami-061efa908c09c5409");
+        amiMap.put("ap-northeast-2","ami-0f0646a5f59758444");
+        amiMap.put("ap-south-2","ami-021aeec757e935219");
+        amiMap.put("ap-southeast-2","ami-006fd15ab56f0fbe6");
+        amiMap.put("eu-central-2","ami-0d34b3fbb942249d5");
+        amiMap.put("eu-south-2","ami-0762ef22684c93e5c");
+        amiMap.put("eu-west-2","ami-01b8d743224353ffe");
+        amiMap.put("us-east-2","ami-0ff39345bd62c82a5");
+        amiMap.put("us-west-2","ami-03f8756d29f0b5f21");
+        amiMap.put("ap-northeast-3","ami-0d7b1258d728f42e3");
+        amiMap.put("ap-southeast-3","ami-0796a4cfd3b7bec87");
+        amiMap.put("eu-west-3","ami-03c476a1ca8e3ebdc");
     }
 
     @Override
     public void launch() {
 
         launchFuture = executor.submit( () -> {
-            String imageId = "ami-0ecc74eca1d66d8a6"; // Ubuntu base image
+            String region = storageWrapper.getS3().getRegion();
+            String imageId = getAmi(region); // Ubuntu base image
             String keyNamePrefix = "mft-aws-agent-key-";
             String secGroupName = "MFTAgentSecurityGroup";
             String agentId = UUID.randomUUID().toString();
@@ -68,7 +99,6 @@ public class EC2AgentSpawner extends CloudAgentSpawner {
             String mftKeyDir = System.getProperty("user.home") + File.separator + ".mft" + File.separator + "keys";
             String accessKey = secretWrapper.getS3().getAccessKey();
             String secretKey = secretWrapper.getS3().getSecretKey();
-            String region = storageWrapper.getS3().getRegion();
 
             try {
                 BasicAWSCredentials awsCreds = new BasicAWSCredentials(accessKey, secretKey);
@@ -272,6 +302,9 @@ public class EC2AgentSpawner extends CloudAgentSpawner {
         });
     }
 
+    private String getAmi(String region) {
+        return amiMap.get(region);
+    }
     @Override
     public Future<String> getLaunchState() {
         return launchFuture;
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/spawner/SpawnerSelector.java b/controller/src/main/java/org/apache/airavata/mft/controller/spawner/SpawnerSelector.java
index 1c86446..54d2fa4 100644
--- a/controller/src/main/java/org/apache/airavata/mft/controller/spawner/SpawnerSelector.java
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/spawner/SpawnerSelector.java
@@ -24,7 +24,7 @@ import java.util.Optional;
 
 public class SpawnerSelector {
 
-    public static Optional<CloudAgentSpawner> selectSpawner(StorageWrapper storageWrapper, SecretWrapper secretWrapper) {
+    public static Optional<AgentSpawner> selectSpawner(StorageWrapper storageWrapper, SecretWrapper secretWrapper) {
         switch (storageWrapper.getStorageCase()) {
             case S3:
                 if (storageWrapper.getS3().getEndpoint().endsWith("amazonaws.com")) {