You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/01/06 07:12:42 UTC

[airavata-mft] branch master updated: Fetching and persisting transfer status in controller

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git


The following commit(s) were added to refs/heads/master by this push:
     new d552005  Fetching and persisting transfer status in controller
d552005 is described below

commit d552005756a4631b27919da4adcaaf37c47a3d58
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Mon Jan 6 02:12:27 2020 -0500

    Fetching and persisting transfer status in controller
---
 .../airavata/mft/controller/MFTController.java     | 49 ++++++++++++++++++++--
 .../db/entities/TransferStatusEntity.java          | 36 +++++++++++-----
 .../db/repositories/TransferStatusRepository.java  | 24 +++++++++++
 3 files changed, 95 insertions(+), 14 deletions(-)

diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java b/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
index ad869e0..2c2fe49 100644
--- a/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
@@ -24,11 +24,13 @@ import com.orbitz.consul.cache.ConsulCache;
 import com.orbitz.consul.cache.KVCache;
 import com.orbitz.consul.model.kv.Value;
 import org.apache.airavata.mft.admin.MFTAdmin;
-import org.apache.airavata.mft.admin.MFTAdminException;
 import org.apache.airavata.mft.admin.models.TransferCommand;
+import org.apache.airavata.mft.admin.models.TransferState;
 import org.apache.airavata.mft.controller.db.entities.TransferEntity;
+import org.apache.airavata.mft.controller.db.entities.TransferStatusEntity;
 import org.apache.airavata.mft.controller.db.repositories.TransferRepository;
 import org.apache.airavata.mft.admin.models.TransferRequest;
+import org.apache.airavata.mft.controller.db.repositories.TransferStatusRepository;
 import org.dozer.DozerBeanMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +40,7 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.context.annotation.PropertySource;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.Semaphore;
