You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/04/03 19:54:13 UTC

[airavata] branch develop updated: Publishing process status as Executing once the workflow is launched

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/develop by this push:
     new bf3943a  Publishing process status as Executing once the workflow is launched
bf3943a is described below

commit bf3943a37fc182e7ad884c9683e8563f4bc29d5b
Author: dimuthu <di...@gmail.com>
AuthorDate: Tue Apr 3 15:54:05 2018 -0400

    Publishing process status as Executing once the workflow is launched
---
 .../helix/impl/workflow/PreWorkflowManager.java    | 56 ++++++++++++++--------
 1 file changed, 36 insertions(+), 20 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index 53f1512..6733b42 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -20,7 +20,7 @@
 package org.apache.airavata.helix.impl.workflow;
 
 import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.helix.core.AbstractTask;
@@ -35,15 +35,16 @@ import org.apache.airavata.helix.impl.task.submission.DefaultJobSubmissionTask;
 import org.apache.airavata.helix.workflow.WorkflowManager;
 import org.apache.airavata.messaging.core.*;
 import org.apache.airavata.model.experiment.ExperimentModel;
-import org.apache.airavata.model.messaging.event.MessageType;
-import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
-import org.apache.airavata.model.messaging.event.ProcessTerminateEvent;
+import org.apache.airavata.model.messaging.event.*;
 import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
 import org.apache.airavata.model.task.TaskModel;
 import org.apache.airavata.model.task.TaskTypes;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.ExperimentCatalog;
 import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.registry.cpi.RegistryException;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -54,28 +55,33 @@ import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.UUID;
+import java.util.*;
 
 public class PreWorkflowManager {
 
     private final static Logger logger = LoggerFactory.getLogger(PreWorkflowManager.class);
 
     private Subscriber subscriber;
+    private Publisher statusPublisher;
     private CuratorFramework curatorClient = null;
+    private WorkflowManager workflowManager;
 
     @SuppressWarnings("WeakerAccess")
-    public PreWorkflowManager() throws AiravataException {
+    public PreWorkflowManager() throws Exception {
         init();
     }
 
-    private void init() throws AiravataException {
+    private void init() throws Exception {
+
+        workflowManager = new WorkflowManager(
+                ServerSettings.getSetting("helix.cluster.name"),
+                ServerSettings.getSetting("pre.workflow.manager.name"),
+                ServerSettings.getZookeeperConnection());
+
         List<String> routingKeys = new ArrayList<>();
         routingKeys.add(ServerSettings.getRabbitmqProcessExchangeName());
         this.subscriber = MessagingFactory.getSubscriber(new ProcessLaunchMessageHandler(), routingKeys, Type.PROCESS_LAUNCH);
-
+        this.statusPublisher = MessagingFactory.getPublisher(Type.STATUS);
         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
         this.curatorClient = CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(), retryPolicy);
         this.curatorClient.start();
@@ -149,10 +155,6 @@ public class PreWorkflowManager {
             }
         }
 
-        WorkflowManager workflowManager = new WorkflowManager(
-                ServerSettings.getSetting("helix.cluster.name"),
-                ServerSettings.getSetting("pre.workflow.manager.name"),
-                ServerSettings.getZookeeperConnection());
         String workflowName = workflowManager.launchWorkflow(processId + "-PRE-" + UUID.randomUUID().toString(),
                 new ArrayList<>(allTasks), true, false);
         try {
@@ -216,11 +218,6 @@ public class PreWorkflowManager {
         }
         allTasks.add(cct);
 
-        WorkflowManager workflowManager = new WorkflowManager(
-                ServerSettings.getSetting("helix.cluster.name"),
-                ServerSettings.getSetting("pre.workflow.manager.name"),
-                ServerSettings.getZookeeperConnection());
-
         String workflow = workflowManager.launchWorkflow(processId + "-CANCEL-" + UUID.randomUUID().toString(), allTasks, true, false);
         logger.info("Started launching workflow " + workflow + " to cancel process " + processId);
         return workflow;
@@ -257,11 +254,18 @@ public class PreWorkflowManager {
                     logger.info("Launching the pre workflow for process " + processId + " in gateway " + gateway);
                     String workflowName = createAndLaunchPreWorkflow(processId, gateway);
                     logger.info("Completed launching the pre workflow " + workflowName + " for process " + processId + " in gateway " + gateway);
+
+                    // updating the process status
+                    ProcessStatus status = new ProcessStatus();
+                    status.setState(ProcessState.STARTED);
+                    status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+                    publishProcessStatus(event, status);
                     subscriber.sendAck(messageContext.getDeliveryTag());
                 } catch (Exception e) {
                     logger.error("Failed to launch the pre workflow for process " + processId + " in gateway " + gateway, e);
                     //subscriber.sendAck(messageContext.getDeliveryTag());
                 }
+
             } else if (messageContext.getType().equals(MessageType.TERMINATEPROCESS)) {
                 ProcessTerminateEvent event = new ProcessTerminateEvent();
                 TBase messageEvent = messageContext.getEvent();
@@ -294,4 +298,16 @@ public class PreWorkflowManager {
             }
         }
     }
+
+    private void publishProcessStatus(ProcessSubmitEvent event, ProcessStatus status) throws AiravataException, RegistryException {
+
+        ExperimentCatalog experimentCatalog = RegistryFactory.getExperimentCatalog(event.getGatewayId());
+        experimentCatalog.update(ExperimentCatalogModelType.PROCESS_STATUS, status, event.getProcessId());
+        ProcessIdentifier identifier = new ProcessIdentifier(event.getProcessId(), event.getExperimentId(), event.getGatewayId());
+        ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(status.getState(), identifier);
+        MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
+                AiravataUtils.getId(MessageType.PROCESS.name()), event.getGatewayId());
+        msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+        statusPublisher.publish(msgCtx);
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.