You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/01/04 17:05:40 UTC
[airavata-mft] branch master updated: Adding MFT Controller
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
The following commit(s) were added to refs/heads/master by this push:
new 3a1a326 Adding MFT Controller
3a1a326 is described below
commit 3a1a3262b69c749a84d69a0bd094953d2dbb087f
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Sat Jan 4 12:05:28 2020 -0500
Adding MFT Controller
---
.../org/apache/airavata/mft/admin/MFTAdmin.java | 20 +-
.../{TransferRequest.java => TransferCommand.java} | 93 +++++++---
.../airavata/mft/admin/models/TransferRequest.java | 105 ++++++++---
agent/pom.xml | 2 +-
.../org/apache/airavata/mft/agent/MFTAgent.java | 9 +-
{agent => controller}/pom.xml | 71 +++----
.../mft/controller/ControllerException.java | 41 +++++
.../airavata/mft/controller/MFTController.java | 158 ++++++++++++++++
.../controller/db/entities/TargetAgentEntity.java | 73 ++++++++
.../mft/controller/db/entities/TargetAgentPK.java | 48 +++++
.../mft/controller/db/entities/TransferEntity.java | 205 +++++++++++++++++++++
.../db/entities/TransferExecutionEntity.java | 70 +++++++
.../db/entities/TransferStatusEntity.java | 70 +++++++
.../db/repositories/TargetAgentRepository.java | 24 +++
.../db/repositories/TransferRepository.java | 24 +++
.../src/main/resources/application.properties | 0
pom.xml | 2 +
17 files changed, 915 insertions(+), 100 deletions(-)
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdmin.java b/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdmin.java
index bf0ae79..6868e9e 100644
--- a/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdmin.java
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/MFTAdmin.java
@@ -24,6 +24,7 @@ import com.orbitz.consul.ConsulException;
import com.orbitz.consul.KeyValueClient;
import com.orbitz.consul.model.kv.Value;
import org.apache.airavata.mft.admin.models.AgentInfo;
+import org.apache.airavata.mft.admin.models.TransferCommand;
import org.apache.airavata.mft.admin.models.TransferRequest;
import org.apache.airavata.mft.admin.models.TransferState;
@@ -44,20 +45,27 @@ public class MFTAdmin {
private KeyValueClient kvClient = client.keyValueClient();
private ObjectMapper mapper = new ObjectMapper();
- public String submitTransfer(String agentId, TransferRequest transferRequest) throws MFTAdminException {
+ public String submitTransfer(TransferRequest transferRequest) throws MFTAdminException{
try {
-
+ String asStr = mapper.writeValueAsString(transferRequest);
String transferId = UUID.randomUUID().toString();
- updateTransferState(transferId, new TransferState().setState("INITIALIZING").setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()));
- transferRequest.setTransferId(transferId);
- String asString = mapper.writeValueAsString(transferRequest);
- kvClient.putValue("mft/agents/messages/" + agentId + "/" + transferId, asString);
+ kvClient.putValue("mft/controller/messages/" + transferId, asStr);
return transferId;
} catch (JsonProcessingException e) {
throw new MFTAdminException("Error in serializing transfer request", e);
}
}
+ public void commandTransferToAgent(String agentId, TransferCommand transferCommand) throws MFTAdminException {
+ try {
+ updateTransferState(transferCommand.getTransferId(), new TransferState().setState("INITIALIZING").setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()));
+ String asString = mapper.writeValueAsString(transferCommand);
+ kvClient.putValue("mft/agents/messages/" + agentId + "/" + transferCommand.getTransferId(), asString);
+ } catch (JsonProcessingException e) {
+ throw new MFTAdminException("Error in serializing transfer request", e);
+ }
+ }
+
public List<AgentInfo> listAgents() {
List<AgentInfo> agents = new ArrayList<>();
List<String> keys = kvClient.getKeys("mft/agents/info");
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferCommand.java
similarity index 51%
copy from admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java
copy to admin/src/main/java/org/apache/airavata/mft/admin/models/TransferCommand.java
index 13a549f..175e867 100644
--- a/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferCommand.java
@@ -19,69 +19,116 @@ package org.apache.airavata.mft.admin.models;
import java.util.List;
-public class TransferRequest {
+public class TransferCommand {
private String transferId;
private String sourceId;
private String sourceType;
private String sourceToken;
+ private String sourceResourceBackend;
+ private String sourceCredentialBackend;
private String destinationId;
private String destinationType;
private String destinationToken;
+ private String destResourceBackend;
+ private String destCredentialBackend;
- public String getSourceId() {
- return sourceId;
+ public String getTransferId() {
+ return transferId;
}
- public void setSourceId(String sourceId) {
- this.sourceId = sourceId;
+ public TransferCommand setTransferId(String transferId) {
+ this.transferId = transferId;
+ return this;
}
- public String getDestinationId() {
- return destinationId;
+ public String getSourceId() {
+ return sourceId;
}
- public void setDestinationId(String destinationId) {
- this.destinationId = destinationId;
+ public TransferCommand setSourceId(String sourceId) {
+ this.sourceId = sourceId;
+ return this;
}
public String getSourceType() {
return sourceType;
}
- public void setSourceType(String sourceType) {
+ public TransferCommand setSourceType(String sourceType) {
this.sourceType = sourceType;
+ return this;
}
- public String getDestinationType() {
- return destinationType;
+ public String getSourceToken() {
+ return sourceToken;
}
- public void setDestinationType(String destinationType) {
- this.destinationType = destinationType;
+ public TransferCommand setSourceToken(String sourceToken) {
+ this.sourceToken = sourceToken;
+ return this;
}
- public String getSourceToken() {
- return sourceToken;
+ public String getSourceResourceBackend() {
+ return sourceResourceBackend;
}
- public void setSourceToken(String sourceToken) {
- this.sourceToken = sourceToken;
+ public TransferCommand setSourceResourceBackend(String sourceResourceBackend) {
+ this.sourceResourceBackend = sourceResourceBackend;
+ return this;
+ }
+
+ public String getSourceCredentialBackend() {
+ return sourceCredentialBackend;
+ }
+
+ public TransferCommand setSourceCredentialBackend(String sourceCredentialBackend) {
+ this.sourceCredentialBackend = sourceCredentialBackend;
+ return this;
+ }
+
+ public String getDestinationId() {
+ return destinationId;
+ }
+
+ public TransferCommand setDestinationId(String destinationId) {
+ this.destinationId = destinationId;
+ return this;
+ }
+
+ public String getDestinationType() {
+ return destinationType;
+ }
+
+ public TransferCommand setDestinationType(String destinationType) {
+ this.destinationType = destinationType;
+ return this;
}
public String getDestinationToken() {
return destinationToken;
}
- public void setDestinationToken(String destinationToken) {
+ public TransferCommand setDestinationToken(String destinationToken) {
this.destinationToken = destinationToken;
+ return this;
}
- public String getTransferId() {
- return transferId;
+ public String getDestResourceBackend() {
+ return destResourceBackend;
}
- public void setTransferId(String transferId) {
- this.transferId = transferId;
+ public TransferCommand setDestResourceBackend(String destResourceBackend) {
+ this.destResourceBackend = destResourceBackend;
+ return this;
+ }
+
+ public String getDestCredentialBackend() {
+ return destCredentialBackend;
+ }
+
+ public TransferCommand setDestCredentialBackend(String destCredentialBackend) {
+ this.destCredentialBackend = destCredentialBackend;
+ return this;
}
}
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java
index 13a549f..ecd5cdc 100644
--- a/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java
@@ -17,71 +17,128 @@
package org.apache.airavata.mft.admin.models;
-import java.util.List;
+import java.util.Map;
public class TransferRequest {
- private String transferId;
private String sourceId;
private String sourceType;
private String sourceToken;
+ private String sourceResourceBackend;
+ private String sourceCredentialBackend;
private String destinationId;
private String destinationType;
private String destinationToken;
+ private String destResourceBackend;
+ private String destCredentialBackend;
+ private boolean affinityTransfer;
+ private Map<String, Integer> targetAgents;
public String getSourceId() {
return sourceId;
}
- public void setSourceId(String sourceId) {
+ public TransferRequest setSourceId(String sourceId) {
this.sourceId = sourceId;
+ return this;
}
- public String getDestinationId() {
- return destinationId;
+ public String getSourceType() {
+ return sourceType;
}
- public void setDestinationId(String destinationId) {
- this.destinationId = destinationId;
+ public TransferRequest setSourceType(String sourceType) {
+ this.sourceType = sourceType;
+ return this;
}
- public String getSourceType() {
- return sourceType;
+ public String getSourceToken() {
+ return sourceToken;
}
- public void setSourceType(String sourceType) {
- this.sourceType = sourceType;
+ public TransferRequest setSourceToken(String sourceToken) {
+ this.sourceToken = sourceToken;
+ return this;
}
- public String getDestinationType() {
- return destinationType;
+ public String getSourceResourceBackend() {
+ return sourceResourceBackend;
}
- public void setDestinationType(String destinationType) {
- this.destinationType = destinationType;
+ public TransferRequest setSourceResourceBackend(String sourceResourceBackend) {
+ this.sourceResourceBackend = sourceResourceBackend;
+ return this;
}
- public String getSourceToken() {
- return sourceToken;
+ public String getSourceCredentialBackend() {
+ return sourceCredentialBackend;
}
- public void setSourceToken(String sourceToken) {
- this.sourceToken = sourceToken;
+ public TransferRequest setSourceCredentialBackend(String sourceCredentialBackend) {
+ this.sourceCredentialBackend = sourceCredentialBackend;
+ return this;
+ }
+
+ public String getDestinationId() {
+ return destinationId;
+ }
+
+ public TransferRequest setDestinationId(String destinationId) {
+ this.destinationId = destinationId;
+ return this;
+ }
+
+ public String getDestinationType() {
+ return destinationType;
+ }
+
+ public TransferRequest setDestinationType(String destinationType) {
+ this.destinationType = destinationType;
+ return this;
}
public String getDestinationToken() {
return destinationToken;
}
- public void setDestinationToken(String destinationToken) {
+ public TransferRequest setDestinationToken(String destinationToken) {
this.destinationToken = destinationToken;
+ return this;
+ }
+
+ public String getDestResourceBackend() {
+ return destResourceBackend;
+ }
+
+ public TransferRequest setDestResourceBackend(String destResourceBackend) {
+ this.destResourceBackend = destResourceBackend;
+ return this;
+ }
+
+ public String getDestCredentialBackend() {
+ return destCredentialBackend;
+ }
+
+ public TransferRequest setDestCredentialBackend(String destCredentialBackend) {
+ this.destCredentialBackend = destCredentialBackend;
+ return this;
+ }
+
+ public boolean isAffinityTransfer() {
+ return affinityTransfer;
+ }
+
+ public TransferRequest setAffinityTransfer(boolean affinityTransfer) {
+ this.affinityTransfer = affinityTransfer;
+ return this;
}
- public String getTransferId() {
- return transferId;
+ public Map<String, Integer> getTargetAgents() {
+ return targetAgents;
}
- public void setTransferId(String transferId) {
- this.transferId = transferId;
+ public TransferRequest setTargetAgents(Map<String, Integer> targetAgents) {
+ this.targetAgents = targetAgents;
+ return this;
}
}
diff --git a/agent/pom.xml b/agent/pom.xml
index 54a4361..e6e01b8 100644
--- a/agent/pom.xml
+++ b/agent/pom.xml
@@ -56,7 +56,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
- <version>1.7.26</version>
+ <version>${log4j.over.slf4j}</version>
</dependency>
</dependencies>
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index e3a9977..9d414da 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -29,7 +29,7 @@ import com.orbitz.consul.model.session.SessionCreatedResponse;
import org.apache.airavata.mft.admin.MFTAdmin;
import org.apache.airavata.mft.admin.MFTAdminException;
import org.apache.airavata.mft.admin.models.AgentInfo;
-import org.apache.airavata.mft.admin.models.TransferRequest;
+import org.apache.airavata.mft.admin.models.TransferCommand;
import org.apache.airavata.mft.admin.models.TransferState;
import org.apache.airavata.mft.core.ResourceMetadata;
import org.apache.airavata.mft.core.api.Connector;
@@ -83,6 +83,8 @@ public class MFTAgent implements CommandLineRunner {
private long sessionRenewSeconds = 4;
private long sessionTTLSeconds = 10;
+ private ObjectMapper mapper = new ObjectMapper();
+
private MFTAdmin admin;
public void init() {
@@ -99,10 +101,9 @@ public class MFTAgent implements CommandLineRunner {
Optional<String> decodedValue = value.getValueAsString();
decodedValue.ifPresent(v -> {
System.out.println(String.format("Value is: %s", v));
- ObjectMapper mapper = new ObjectMapper();
- TransferRequest request = null;
+ TransferCommand request = null;
try {
- request = mapper.readValue(v, TransferRequest.class);
+ request = mapper.readValue(v, TransferCommand.class);
logger.info("Received request " + request.getTransferId());
admin.updateTransferState(request.getTransferId(), new TransferState().setState("STARTING")
.setPercentage(0).setUpdateTimeMils(System.currentTimeMillis()));
diff --git a/agent/pom.xml b/controller/pom.xml
similarity index 54%
copy from agent/pom.xml
copy to controller/pom.xml
index 54a4361..f7a981e 100644
--- a/agent/pom.xml
+++ b/controller/pom.xml
@@ -30,21 +30,11 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>mft-agent</artifactId>
+ <artifactId>mft-controller</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.airavata</groupId>
- <artifactId>mft-scp-transport</artifactId>
- <version>0.01-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>mft-local-transport</artifactId>
- <version>0.01-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.airavata</groupId>
<artifactId>mft-admin</artifactId>
<version>0.01-SNAPSHOT</version>
</dependency>
@@ -56,38 +46,35 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
- <version>1.7.26</version>
+ <version>${log4j.over.slf4j}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.github.lognet</groupId>
+ <artifactId>grpc-spring-boot-starter</artifactId>
+ <version>${grpc.spring.boot}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-data-jpa</artifactId>
+ <version>${spring.boot.data.jpa}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ <version>${h2}</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>net.sf.dozer</groupId>
+ <artifactId>dozer</artifactId>
+ <version>${dozer}</version>
</dependency>
</dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>${maven.assembly.plugin}</version>
- <executions>
- <execution>
- <id>mft-agent-distribution-package</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- <configuration>
- <tarLongFileMode>posix</tarLongFileMode>
- <finalName>${agent.dist.name}</finalName>
- <descriptors>
- <descriptor>src/main/assembly/agent-bin-assembly.xml</descriptor>
- </descriptors>
- <attach>false</attach>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- <properties>
- <agent.dist.name>MFT-Agent-0.01</agent.dist.name>
- <maven.assembly.plugin>3.1.1</maven.assembly.plugin>
- </properties>
</project>
\ No newline at end of file
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/ControllerException.java b/controller/src/main/java/org/apache/airavata/mft/controller/ControllerException.java
new file mode 100644
index 0000000..329a89c
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/ControllerException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller;
+
+public class ControllerException extends Exception {
+
+ public ControllerException() {
+ super();
+ }
+
+ public ControllerException(String message) {
+ super(message);
+ }
+
+ public ControllerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ControllerException(Throwable cause) {
+ super(cause);
+ }
+
+ protected ControllerException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java b/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
new file mode 100644
index 0000000..ad869e0
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.KeyValueClient;
+import com.orbitz.consul.cache.ConsulCache;
+import com.orbitz.consul.cache.KVCache;
+import com.orbitz.consul.model.kv.Value;
+import org.apache.airavata.mft.admin.MFTAdmin;
+import org.apache.airavata.mft.admin.MFTAdminException;
+import org.apache.airavata.mft.admin.models.TransferCommand;
+import org.apache.airavata.mft.controller.db.entities.TransferEntity;
+import org.apache.airavata.mft.controller.db.repositories.TransferRepository;
+import org.apache.airavata.mft.admin.models.TransferRequest;
+import org.dozer.DozerBeanMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.PropertySource;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Semaphore;
+
+@PropertySource("classpath:application.properties")
+@SpringBootApplication()
+public class MFTController implements CommandLineRunner {
+
+ private static final Logger logger = LoggerFactory.getLogger(MFTController.class);
+
+ private final Semaphore mainHold = new Semaphore(0);
+
+ private Consul client;
+ private KeyValueClient kvClient;
+ private KVCache messageCache;
+ private ConsulCache.Listener<String, Value> cacheListener;
+
+ private MFTAdmin admin;
+
+ private ObjectMapper jsonMapper = new ObjectMapper();
+ private DozerBeanMapper dozerBeanMapper = new DozerBeanMapper();
+
+ @Autowired
+ private TransferRepository transferRepository;
+
+ public void init() {
+ client = Consul.builder().build();
+ kvClient = client.keyValueClient();
+ messageCache = KVCache.newCache(kvClient, "mft/controller/messages");
+ admin = new MFTAdmin();
+ }
+
+    /**
+     * Listens on the controller message cache and hands each received transfer request to a live
+     * agent. The payload carries source/destination tokens, so only the transfer id is logged.
+     */
+    private void acceptRequests() {
+        cacheListener = newValues -> {
+            newValues.forEach((key, value) -> {
+                String transferId = key.substring(key.lastIndexOf("/") + 1);
+                Optional<String> decodedValue = value.getValueAsString();
+                decodedValue.ifPresent(v -> {
+                    logger.info("Received transfer request {}", transferId);
+                    try {
+                        TransferRequest transferRequest = jsonMapper.readValue(v, TransferRequest.class);
+                        TransferEntity transferEntity = new TransferEntity();
+                        transferEntity.setTransferId(transferId);
+                        transferEntity.setSourceId(transferRequest.getSourceId())
+                                .setSourceToken(transferRequest.getSourceToken())
+                                .setSourceType(transferRequest.getSourceType())
+                                .setSourceResourceBackend(transferRequest.getSourceResourceBackend())
+                                .setSourceCredentialBackend(transferRequest.getSourceCredentialBackend())
+                                .setDestinationId(transferRequest.getDestinationId())
+                                .setDestinationToken(transferRequest.getDestinationToken())
+                                .setDestinationType(transferRequest.getDestinationType())
+                                .setDestResourceBackend(transferRequest.getDestResourceBackend())
+                                .setDestCredentialBackend(transferRequest.getDestCredentialBackend())
+                                .setAffinityTransfer(transferRequest.isAffinityTransfer());
+
+                        TransferEntity savedEntity = transferRepository.save(transferEntity);
+
+                        List<String> liveAgentIds = admin.getLiveAgentIds();
+                        if (liveAgentIds.isEmpty()) {
+                            throw new ControllerException("Live agents are not available. Skipping for now");
+                        }
+
+                        String selectedAgent = null;
+                        if (transferRequest.getTargetAgents() != null && !transferRequest.getTargetAgents().isEmpty()) {
+                            selectedAgent = transferRequest.getTargetAgents().keySet().stream()
+                                    .filter(liveAgentIds::contains).findFirst().orElse(null);
+                        } else if (!transferRequest.isAffinityTransfer()) {
+                            selectedAgent = liveAgentIds.get(0);
+                        }
+
+                        if (selectedAgent == null) {
+                            throw new ControllerException("Couldn't find an Agent that meet transfer requirements");
+                        }
+
+                        TransferCommand transferCommand = new TransferCommand();
+                        transferCommand.setSourceId(transferRequest.getSourceId())
+                                .setSourceToken(transferRequest.getSourceToken())
+                                .setSourceType(transferRequest.getSourceType())
+                                .setSourceResourceBackend(transferRequest.getSourceResourceBackend())
+                                .setSourceCredentialBackend(transferRequest.getSourceCredentialBackend())
+                                .setDestinationId(transferRequest.getDestinationId())
+                                .setDestinationToken(transferRequest.getDestinationToken())
+                                .setDestinationType(transferRequest.getDestinationType())
+                                .setDestResourceBackend(transferRequest.getDestResourceBackend())
+                                .setDestCredentialBackend(transferRequest.getDestCredentialBackend())
+                                .setTransferId(savedEntity.getTransferId());
+
+                        admin.commandTransferToAgent(selectedAgent, transferCommand);
+                    } catch (Exception e) {
+                        // Single log point for every failure, including the ControllerExceptions thrown above.
+                        logger.error("Failed to process the request", e);
+                    } finally {
+                        logger.info("Deleting key {}", value.getKey());
+                        kvClient.deleteKey(value.getKey()); // Due to bug in consul https://github.com/hashicorp/consul/issues/571
+                    }
+                });
+            });
+        };
+        messageCache.addListener(cacheListener);
+        messageCache.start();
+    }
+
+
+ @Override
+ public void run(String... args) throws Exception {
+ init();
+ acceptRequests();
+ mainHold.acquire();
+ }
+
+    public static void main(String[] args) {
+        SpringApplication.run(MFTController.class, args); // forward CLI args so command-line property overrides reach Spring
+    }
+}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TargetAgentEntity.java b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TargetAgentEntity.java
new file mode 100644
index 0000000..056df7e
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TargetAgentEntity.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller.db.entities;
+
+import javax.persistence.*;
+
+@Entity
+@IdClass(TargetAgentPK.class)
+public class TargetAgentEntity {
+
+ @Id
+ @Column(name = "TRANSFER_ID")
+ private String transferId;
+
+ @ManyToOne()
+ @PrimaryKeyJoinColumn(name="TRANSFER_ID", referencedColumnName="TRANSFER_ID")
+ private TransferEntity transfer;
+
+ @Id
+ @Column(name = "AGENT_ID")
+ private String agentId;
+
+ @Column(name = "PRIORITY")
+ private int priority;
+
+ public TransferEntity getTransfer() {
+ return transfer;
+ }
+
+ public void setTransfer(TransferEntity transfer) {
+ this.transfer = transfer;
+ }
+
+ public String getAgentId() {
+ return agentId;
+ }
+
+ public void setAgentId(String agentId) {
+ this.agentId = agentId;
+ }
+
+ public int getPriority() {
+ return priority;
+ }
+
+ public void setPriority(int priority) {
+ this.priority = priority;
+ }
+
+ public String getTransferId() {
+ return transferId;
+ }
+
+ public TargetAgentEntity setTransferId(String transferId) {
+ this.transferId = transferId;
+ return this;
+ }
+}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TargetAgentPK.java b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TargetAgentPK.java
new file mode 100644
index 0000000..c0d7eb8
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TargetAgentPK.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller.db.entities;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class TargetAgentPK implements Serializable {
+ private String transferId;
+ private String agentId;
+
+ public TargetAgentPK() {
+ }
+
+ public TargetAgentPK(String transferId, String agentId) {
+ this.transferId = transferId;
+ this.agentId = agentId;
+ }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        TargetAgentPK other = (TargetAgentPK) o;
+        // Null-safe, matching hashCode(): the no-arg constructor leaves both fields null.
+        return Objects.equals(transferId, other.transferId) && Objects.equals(agentId, other.agentId);
+    }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(transferId, agentId);
+ }
+}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferEntity.java b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferEntity.java
new file mode 100644
index 0000000..64b12dd
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferEntity.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller.db.entities;
+
+import javax.persistence.*;
+import java.util.Set;
+
+@Entity
+public class TransferEntity {
+ @Id
+ @Column(name = "TRANSFER_ID")
+ private String transferId;
+
+ @Column(name = "SOURCE_ID")
+ private String sourceId;
+
+ @Column(name = "SOURCE_TYPE")
+ private String sourceType;
+
+ @Column(name = "SOURCE_TOKEN")
+ private String sourceToken;
+
+ @Column(name = "SOURCE_RESOURCE_BACKEND")
+ private String sourceResourceBackend;
+
+ @Column(name = "SOURCE_CREDENTIAL_BACKEND")
+ private String sourceCredentialBackend;
+
+ @Column(name = "DESTINATION_ID")
+ private String destinationId;
+
+ @Column(name = "DESTINATION_TYPE")
+ private String destinationType;
+
+ @Column(name = "DESTINATION_TOKEN")
+ private String destinationToken;
+
+ @Column(name = "DEST_RESOURCE_BACKEND")
+ private String destResourceBackend;
+
+ @Column(name = "DEST_CREDENTIAL_BACKEND")
+ private String destCredentialBackend;
+
+ @Column(name = "AFFINITY_TRANSFER")
+ private boolean affinityTransfer;
+
+ @OneToMany(targetEntity = TargetAgentEntity.class, cascade = CascadeType.ALL, fetch = FetchType.EAGER)
+ private Set<TargetAgentEntity> targetAgents;
+
+ @OneToMany(targetEntity = TransferExecutionEntity.class, cascade = CascadeType.ALL, fetch = FetchType.EAGER)
+ private Set<TransferExecutionEntity> executions;
+
+ @OneToMany(targetEntity = TransferStatusEntity.class, cascade = CascadeType.ALL, fetch = FetchType.EAGER)
+ private Set<TransferStatusEntity> statuses;
+
+ public String getTransferId() {
+ return transferId;
+ }
+
+ public TransferEntity setTransferId(String transferId) {
+ this.transferId = transferId;
+ return this;
+ }
+
+ public String getSourceId() {
+ return sourceId;
+ }
+
+ public TransferEntity setSourceId(String sourceId) {
+ this.sourceId = sourceId;
+ return this;
+ }
+
+ public String getSourceType() {
+ return sourceType;
+ }
+
+ public TransferEntity setSourceType(String sourceType) {
+ this.sourceType = sourceType;
+ return this;
+ }
+
+ public String getSourceToken() {
+ return sourceToken;
+ }
+
+ public TransferEntity setSourceToken(String sourceToken) {
+ this.sourceToken = sourceToken;
+ return this;
+ }
+
+ public String getSourceResourceBackend() {
+ return sourceResourceBackend;
+ }
+
+ public TransferEntity setSourceResourceBackend(String sourceResourceBackend) {
+ this.sourceResourceBackend = sourceResourceBackend;
+ return this;
+ }
+
+ public String getSourceCredentialBackend() {
+ return sourceCredentialBackend;
+ }
+
+ public TransferEntity setSourceCredentialBackend(String sourceCredentialBackend) {
+ this.sourceCredentialBackend = sourceCredentialBackend;
+ return this;
+ }
+
+ public String getDestinationId() {
+ return destinationId;
+ }
+
+ public TransferEntity setDestinationId(String destinationId) {
+ this.destinationId = destinationId;
+ return this;
+ }
+
+ public String getDestinationType() {
+ return destinationType;
+ }
+
+ public TransferEntity setDestinationType(String destinationType) {
+ this.destinationType = destinationType;
+ return this;
+ }
+
+ public String getDestinationToken() {
+ return destinationToken;
+ }
+
+ public TransferEntity setDestinationToken(String destinationToken) {
+ this.destinationToken = destinationToken;
+ return this;
+ }
+
+ public String getDestResourceBackend() {
+ return destResourceBackend;
+ }
+
+ public TransferEntity setDestResourceBackend(String destResourceBackend) {
+ this.destResourceBackend = destResourceBackend;
+ return this;
+ }
+
+ public String getDestCredentialBackend() {
+ return destCredentialBackend;
+ }
+
+ public TransferEntity setDestCredentialBackend(String destCredentialBackend) {
+ this.destCredentialBackend = destCredentialBackend;
+ return this;
+ }
+
+ public boolean isAffinityTransfer() {
+ return affinityTransfer;
+ }
+
+ public TransferEntity setAffinityTransfer(boolean affinityTransfer) {
+ this.affinityTransfer = affinityTransfer;
+ return this;
+ }
+
+ public Set<TargetAgentEntity> getTargetAgents() {
+ return targetAgents;
+ }
+
+ public TransferEntity setTargetAgents(Set<TargetAgentEntity> targetAgents) {
+ this.targetAgents = targetAgents;
+ return this;
+ }
+
+ public Set<TransferExecutionEntity> getExecutions() {
+ return executions;
+ }
+
+ public TransferEntity setExecutions(Set<TransferExecutionEntity> executions) {
+ this.executions = executions;
+ return this;
+ }
+
+ public Set<TransferStatusEntity> getStatuses() {
+ return statuses;
+ }
+
+ public TransferEntity setStatuses(Set<TransferStatusEntity> statuses) {
+ this.statuses = statuses;
+ return this;
+ }
+}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferExecutionEntity.java b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferExecutionEntity.java
new file mode 100644
index 0000000..f95f5cd
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferExecutionEntity.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller.db.entities;
+
+import javax.persistence.*;
+
+/**
+ * JPA entity with a generated integer id that links a {@link TransferEntity}
+ * (through the {@code TRANSFER_ID} join column) to an executing agent name and
+ * a {@code TIME_OF_CHANGE} value.
+ */
+@Entity
+public class TransferExecutionEntity {
+
+    @Id
+    @GeneratedValue
+    @Column(name = "ID")
+    private int id;
+
+    @ManyToOne
+    @JoinColumn(name = "TRANSFER_ID", referencedColumnName = "TRANSFER_ID")
+    private TransferEntity transfer;
+
+    @Column(name = "EXECUTING_AGENT")
+    private String executingAgent;
+
+    @Column(name = "TIME_OF_CHANGE")
+    private long timeOfChange;
+
+    /** @return the generated id of this row */
+    public int getId() {
+        return this.id;
+    }
+
+    /** @param id value to store as the row id */
+    public void setId(int id) {
+        this.id = id;
+    }
+
+    /** @return the transfer this execution belongs to */
+    public TransferEntity getTransfer() {
+        return this.transfer;
+    }
+
+    /** @param transfer transfer this execution belongs to */
+    public void setTransfer(TransferEntity transfer) {
+        this.transfer = transfer;
+    }
+
+    /** @return the value of the {@code EXECUTING_AGENT} column */
+    public String getExecutingAgent() {
+        return this.executingAgent;
+    }
+
+    /** @param executingAgent value for the {@code EXECUTING_AGENT} column */
+    public void setExecutingAgent(String executingAgent) {
+        this.executingAgent = executingAgent;
+    }
+
+    /** @return the value of the {@code TIME_OF_CHANGE} column */
+    public long getTimeOfChange() {
+        return this.timeOfChange;
+    }
+
+    /** @param timeOfChange value for the {@code TIME_OF_CHANGE} column */
+    public void setTimeOfChange(long timeOfChange) {
+        this.timeOfChange = timeOfChange;
+    }
+}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferStatusEntity.java b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferStatusEntity.java
new file mode 100644
index 0000000..eb920ed
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferStatusEntity.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller.db.entities;
+
+import javax.persistence.*;
+
+/**
+ * JPA entity with a generated integer id that links a {@link TransferEntity}
+ * (through the {@code TRANSFER_ID} join column) to a status string and a
+ * {@code TIME_OF_CHANGE} value.
+ */
+@Entity
+public class TransferStatusEntity {
+
+    @Id
+    @GeneratedValue
+    @Column(name = "ID")
+    private int id;
+
+    @ManyToOne
+    @JoinColumn(name = "TRANSFER_ID", referencedColumnName = "TRANSFER_ID")
+    private TransferEntity transfer;
+
+    @Column(name = "STATUS")
+    private String status;
+
+    @Column(name = "TIME_OF_CHANGE")
+    private long timeOfChange;
+
+    /** @return the generated id of this row */
+    public int getId() {
+        return this.id;
+    }
+
+    /** @param id value to store as the row id */
+    public void setId(int id) {
+        this.id = id;
+    }
+
+    /** @return the transfer this status belongs to */
+    public TransferEntity getTransfer() {
+        return this.transfer;
+    }
+
+    /** @param transfer transfer this status belongs to */
+    public void setTransfer(TransferEntity transfer) {
+        this.transfer = transfer;
+    }
+
+    /** @return the value of the {@code STATUS} column */
+    public String getStatus() {
+        return this.status;
+    }
+
+    /** @param status value for the {@code STATUS} column */
+    public void setStatus(String status) {
+        this.status = status;
+    }
+
+    /** @return the value of the {@code TIME_OF_CHANGE} column */
+    public long getTimeOfChange() {
+        return this.timeOfChange;
+    }
+
+    /** @param timeOfChange value for the {@code TIME_OF_CHANGE} column */
+    public void setTimeOfChange(long timeOfChange) {
+        this.timeOfChange = timeOfChange;
+    }
+}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/db/repositories/TargetAgentRepository.java b/controller/src/main/java/org/apache/airavata/mft/controller/db/repositories/TargetAgentRepository.java
new file mode 100644
index 0000000..d7fa73e
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/db/repositories/TargetAgentRepository.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller.db.repositories;
+
+import org.apache.airavata.mft.controller.db.entities.TargetAgentEntity;
+import org.springframework.data.repository.CrudRepository;
+
+public interface TargetAgentRepository extends CrudRepository<TargetAgentEntity, String> {
+}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/db/repositories/TransferRepository.java b/controller/src/main/java/org/apache/airavata/mft/controller/db/repositories/TransferRepository.java
new file mode 100644
index 0000000..bf3a704
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/db/repositories/TransferRepository.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller.db.repositories;
+
+import org.apache.airavata.mft.controller.db.entities.TransferEntity;
+import org.springframework.data.repository.CrudRepository;
+
+public interface TransferRepository extends CrudRepository<TransferEntity, String> {
+}
diff --git a/controller/src/main/resources/application.properties b/controller/src/main/resources/application.properties
new file mode 100644
index 0000000..e69de29
diff --git a/pom.xml b/pom.xml
index 179adfb..26f0bd8 100755
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,7 @@
<module>agent</module>
<module>services</module>
<module>admin</module>
+ <module>controller</module>
</modules>
<url>http://airavata.apache.org/</url>
@@ -113,6 +114,7 @@
<protobuf.java>3.9.1</protobuf.java>
<grpc.spring.boot>3.5.1</grpc.spring.boot>
<spring.boot.data.jpa>2.2.1.RELEASE</spring.boot.data.jpa>
+ <log4j.over.slf4j>1.7.26</log4j.over.slf4j>
<dozer>5.5.1</dozer>
<jsch>0.1.55</jsch>
<sshj>0.23.0</sshj>