You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/01/04 17:05:40 UTC

[airavata-mft] branch master updated: Adding MFT Controller

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a1a326  Adding MFT Controller
3a1a326 is described below

commit 3a1a3262b69c749a84d69a0bd094953d2dbb087f
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Sat Jan 4 12:05:28 2020 -0500

    Adding MFT Controller
---
 .../org/apache/airavata/mft/admin/MFTAdmin.java    |  20 +-
 .../{TransferRequest.java => TransferCommand.java} |  93 +++++++---
 .../airavata/mft/admin/models/TransferRequest.java | 105 ++++++++---
 agent/pom.xml                                      |   2 +-
 .../org/apache/airavata/mft/agent/MFTAgent.java    |   9 +-
 {agent => controller}/pom.xml                      |  71 +++----
 .../mft/controller/ControllerException.java        |  41 +++++
 .../airavata/mft/controller/MFTController.java     | 158 ++++++++++++++++
 .../controller/db/entities/TargetAgentEntity.java  |  73 ++++++++
 .../mft/controller/db/entities/TargetAgentPK.java  |  48 +++++
 .../mft/controller/db/entities/TransferEntity.java | 205 +++++++++++++++++++++
 .../db/entities/TransferExecutionEntity.java       |  70 +++++++
 .../db/entities/TransferStatusEntity.java          |  70 +++++++
 .../db/repositories/TargetAgentRepository.java     |  24 +++
 .../db/repositories/TransferRepository.java        |  24 +++
 .../src/main/resources/application.properties      |   0
 pom.xml                                            |   2 +
 17 files changed, 915 insertions(+), 100 deletions(-)

diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdmin.java b/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdmin.java
index bf0ae79..6868e9e 100644
--- a/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdmin.java
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdmin.java
@@ -24,6 +24,7 @@ import com.orbitz.consul.ConsulException;
 import com.orbitz.consul.KeyValueClient;
 import com.orbitz.consul.model.kv.Value;
 import org.apache.airavata.mft.admin.models.AgentInfo;
+import org.apache.airavata.mft.admin.models.TransferCommand;
 import org.apache.airavata.mft.admin.models.TransferRequest;
 import org.apache.airavata.mft.admin.models.TransferState;
 
