You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/01/06 07:12:42 UTC
[airavata-mft] branch master updated: Fetching and persisting transfer status in controller
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
The following commit(s) were added to refs/heads/master by this push:
new d552005 Fetching and persisting transfer status in controller
d552005 is described below
commit d552005756a4631b27919da4adcaaf37c47a3d58
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Mon Jan 6 02:12:27 2020 -0500
Fetching and persisting transfer status in controller
---
.../airavata/mft/controller/MFTController.java | 49 ++++++++++++++++++++--
.../db/entities/TransferStatusEntity.java | 36 +++++++++++-----
.../db/repositories/TransferStatusRepository.java | 24 +++++++++++
3 files changed, 95 insertions(+), 14 deletions(-)
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java b/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
index ad869e0..2c2fe49 100644
--- a/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
@@ -24,11 +24,13 @@ import com.orbitz.consul.cache.ConsulCache;
import com.orbitz.consul.cache.KVCache;
import com.orbitz.consul.model.kv.Value;
import org.apache.airavata.mft.admin.MFTAdmin;
-import org.apache.airavata.mft.admin.MFTAdminException;
import org.apache.airavata.mft.admin.models.TransferCommand;
+import org.apache.airavata.mft.admin.models.TransferState;
import org.apache.airavata.mft.controller.db.entities.TransferEntity;
+import org.apache.airavata.mft.controller.db.entities.TransferStatusEntity;
import org.apache.airavata.mft.controller.db.repositories.TransferRepository;
import org.apache.airavata.mft.admin.models.TransferRequest;
+import org.apache.airavata.mft.controller.db.repositories.TransferStatusRepository;
import org.dozer.DozerBeanMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +40,7 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.PropertySource;
+import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Semaphore;
@@ -53,7 +56,9 @@ public class MFTController implements CommandLineRunner {
private Consul client;
private KeyValueClient kvClient;
private KVCache messageCache;
- private ConsulCache.Listener<String, Value> cacheListener;
+ private KVCache stateCache;
+ private ConsulCache.Listener<String, Value> messageCacheListener;
+ private ConsulCache.Listener<String, Value> stateCacheListener;
private MFTAdmin admin;
@@ -63,15 +68,19 @@ public class MFTController implements CommandLineRunner {
@Autowired
private TransferRepository transferRepository;
+ @Autowired
+ private TransferStatusRepository statusRepository;
+
public void init() {
client = Consul.builder().build();
kvClient = client.keyValueClient();
messageCache = KVCache.newCache(kvClient, "mft/controller/messages");
+ stateCache = KVCache.newCache(kvClient, "mft/transfer/state");
admin = new MFTAdmin();
}
private void acceptRequests() {
- cacheListener = newValues -> {
+ messageCacheListener = newValues -> {
newValues.forEach((key, value) -> {
String transferId = key.substring(key.lastIndexOf("/") + 1);
Optional<String> decodedValue = value.getValueAsString();
@@ -140,15 +149,47 @@ public class MFTController implements CommandLineRunner {
});
});
};
- messageCache.addListener(cacheListener);
+ messageCache.addListener(messageCacheListener);
messageCache.start();
}
+ private void acceptStates() {
+ stateCacheListener = newValues -> {
+ newValues.forEach((key, value) -> {
+ try {
+ if (value.getValueAsString().isPresent()) {
+ String asStr = value.getValueAsString().get();
+ logger.info("Received state {}", asStr);
+ TransferState transferState = jsonMapper.readValue(asStr, TransferState.class);
+ String transferId = key.substring(key.lastIndexOf("/") + 1);
+ Optional<TransferEntity> transferEntity = transferRepository.findById(transferId);
+ if (transferEntity.isPresent()) {
+ TransferStatusEntity ety = new TransferStatusEntity()
+ .setPercentage(transferState.getPercentage())
+ .setStatus(transferState.getState())
+ .setUpdateTimeMils(transferState.getUpdateTimeMils())
+ .setTransfer(transferEntity.get());
+ statusRepository.save(ety);
+ logger.info("Saved state for transfer {}", transferId);
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ logger.info("Deleting key " + value.getKey());
+ kvClient.deleteKey(value.getKey()); // Due to bug in consul https://github.com/hashicorp/consul/issues/571
+ }
+ });
+ };
+ stateCache.addListener(stateCacheListener);
+ stateCache.start();
+ }
@Override
public void run(String... args) throws Exception {
init();
acceptRequests();
+ acceptStates();
mainHold.acquire();
}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferStatusEntity.java b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferStatusEntity.java
index eb920ed..cd8cfe0 100644
--- a/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferStatusEntity.java
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferStatusEntity.java
@@ -30,41 +30,57 @@ public class TransferStatusEntity {
@JoinColumn(name = "TRANSFER_ID", referencedColumnName = "TRANSFER_ID")
private TransferEntity transfer;
- @Column(name = "STATUS")
+ @Column(name = "STATE")
private String status;
- @Column(name = "TIME_OF_CHANGE")
- private long timeOfChange;
+ @Column(name = "UPDATE_TIME")
+ private long updateTimeMils;
+
+ @Column(name = "PERCENTAGE")
+ private double percentage;
public int getId() {
return id;
}
- public void setId(int id) {
+ public TransferStatusEntity setId(int id) {
this.id = id;
+ return this;
}
public TransferEntity getTransfer() {
return transfer;
}
- public void setTransfer(TransferEntity transfer) {
+ public TransferStatusEntity setTransfer(TransferEntity transfer) {
this.transfer = transfer;
+ return this;
}
public String getStatus() {
return status;
}
- public void setStatus(String status) {
+ public TransferStatusEntity setStatus(String status) {
this.status = status;
+ return this;
+ }
+
+ public long getUpdateTimeMils() {
+ return updateTimeMils;
+ }
+
+ public TransferStatusEntity setUpdateTimeMils(long updateTimeMils) {
+ this.updateTimeMils = updateTimeMils;
+ return this;
}
- public long getTimeOfChange() {
- return timeOfChange;
+ public double getPercentage() {
+ return percentage;
}
- public void setTimeOfChange(long timeOfChange) {
- this.timeOfChange = timeOfChange;
+ public TransferStatusEntity setPercentage(double percentage) {
+ this.percentage = percentage;
+ return this;
}
}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/db/repositories/TransferStatusRepository.java b/controller/src/main/java/org/apache/airavata/mft/controller/db/repositories/TransferStatusRepository.java
new file mode 100644
index 0000000..0f03d8e
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/db/repositories/TransferStatusRepository.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller.db.repositories;
+
+import org.apache.airavata.mft.controller.db.entities.TransferStatusEntity;
+import org.springframework.data.repository.CrudRepository;
+
+public interface TransferStatusRepository extends CrudRepository<TransferStatusEntity, Integer> {
+}