@@ -53,7 +56,9 @@ public class MFTController implements CommandLineRunner {
     private Consul client;
     private KeyValueClient kvClient;
     private KVCache messageCache;
-    private ConsulCache.Listener<String, Value> cacheListener;
+    private KVCache stateCache;
+    private ConsulCache.Listener<String, Value> messageCacheListener;
+    private ConsulCache.Listener<String, Value> stateCacheListener;
 
     private MFTAdmin admin;
 
@@ -63,15 +68,19 @@ public class MFTController implements CommandLineRunner {
     @Autowired
     private TransferRepository transferRepository;
 
+    @Autowired
+    private TransferStatusRepository statusRepository;
+
     public void init() {
         client = Consul.builder().build();
         kvClient = client.keyValueClient();
         messageCache = KVCache.newCache(kvClient, "mft/controller/messages");
+        stateCache = KVCache.newCache(kvClient, "mft/transfer/state");
         admin = new MFTAdmin();
     }
 
     private void acceptRequests() {
-        cacheListener = newValues -> {
+        messageCacheListener = newValues -> {
             newValues.forEach((key, value) -> {
                 String transferId = key.substring(key.lastIndexOf("/") + 1);
                 Optional<String> decodedValue = value.getValueAsString();
@@ -140,15 +149,47 @@ public class MFTController implements CommandLineRunner {
                 });
             });
         };
-        messageCache.addListener(cacheListener);
+        messageCache.addListener(messageCacheListener);
         messageCache.start();
     }
 
+    private void acceptStates() { // Listens on stateCache and persists each received TransferState as a TransferStatusEntity
+        stateCacheListener = newValues -> {
+            newValues.forEach((key, value) -> {
+                try {
+                    if (value.getValueAsString().isPresent()) {
+                        String asStr = value.getValueAsString().get();
+                        logger.info("Received state {}", asStr);
+                        TransferState transferState = jsonMapper.readValue(asStr, TransferState.class);
+                        String transferId = key.substring(key.lastIndexOf("/") + 1); // transfer id = text after the last '/' of the key
+                        Optional<TransferEntity> transferEntity = transferRepository.findById(transferId);
+                        if (transferEntity.isPresent()) { // states for transfers not found in the repository are not persisted
+                            TransferStatusEntity ety = new TransferStatusEntity()
+                                    .setPercentage(transferState.getPercentage())
+                                    .setStatus(transferState.getState())
+                                    .setUpdateTimeMils(transferState.getUpdateTimeMils())
+                                    .setTransfer(transferEntity.get());
+                            statusRepository.save(ety);
+                            logger.info("Saved state for transfer {}", transferId);
+                        }
+                    }
+                } catch (Exception e) { // boundary catch: one bad entry must not stop processing of the remaining entries
+                    logger.error("Failed to process transfer state for key {}", key, e);
+                } finally { // runs on success and on failure, so the key is deleted even if the state could not be saved
+                    logger.info("Deleting key {}", value.getKey());
+                    kvClient.deleteKey(value.getKey()); // Due to bug in consul https://github.com/hashicorp/consul/issues/571
+                }
+            });
+        };
+        stateCache.addListener(stateCacheListener);
+        stateCache.start();
+    }
 
     @Override
     public void run(String... args) throws Exception {
         init();
         acceptRequests();
+        acceptStates();
         mainHold.acquire();
     }
 
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferStatusEntity.java b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferStatusEntity.java
index eb920ed..cd8cfe0 100644
--- a/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferStatusEntity.java
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/db/entities/TransferStatusEntity.java
@@ -30,41 +30,57 @@ public class TransferStatusEntity {
     @JoinColumn(name = "TRANSFER_ID", referencedColumnName = "TRANSFER_ID")
     private TransferEntity transfer;
 
-    @Column(name = "STATUS")
+    @Column(name = "STATE")
     private String status;
 
-    @Column(name = "TIME_OF_CHANGE")
-    private long timeOfChange;
+    @Column(name = "UPDATE_TIME")
+    private long updateTimeMils;
+
+    @Column(name = "PERCENTAGE")
+    private double percentage;
 
     public int getId() {
         return id;
     }
 
-    public void setId(int id) {
+    public TransferStatusEntity setId(int id) {
         this.id = id;
+        return this;
     }
 
     public TransferEntity getTransfer() {
         return transfer;
     }
 
-    public void setTransfer(TransferEntity transfer) {
+    public TransferStatusEntity setTransfer(TransferEntity transfer) {
         this.transfer = transfer;
+        return this;
     }
 
     public String getStatus() {
         return status;
     }
 
-    public void setStatus(String status) {
+    public TransferStatusEntity setStatus(String status) {
         this.status = status;
+        return this;
+    }
+
+    public long getUpdateTimeMils() {
+        return updateTimeMils;
+    }
+
+    public TransferStatusEntity setUpdateTimeMils(long updateTimeMils) {
+        this.updateTimeMils = updateTimeMils;
+        return this;
     }
 
-    public long getTimeOfChange() {
-        return timeOfChange;
+    public double getPercentage() {
+        return percentage;
     }
 
-    public void setTimeOfChange(long timeOfChange) {
-        this.timeOfChange = timeOfChange;
+    public TransferStatusEntity setPercentage(double percentage) {
+        this.percentage = percentage;
+        return this;
     }
 }
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/db/repositories/TransferStatusRepository.java b/controller/src/main/java/org/apache/airavata/mft/controller/db/repositories/TransferStatusRepository.java
new file mode 100644
index 0000000..0f03d8e
--- /dev/null
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/db/repositories/TransferStatusRepository.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.controller.db.repositories;
+
+import org.apache.airavata.mft.controller.db.entities.TransferStatusEntity;
+import org.springframework.data.repository.CrudRepository;
+
+public interface TransferStatusRepository extends CrudRepository<TransferStatusEntity, Integer> { // CRUD repository for TransferStatusEntity, keyed by Integer; declares no methods of its own
+}