@@ -44,20 +45,27 @@ public class MFTAdmin {
     private KeyValueClient kvClient = client.keyValueClient();
     private ObjectMapper mapper = new ObjectMapper();
 
-    public String submitTransfer(String agentId, TransferRequest transferRequest) throws MFTAdminException {
+    public String submitTransfer(TransferRequest transferRequest) throws MFTAdminException{
         try {
-
+            String asStr = mapper.writeValueAsString(transferRequest);
             String transferId = UUID.randomUUID().toString();
-            updateTransferState(transferId, new TransferState().setState("INITIALIZING").setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()));
-            transferRequest.setTransferId(transferId);
-            String asString = mapper.writeValueAsString(transferRequest);
-            kvClient.putValue("mft/agents/messages/"  + agentId + "/" + transferId, asString);
+            kvClient.putValue("mft/controller/messages/" + transferId, asStr);
             return transferId;
         } catch (JsonProcessingException e) {
             throw new MFTAdminException("Error in serializing transfer request", e);
         }
     }
 
+    public void commandTransferToAgent(String agentId, TransferCommand transferCommand) throws MFTAdminException {
+        try { // serialize first, so a command that cannot be encoded fails before anything is written to the KV store
+            String asString = mapper.writeValueAsString(transferCommand);
+            updateTransferState(transferCommand.getTransferId(), new TransferState().setState("INITIALIZING").setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()));
+            kvClient.putValue("mft/agents/messages/"  + agentId + "/" + transferCommand.getTransferId(), asString); // key: <agent id>/<transfer id>
+        } catch (JsonProcessingException e) {
+            throw new MFTAdminException("Error in serializing transfer command", e);
+        }
+    }
+
     public List<AgentInfo> listAgents() {
         List<AgentInfo> agents = new ArrayList<>();
         List<String> keys = kvClient.getKeys("mft/agents/info");
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferCommand.java
similarity index 51%
copy from admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java
copy to admin/src/main/java/org/apache/airavata/mft/admin/models/TransferCommand.java
index 13a549f..175e867 100644
--- a/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferCommand.java
@@ -19,69 +19,116 @@ package org.apache.airavata.mft.admin.models;
 
 import java.util.List;
 
-public class TransferRequest {
+public class TransferCommand {
 
     private String transferId;
     private String sourceId;
     private String sourceType;
     private String sourceToken;
+    private String sourceResourceBackend;
+    private String sourceCredentialBackend;
     private String destinationId;
     private String destinationType;
     private String destinationToken;
+    private String destResourceBackend;
+    private String destCredentialBackend;
 
-    public String getSourceId() {
-        return sourceId;
+    public String getTransferId() {
+        return transferId;
     }
 
-    public void setSourceId(String sourceId) {
-        this.sourceId = sourceId;
+    public TransferCommand setTransferId(String transferId) {
+        this.transferId = transferId;
+        return this;
     }
 
-    public String getDestinationId() {
-        return destinationId;
+    public String getSourceId() {
+        return sourceId;
     }
 
-    public void setDestinationId(String destinationId) {
-        this.destinationId = destinationId;
+    public TransferCommand setSourceId(String sourceId) {
+        this.sourceId = sourceId;
+        return this;
     }
 
     public String getSourceType() {
         return sourceType;
     }
 
-    public void setSourceType(String sourceType) {
+    public TransferCommand setSourceType(String sourceType) {
         this.sourceType = sourceType;
+        return this;
     }
 
-    public String getDestinationType() {
-        return destinationType;
+    public String getSourceToken() {
+        return sourceToken;
     }
 
-    public void setDestinationType(String destinationType) {
-        this.destinationType = destinationType;
+    public TransferCommand setSourceToken(String sourceToken) {
+        this.sourceToken = sourceToken;
+        return this;
     }
 
-    public String getSourceToken() {
-        return sourceToken;
+    public String getSourceResourceBackend() {
+        return sourceResourceBackend;
     }
 
-    public void setSourceToken(String sourceToken) {
-        this.sourceToken = sourceToken;
+    public TransferCommand setSourceResourceBackend(String sourceResourceBackend) {
+        this.sourceResourceBackend = sourceResourceBackend;
+        return this;
+    }
+
+    public String getSourceCredentialBackend() {
+        return sourceCredentialBackend;
+    }
+
+    public TransferCommand setSourceCredentialBackend(String sourceCredentialBackend) {
+        this.sourceCredentialBackend = sourceCredentialBackend;
+        return this;
+    }
+
+    public String getDestinationId() {
+        return destinationId;
+    }
+
+    public TransferCommand setDestinationId(String destinationId) {
+        this.destinationId = destinationId;
+        return this;
+    }
+
+    public String getDestinationType() {
+        return destinationType;
+    }
+
+    public TransferCommand setDestinationType(String destinationType) {
+        this.destinationType = destinationType;
+        return this;
     }
 
     public String getDestinationToken() {
         return destinationToken;
     }
 
-    public void setDestinationToken(String destinationToken) {
+    public TransferCommand setDestinationToken(String destinationToken) {
         this.destinationToken = destinationToken;
+        return this;
     }
 
-    public String getTransferId() {
-        return transferId;
+    public String getDestResourceBackend() {
+        return destResourceBackend;
     }
 
-    public void setTransferId(String transferId) {
-        this.transferId = transferId;
+    public TransferCommand setDestResourceBackend(String destResourceBackend) {
+        this.destResourceBackend = destResourceBackend;
+        return this;
+    }
+
+    public String getDestCredentialBackend() {
+        return destCredentialBackend;
+    }
+
+    public TransferCommand setDestCredentialBackend(String destCredentialBackend) {
+        this.destCredentialBackend = destCredentialBackend;
+        return this;
     }
 }
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java
index 13a549f..ecd5cdc 100644
--- a/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java
@@ -17,71 +17,128 @@
 
 package org.apache.airavata.mft.admin.models;
 
-import java.util.List;
+import java.util.Map;
 
 public class TransferRequest {
 
-    private String transferId;
     private String sourceId;
     private String sourceType;
     private String sourceToken;
+    private String sourceResourceBackend;
+    private String sourceCredentialBackend;
     private String destinationId;
     private String destinationType;
     private String destinationToken;
+    private String destResourceBackend;
+    private String destCredentialBackend;
+    private boolean affinityTransfer;
+    private Map<String, Integer> targetAgents;
 
     public String getSourceId() {
         return sourceId;
     }
 
-    public void setSourceId(String sourceId) {
+    public TransferRequest setSourceId(String sourceId) {
         this.sourceId = sourceId;
+        return this;
     }
 
-    public String getDestinationId() {
-        return destinationId;
+    public String getSourceType() {
+        return sourceType;
     }
 
-    public void setDestinationId(String destinationId) {
-        this.destinationId = destinationId;
+    public TransferRequest setSourceType(String sourceType) {
+        this.sourceType = sourceType;
+        return this;
     }
 
-    public String getSourceType() {
-        return sourceType;
+    public String getSourceToken() {
+        return sourceToken;
     }
 
-    public void setSourceType(String sourceType) {
-        this.sourceType = sourceType;
+    public TransferRequest setSourceToken(String sourceToken) {
+        this.sourceToken = sourceToken;
+        return this;
     }
 
-    public String getDestinationType() {
-        return destinationType;
+    public String getSourceResourceBackend() {
+        return sourceResourceBackend;
     }
 
-    public void setDestinationType(String destinationType) {
-        this.destinationType = destinationType;
+    public TransferRequest setSourceResourceBackend(String sourceResourceBackend) {
+        this.sourceResourceBackend = sourceResourceBackend;
+        return this;
     }
 
-    public String getSourceToken() {
-        return sourceToken;
+    public String getSourceCredentialBackend() {
+        return sourceCredentialBackend;
     }
 
-    public void setSourceToken(String sourceToken) {
-        this.sourceToken = sourceToken;
+    public TransferRequest setSourceCredentialBackend(String sourceCredentialBackend) {
+        this.sourceCredentialBackend = sourceCredentialBackend;
+        return this;
+    }
+
+    public String getDestinationId() {
+        return destinationId;
+    }
+
+    public TransferRequest setDestinationId(String destinationId) {
+        this.destinationId = destinationId;
+        return this;
+    }
+
+    public String getDestinationType() {
+        return destinationType;
+    }
+
+    public TransferRequest setDestinationType(String destinationType) {
+        this.destinationType = destinationType;
+        return this;
     }
 
     public String getDestinationToken() {
         return destinationToken;
     }
 
-    public void setDestinationToken(String destinationToken) {
+    public TransferRequest setDestinationToken(String destinationToken) {
         this.destinationToken = destinationToken;
+        return this;
+    }
+
+    public String getDestResourceBackend() {
+        return destResourceBackend;
+    }
+
+    public TransferRequest setDestResourceBackend(String destResourceBackend) {
+        this.destResourceBackend = destResourceBackend;
+        return this;
+    }
+
+    public String getDestCredentialBackend() {
+        return destCredentialBackend;
+    }
+
+    public TransferRequest setDestCredentialBackend(String destCredentialBackend) {
+        this.destCredentialBackend = destCredentialBackend;
+        return this;
+    }
+
+    public boolean isAffinityTransfer() {
+        return affinityTransfer;
+    }
+
+    public TransferRequest setAffinityTransfer(boolean affinityTransfer) {
+        this.affinityTransfer = affinityTransfer;
+        return this;
     }
 
-    public String getTransferId() {
-        return transferId;
+    public Map<String, Integer> getTargetAgents() {
+        return targetAgents;
     }
 
-    public void setTransferId(String transferId) {
-        this.transferId = transferId;
+    public TransferRequest setTargetAgents(Map<String, Integer> targetAgents) {
+        this.targetAgents = targetAgents;
+        return this;
     }
 }
diff --git a/agent/pom.xml b/agent/pom.xml
index 54a4361..e6e01b8 100644
--- a/agent/pom.xml
+++ b/agent/pom.xml
@@ -56,7 +56,7 @@
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>log4j-over-slf4j</artifactId>
-            <version>1.7.26</version>
+            <version>${log4j.over.slf4j}</version>
         </dependency>
     </dependencies>
 
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index e3a9977..9d414da 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -29,7 +29,7 @@ import com.orbitz.consul.model.session.SessionCreatedResponse;
 import org.apache.airavata.mft.admin.MFTAdmin;
 import org.apache.airavata.mft.admin.MFTAdminException;
 import org.apache.airavata.mft.admin.models.AgentInfo;
-import org.apache.airavata.mft.admin.models.TransferRequest;
+import org.apache.airavata.mft.admin.models.TransferCommand;
 import org.apache.airavata.mft.admin.models.TransferState;
 import org.apache.airavata.mft.core.ResourceMetadata;
 import org.apache.airavata.mft.core.api.Connector;
@@ -83,6 +83,8 @@ public class MFTAgent implements CommandLineRunner {
     private long sessionRenewSeconds = 4;
     private long sessionTTLSeconds = 10;
 
+    private ObjectMapper mapper = new ObjectMapper();
+
     private MFTAdmin admin;
 
     public void init() {
@@ -99,10 +101,9 @@ public class MFTAgent implements CommandLineRunner {
                 Optional<String> decodedValue = value.getValueAsString();
                 decodedValue.ifPresent(v -> {
                     System.out.println(String.format("Value is: %s", v));
-                    ObjectMapper mapper = new ObjectMapper();
-                    TransferRequest request = null;
+                    TransferCommand request = null;
                     try {
-                        request = mapper.readValue(v, TransferRequest.class);
+                        request = mapper.readValue(v, TransferCommand.class);
                         logger.info("Received request " + request.getTransferId());
                         admin.updateTransferState(request.getTransferId(), new TransferState().setState("STARTING")
                                 .setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()));
diff --git a/agent/pom.xml b/controller/pom.xml
similarity index 54%
copy from agent/pom.xml
copy to controller/pom.xml
index 54a4361..f7a981e 100644
--- a/agent/pom.xml
+++ b/controller/pom.xml
@@ -30,21 +30,11 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>mft-agent</artifactId>
+    <artifactId>mft-controller</artifactId>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.airavata</groupId>
-            <artifactId>mft-scp-transport</artifactId>
-            <version>0.01-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
-            <artifactId>mft-local-transport</artifactId>
-            <version>0.01-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
             <artifactId>mft-admin</artifactId>
             <version>0.01-SNAPSHOT</version>
         </dependency>
@@ -56,38 +46,35 @@
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>log4j-over-slf4j</artifactId>
-            <version>1.7.26</version>
+            <version>${log4j.over.slf4j}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.github.lognet</groupId>
+            <artifactId>grpc-spring-boot-starter</artifactId>
+            <version>${grpc.spring.boot}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.springframework.boot</groupId>
+                    <artifactId>spring-boot-starter</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-jpa</artifactId>
+            <version>${spring.boot.data.jpa}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+            <version>${h2}</version>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>net.sf.dozer</groupId>
+            <artifactId>dozer</artifactId>
+            <version>${dozer}</version>
         </dependency>
     </dependencies>
 
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <version>${maven.assembly.plugin}</version>
-                <executions>
-                    <execution>
-                        <id>mft-agent-distribution-package</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                        <configuration>
-                            <tarLongFileMode>posix</tarLongFileMode>
-                            <finalName>${agent.dist.name}</finalName>
-                            <descriptors>
-                                <descriptor>src/main/assembly/agent-bin-assembly.xml</descriptor>
-                            </descriptors>
-                            <attach>false</attach>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-    <properties>
-        <agent.dist.name>MFT-Agent-0.01</agent.dist.name>
-        <maven.assembly.plugin>3.1.1</maven.assembly.plugin>
-    </properties>
 </project>
\ No newline at end of file
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/ControllerException.java b/controller/src/main/java/org/apache/airavata/mft/controller/ControllerException.java
new file mode 100644
index 0000000..329a89c
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/ControllerException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller;
+
+public class ControllerException extends Exception { // checked exception raised by MFT controller code
+    /** Creates an exception with no detail message and no cause. */
+    public ControllerException() {
+        super();
+    }
+    /** Creates an exception with the given detail message. */
+    public ControllerException(String message) {
+        super(message);
+    }
+    /** Creates an exception with the given detail message and underlying cause. */
+    public ControllerException(String message, Throwable cause) {
+        super(message, cause);
+    }
+    /** Creates an exception that wraps the given cause. */
+    public ControllerException(Throwable cause) {
+        super(cause);
+    }
+    /** Full-control constructor for subclasses; all four arguments are passed straight to Exception. */
+    protected ControllerException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java b/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
new file mode 100644
index 0000000..ad869e0
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.KeyValueClient;
+import com.orbitz.consul.cache.ConsulCache;
+import com.orbitz.consul.cache.KVCache;
+import com.orbitz.consul.model.kv.Value;
+import org.apache.airavata.mft.admin.MFTAdmin;
+import org.apache.airavata.mft.admin.MFTAdminException;
+import org.apache.airavata.mft.admin.models.TransferCommand;
+import org.apache.airavata.mft.controller.db.entities.TransferEntity;
+import org.apache.airavata.mft.controller.db.repositories.TransferRepository;
+import org.apache.airavata.mft.admin.models.TransferRequest;
+import org.dozer.DozerBeanMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.PropertySource;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Semaphore;
+
+/**
+ * Spring Boot entry point of the MFT controller: watches the Consul prefix "mft/controller/messages",
+ * stores each submitted transfer request and hands it to one live agent as a TransferCommand.
+ */
+@PropertySource("classpath:application.properties")
+@SpringBootApplication()
+public class MFTController implements CommandLineRunner {
+
+    private static final Logger logger = LoggerFactory.getLogger(MFTController.class);
+
+    private final Semaphore mainHold = new Semaphore(0); // never released in this class; run() parks on it
+
+    private Consul client;
+    private KeyValueClient kvClient;
+    private KVCache messageCache;
+    private ConsulCache.Listener<String, Value> cacheListener;
+
+    private MFTAdmin admin;
+
+    private ObjectMapper jsonMapper = new ObjectMapper();
+    private DozerBeanMapper dozerBeanMapper = new DozerBeanMapper();
+
+    @Autowired
+    private TransferRepository transferRepository;
+
+    public void init() { // builds the Consul client, the cache over the message prefix and the admin client
+        client = Consul.builder().build();
+        kvClient = client.keyValueClient();
+        messageCache = KVCache.newCache(kvClient, "mft/controller/messages");
+        admin = new MFTAdmin();
+    }
+
+    /** Registers the listener that processes every submitted request, then starts the cache. */
+    private void acceptRequests() {
+        cacheListener = newValues -> {
+            newValues.forEach((key, value) -> {
+                String transferId = key.substring(key.lastIndexOf("/") + 1);
+                Optional<String> decodedValue = value.getValueAsString();
+                decodedValue.ifPresent(v -> {
+                    logger.info("Received transfer request {}", transferId); // payload holds tokens; log the id only
+                    try {
+                        TransferRequest transferRequest = jsonMapper.readValue(v, TransferRequest.class);
+                        TransferEntity transferEntity = new TransferEntity();
+                        transferEntity.setTransferId(transferId);
+                        transferEntity.setSourceId(transferRequest.getSourceId())
+                                .setSourceToken(transferRequest.getSourceToken())
+                                .setSourceType(transferRequest.getSourceType())
+                                .setSourceResourceBackend(transferRequest.getSourceResourceBackend())
+                                .setSourceCredentialBackend(transferRequest.getSourceCredentialBackend())
+                                .setDestinationId(transferRequest.getDestinationId())
+                                .setDestinationToken(transferRequest.getDestinationToken())
+                                .setDestinationType(transferRequest.getDestinationType())
+                                .setDestResourceBackend(transferRequest.getDestResourceBackend())
+                                .setDestCredentialBackend(transferRequest.getDestCredentialBackend())
+                                .setAffinityTransfer(transferRequest.isAffinityTransfer());
+
+                        TransferEntity savedEntity = transferRepository.save(transferEntity);
+                        List<String> liveAgentIds = admin.getLiveAgentIds();
+                        if (liveAgentIds.isEmpty()) {
+                            throw new ControllerException("Live agents are not available. Skipping for now");
+                        }
+                        // First requested agent that is live (priority values are not consulted); else first live agent unless affinity.
+                        String selectedAgent = null;
+                        if (transferRequest.getTargetAgents() != null && !transferRequest.getTargetAgents().isEmpty()) {
+                            selectedAgent = transferRequest.getTargetAgents().keySet().stream()
+                                    .filter(liveAgentIds::contains).findFirst().orElse(null);
+                        } else if (!transferRequest.isAffinityTransfer()) {
+                            selectedAgent = liveAgentIds.get(0);
+                        }
+
+                        if (selectedAgent == null) {
+                            throw new ControllerException("Couldn't find an Agent that meet transfer requirements");
+                        }
+
+                        TransferCommand transferCommand = new TransferCommand();
+                        transferCommand.setSourceId(transferRequest.getSourceId())
+                                .setSourceToken(transferRequest.getSourceToken())
+                                .setSourceType(transferRequest.getSourceType())
+                                .setSourceResourceBackend(transferRequest.getSourceResourceBackend())
+                                .setSourceCredentialBackend(transferRequest.getSourceCredentialBackend())
+                                .setDestinationId(transferRequest.getDestinationId())
+                                .setDestinationToken(transferRequest.getDestinationToken())
+                                .setDestinationType(transferRequest.getDestinationType())
+                                .setDestResourceBackend(transferRequest.getDestResourceBackend())
+                                .setDestCredentialBackend(transferRequest.getDestCredentialBackend())
+                                .setTransferId(savedEntity.getTransferId());
+
+                        admin.commandTransferToAgent(selectedAgent, transferCommand);
+                    } catch (Exception e) {
+                        logger.error("Failed to process transfer request {}", transferId, e); // single log point for all failures
+                    } finally {
+                        logger.info("Deleting key {}", value.getKey());
+                        kvClient.deleteKey(value.getKey()); // Due to bug in consul https://github.com/hashicorp/consul/issues/571
+                    }
+                });
+            });
+        };
+        messageCache.addListener(cacheListener);
+        messageCache.start();
+    }
+
+    /** Initializes the Consul clients, starts consuming requests, then blocks the calling thread until interrupted. */
+    @Override
+    public void run(String... args) throws Exception {
+        init();
+        acceptRequests();
+        mainHold.acquire();
+    }
+
+    /** Boots the Spring application and forwards the command-line arguments to it. */
+    public static void main(String[] args) {
+        SpringApplication.run(MFTController.class, args);
+    }
+}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TargetAgentEntity.java b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TargetAgentEntity.java
new file mode 100644
index 0000000..056df7e
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TargetAgentEntity.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller.db.entities;
+
+import javax.persistence.*;
+
+/**
+ * JPA entity that associates one agent with one transfer.
+ *
+ * <p>The primary key is composite — {@code TRANSFER_ID} plus {@code AGENT_ID} — and is expressed
+ * through {@link TargetAgentPK} via {@code @IdClass}. Mapping annotations are placed on fields.
+ */
+@Entity
+@IdClass(TargetAgentPK.class)
+public class TargetAgentEntity {
+
+    // First half of the composite key.
+    @Id
+    @Column(name = "TRANSFER_ID")
+    private String transferId;
+
+    // Owning transfer, joined on the same TRANSFER_ID column that backs transferId above.
+    // NOTE(review): @PrimaryKeyJoinColumn on a @ManyToOne is handled differently across JPA
+    // providers — confirm the generated schema does not contain a second transfer column.
+    @ManyToOne()
+    @PrimaryKeyJoinColumn(name="TRANSFER_ID", referencedColumnName="TRANSFER_ID")
+    private TransferEntity transfer;
+
+    // Second half of the composite key.
+    @Id
+    @Column(name = "AGENT_ID")
+    private String agentId;
+
+    // NOTE(review): presumably the preference order among a transfer's target agents — confirm.
+    @Column(name = "PRIORITY")
+    private int priority;
+
+    public TransferEntity getTransfer() {
+        return transfer;
+    }
+
+    public void setTransfer(TransferEntity transfer) {
+        this.transfer = transfer;
+    }
+
+    public String getAgentId() {
+        return agentId;
+    }
+
+    public void setAgentId(String agentId) {
+        this.agentId = agentId;
+    }
+
+    public int getPriority() {
+        return priority;
+    }
+
+    public void setPriority(int priority) {
+        this.priority = priority;
+    }
+
+    public String getTransferId() {
+        return transferId;
+    }
+
+    // Unlike the other setters in this class, this one returns the instance for call chaining.
+    public TargetAgentEntity setTransferId(String transferId) {
+        this.transferId = transferId;
+        return this;
+    }
+}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TargetAgentPK.java b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TargetAgentPK.java
new file mode 100644
index 0000000..c0d7eb8
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TargetAgentPK.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller.db.entities;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Composite primary key (transfer id + agent id) used as the {@code @IdClass} of
+ * {@code TargetAgentEntity}.
+ *
+ * <p>Equality and hashing are value-based over both components and tolerate {@code null}
+ * components, so an instance created through the no-arg constructor can safely be compared
+ * or stored in hash-based collections.
+ */
+public class TargetAgentPK implements Serializable {
+    private String transferId;
+    private String agentId;
+
+    /** Creates an empty key; both components are {@code null}. */
+    public TargetAgentPK() {
+    }
+
+    /**
+     * Creates a key from its two components.
+     *
+     * @param transferId id of the transfer
+     * @param agentId    id of the agent
+     */
+    public TargetAgentPK(String transferId, String agentId) {
+        this.transferId = transferId;
+        this.agentId = agentId;
+    }
+
+    /**
+     * Compares both key components by value.
+     *
+     * @param o object to compare with; may be {@code null}
+     * @return {@code true} only when {@code o} is a {@code TargetAgentPK} of the same class whose
+     *         transfer id and agent id both equal this key's (two {@code null}s count as equal)
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        TargetAgentPK other = (TargetAgentPK) o;
+        // Objects.equals keeps this consistent with the null-tolerant hashCode() below; calling
+        // transferId.equals(...) directly would throw for a key built via the no-arg constructor.
+        return Objects.equals(transferId, other.transferId)
+                && Objects.equals(agentId, other.agentId);
+    }
+
+    /** @return a hash derived from both key components, consistent with {@link #equals(Object)} */
+    @Override
+    public int hashCode() {
+        return Objects.hash(transferId, agentId);
+    }
+}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferEntity.java b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferEntity.java
new file mode 100644
index 0000000..64b12dd
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferEntity.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller.db.entities;
+
+import javax.persistence.*;
+import java.util.Set;
+
+/**
+ * JPA entity describing one transfer, keyed by {@code TRANSFER_ID}.
+ *
+ * <p>It stores a source and a destination description (id, type, token, resource backend and
+ * credential backend for each side), an affinity flag, and three eagerly fetched child
+ * collections: target agents, executions and statuses. Mapping annotations are placed on
+ * fields, and every setter returns {@code this} so calls can be chained.
+ */
+@Entity
+public class TransferEntity {
+    // Primary key. No @GeneratedValue is declared here, so the value is not generated by this mapping.
+    @Id
+    @Column(name = "TRANSFER_ID")
+    private String transferId;
+
+    // ---- source side ----
+    @Column(name = "SOURCE_ID")
+    private String sourceId;
+
+    @Column(name = "SOURCE_TYPE")
+    private String sourceType;
+
+    @Column(name = "SOURCE_TOKEN")
+    private String sourceToken;
+
+    @Column(name = "SOURCE_RESOURCE_BACKEND")
+    private String sourceResourceBackend;
+
+    @Column(name = "SOURCE_CREDENTIAL_BACKEND")
+    private String sourceCredentialBackend;
+
+    // ---- destination side ----
+    @Column(name = "DESTINATION_ID")
+    private String destinationId;
+
+    @Column(name = "DESTINATION_TYPE")
+    private String destinationType;
+
+    @Column(name = "DESTINATION_TOKEN")
+    private String destinationToken;
+
+    @Column(name = "DEST_RESOURCE_BACKEND")
+    private String destResourceBackend;
+
+    @Column(name = "DEST_CREDENTIAL_BACKEND")
+    private String destCredentialBackend;
+
+    // NOTE(review): presumably marks transfers that must run on one of the listed target
+    // agents — confirm against the controller's agent-selection logic.
+    @Column(name = "AFFINITY_TRANSFER")
+    private boolean affinityTransfer;
+
+    // ---- child collections (all cascaded and eagerly fetched) ----
+    // NOTE(review): none of the three @OneToMany mappings declares mappedBy, although
+    // TransferExecutionEntity and TransferStatusEntity own a TRANSFER_ID join column of their
+    // own. JPA providers typically treat such a pair as two independent associations and back
+    // this side with a join table — confirm that is the intended schema.
+    @OneToMany(targetEntity = TargetAgentEntity.class, cascade = CascadeType.ALL, fetch = FetchType.EAGER)
+    private Set<TargetAgentEntity> targetAgents;
+
+    @OneToMany(targetEntity = TransferExecutionEntity.class, cascade = CascadeType.ALL, fetch = FetchType.EAGER)
+    private Set<TransferExecutionEntity> executions;
+
+    @OneToMany(targetEntity = TransferStatusEntity.class, cascade = CascadeType.ALL, fetch = FetchType.EAGER)
+    private Set<TransferStatusEntity> statuses;
+
+    // ---- accessors: plain getters, fluent setters ----
+    public String getTransferId() {
+        return transferId;
+    }
+
+    public TransferEntity setTransferId(String transferId) {
+        this.transferId = transferId;
+        return this;
+    }
+
+    public String getSourceId() {
+        return sourceId;
+    }
+
+    public TransferEntity setSourceId(String sourceId) {
+        this.sourceId = sourceId;
+        return this;
+    }
+
+    public String getSourceType() {
+        return sourceType;
+    }
+
+    public TransferEntity setSourceType(String sourceType) {
+        this.sourceType = sourceType;
+        return this;
+    }
+
+    public String getSourceToken() {
+        return sourceToken;
+    }
+
+    public TransferEntity setSourceToken(String sourceToken) {
+        this.sourceToken = sourceToken;
+        return this;
+    }
+
+    public String getSourceResourceBackend() {
+        return sourceResourceBackend;
+    }
+
+    public TransferEntity setSourceResourceBackend(String sourceResourceBackend) {
+        this.sourceResourceBackend = sourceResourceBackend;
+        return this;
+    }
+
+    public String getSourceCredentialBackend() {
+        return sourceCredentialBackend;
+    }
+
+    public TransferEntity setSourceCredentialBackend(String sourceCredentialBackend) {
+        this.sourceCredentialBackend = sourceCredentialBackend;
+        return this;
+    }
+
+    public String getDestinationId() {
+        return destinationId;
+    }
+
+    public TransferEntity setDestinationId(String destinationId) {
+        this.destinationId = destinationId;
+        return this;
+    }
+
+    public String getDestinationType() {
+        return destinationType;
+    }
+
+    public TransferEntity setDestinationType(String destinationType) {
+        this.destinationType = destinationType;
+        return this;
+    }
+
+    public String getDestinationToken() {
+        return destinationToken;
+    }
+
+    public TransferEntity setDestinationToken(String destinationToken) {
+        this.destinationToken = destinationToken;
+        return this;
+    }
+
+    public String getDestResourceBackend() {
+        return destResourceBackend;
+    }
+
+    public TransferEntity setDestResourceBackend(String destResourceBackend) {
+        this.destResourceBackend = destResourceBackend;
+        return this;
+    }
+
+    public String getDestCredentialBackend() {
+        return destCredentialBackend;
+    }
+
+    public TransferEntity setDestCredentialBackend(String destCredentialBackend) {
+        this.destCredentialBackend = destCredentialBackend;
+        return this;
+    }
+
+    public boolean isAffinityTransfer() {
+        return affinityTransfer;
+    }
+
+    public TransferEntity setAffinityTransfer(boolean affinityTransfer) {
+        this.affinityTransfer = affinityTransfer;
+        return this;
+    }
+
+    // The collection accessors expose the stored Set instance directly (no defensive copy).
+    public Set<TargetAgentEntity> getTargetAgents() {
+        return targetAgents;
+    }
+
+    public TransferEntity setTargetAgents(Set<TargetAgentEntity> targetAgents) {
+        this.targetAgents = targetAgents;
+        return this;
+    }
+
+    public Set<TransferExecutionEntity> getExecutions() {
+        return executions;
+    }
+
+    public TransferEntity setExecutions(Set<TransferExecutionEntity> executions) {
+        this.executions = executions;
+        return this;
+    }
+
+    public Set<TransferStatusEntity> getStatuses() {
+        return statuses;
+    }
+
+    public TransferEntity setStatuses(Set<TransferStatusEntity> statuses) {
+        this.statuses = statuses;
+        return this;
+    }
+}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferExecutionEntity.java b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferExecutionEntity.java
new file mode 100644
index 0000000..f95f5cd
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferExecutionEntity.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller.db.entities;
+
+import javax.persistence.*;
+
+/**
+ * JPA entity recording which agent executes a transfer and when that record changed.
+ *
+ * <p>Each row has a generated integer id and references its {@link TransferEntity} through the
+ * {@code TRANSFER_ID} join column.
+ */
+@Entity
+public class TransferExecutionEntity {
+    // Surrogate key; generation strategy is left to the provider default.
+    @Id
+    @Column(name = "ID")
+    @GeneratedValue
+    private int id;
+
+    // Transfer this execution record belongs to.
+    @ManyToOne()
+    @JoinColumn(name = "TRANSFER_ID", referencedColumnName = "TRANSFER_ID")
+    private TransferEntity transfer;
+
+    @Column(name = "EXECUTING_AGENT")
+    private String executingAgent;
+
+    // NOTE(review): unit is not visible here — presumably epoch milliseconds; confirm with writers.
+    @Column(name = "TIME_OF_CHANGE")
+    private long timeOfChange;
+
+    public int getId() {
+        return id;
+    }
+
+    public void setId(int id) {
+        this.id = id;
+    }
+
+    public TransferEntity getTransfer() {
+        return transfer;
+    }
+
+    public void setTransfer(TransferEntity transfer) {
+        this.transfer = transfer;
+    }
+
+    public String getExecutingAgent() {
+        return executingAgent;
+    }
+
+    public void setExecutingAgent(String executingAgent) {
+        this.executingAgent = executingAgent;
+    }
+
+    public long getTimeOfChange() {
+        return timeOfChange;
+    }
+
+    public void setTimeOfChange(long timeOfChange) {
+        this.timeOfChange = timeOfChange;
+    }
+}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferStatusEntity.java b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferStatusEntity.java
new file mode 100644
index 0000000..eb920ed
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferStatusEntity.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller.db.entities;
+
+import javax.persistence.*;
+
+/**
+ * JPA entity holding one status entry of a transfer together with the time it changed.
+ *
+ * <p>Each row has a generated integer id and references its {@link TransferEntity} through the
+ * {@code TRANSFER_ID} join column.
+ */
+@Entity
+public class TransferStatusEntity {
+    // Surrogate key; generation strategy is left to the provider default.
+    @Id
+    @Column(name = "ID")
+    @GeneratedValue
+    private int id;
+
+    // Transfer this status entry belongs to.
+    @ManyToOne()
+    @JoinColumn(name = "TRANSFER_ID", referencedColumnName = "TRANSFER_ID")
+    private TransferEntity transfer;
+
+    // Free-form status text; the set of allowed values is not defined in this class.
+    @Column(name = "STATUS")
+    private String status;
+
+    // NOTE(review): unit is not visible here — presumably epoch milliseconds; confirm with writers.
+    @Column(name = "TIME_OF_CHANGE")
+    private long timeOfChange;
+
+    public int getId() {
+        return id;
+    }
+
+    public void setId(int id) {
+        this.id = id;
+    }
+
+    public TransferEntity getTransfer() {
+        return transfer;
+    }
+
+    public void setTransfer(TransferEntity transfer) {
+        this.transfer = transfer;
+    }
+
+    public String getStatus() {
+        return status;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+    }
+
+    public long getTimeOfChange() {
+        return timeOfChange;
+    }
+
+    public void setTimeOfChange(long timeOfChange) {
+        this.timeOfChange = timeOfChange;
+    }
+}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/db/repositories/TargetAgentRepository.java b/controller/src/main/java/org/apache/airavata/mft/controller/db/repositories/TargetAgentRepository.java
new file mode 100644
index 0000000..d7fa73e
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/db/repositories/TargetAgentRepository.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller.db.repositories;
+
+import org.apache.airavata.mft.controller.db.entities.TargetAgentEntity;
+import org.springframework.data.repository.CrudRepository;
+
+public interface TargetAgentRepository extends CrudRepository<TargetAgentEntity, String> {
+}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/db/repositories/TransferRepository.java b/controller/src/main/java/org/apache/airavata/mft/controller/db/repositories/TransferRepository.java
new file mode 100644
index 0000000..bf3a704
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/db/repositories/TransferRepository.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller.db.repositories;
+
+import org.apache.airavata.mft.controller.db.entities.TransferEntity;
+import org.springframework.data.repository.CrudRepository;
+
+/**
+ * Spring Data repository for {@link TransferEntity}, keyed by the entity's {@code String}
+ * transfer id. It declares no query methods of its own; all operations come from
+ * {@link CrudRepository}.
+ */
+public interface TransferRepository extends CrudRepository<TransferEntity, String> {
+}
diff --git a/controller/src/main/resources/application.properties b/controller/src/main/resources/application.properties
new file mode 100644
index 0000000..e69de29
diff --git a/pom.xml b/pom.xml
index 179adfb..26f0bd8 100755
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,7 @@
         <module>agent</module>
         <module>services</module>
         <module>admin</module>
+        <module>controller</module>
     </modules>
 
     <url>http://airavata.apache.org/</url>
@@ -113,6 +114,7 @@
         <protobuf.java>3.9.1</protobuf.java>
         <grpc.spring.boot>3.5.1</grpc.spring.boot>
         <spring.boot.data.jpa>2.2.1.RELEASE</spring.boot.data.jpa>
+        <log4j.over.slf4j>1.7.26</log4j.over.slf4j>
         <dozer>5.5.1</dozer>
         <jsch>0.1.55</jsch>
         <sshj>0.23.0</sshj>