You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/04/03 19:54:13 UTC
[airavata] branch develop updated: Publishing process status as Executing once the workflow is launched
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/develop by this push:
new bf3943a Publishing process status as Executing once the workflow is launched
bf3943a is described below
commit bf3943a37fc182e7ad884c9683e8563f4bc29d5b
Author: dimuthu <di...@gmail.com>
AuthorDate: Tue Apr 3 15:54:05 2018 -0400
Publishing process status as Executing once the workflow is launched
---
.../helix/impl/workflow/PreWorkflowManager.java | 56 ++++++++++++++--------
1 file changed, 36 insertions(+), 20 deletions(-)
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index 53f1512..6733b42 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -20,7 +20,7 @@
package org.apache.airavata.helix.impl.workflow;
import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.helix.core.AbstractTask;
@@ -35,15 +35,16 @@ import org.apache.airavata.helix.impl.task.submission.DefaultJobSubmissionTask;
import org.apache.airavata.helix.workflow.WorkflowManager;
import org.apache.airavata.messaging.core.*;
import org.apache.airavata.model.experiment.ExperimentModel;
-import org.apache.airavata.model.messaging.event.MessageType;
-import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
-import org.apache.airavata.model.messaging.event.ProcessTerminateEvent;
+import org.apache.airavata.model.messaging.event.*;
import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
import org.apache.airavata.model.task.TaskModel;
import org.apache.airavata.model.task.TaskTypes;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.ExperimentCatalog;
import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.registry.cpi.RegistryException;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -54,28 +55,33 @@ import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.UUID;
+import java.util.*;
public class PreWorkflowManager {
private final static Logger logger = LoggerFactory.getLogger(PreWorkflowManager.class);
private Subscriber subscriber;
+ private Publisher statusPublisher;
private CuratorFramework curatorClient = null;
+ private WorkflowManager workflowManager;
@SuppressWarnings("WeakerAccess")
- public PreWorkflowManager() throws AiravataException {
+ public PreWorkflowManager() throws Exception {
init();
}
- private void init() throws AiravataException {
+ private void init() throws Exception {
+
+ workflowManager = new WorkflowManager(
+ ServerSettings.getSetting("helix.cluster.name"),
+ ServerSettings.getSetting("pre.workflow.manager.name"),
+ ServerSettings.getZookeeperConnection());
+
List<String> routingKeys = new ArrayList<>();
routingKeys.add(ServerSettings.getRabbitmqProcessExchangeName());
this.subscriber = MessagingFactory.getSubscriber(new ProcessLaunchMessageHandler(), routingKeys, Type.PROCESS_LAUNCH);
-
+ this.statusPublisher = MessagingFactory.getPublisher(Type.STATUS);
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
this.curatorClient = CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(), retryPolicy);
this.curatorClient.start();
@@ -149,10 +155,6 @@ public class PreWorkflowManager {
}
}
- WorkflowManager workflowManager = new WorkflowManager(
- ServerSettings.getSetting("helix.cluster.name"),
- ServerSettings.getSetting("pre.workflow.manager.name"),
- ServerSettings.getZookeeperConnection());
String workflowName = workflowManager.launchWorkflow(processId + "-PRE-" + UUID.randomUUID().toString(),
new ArrayList<>(allTasks), true, false);
try {
@@ -216,11 +218,6 @@ public class PreWorkflowManager {
}
allTasks.add(cct);
- WorkflowManager workflowManager = new WorkflowManager(
- ServerSettings.getSetting("helix.cluster.name"),
- ServerSettings.getSetting("pre.workflow.manager.name"),
- ServerSettings.getZookeeperConnection());
-
String workflow = workflowManager.launchWorkflow(processId + "-CANCEL-" + UUID.randomUUID().toString(), allTasks, true, false);
logger.info("Started launching workflow " + workflow + " to cancel process " + processId);
return workflow;
@@ -257,11 +254,18 @@ public class PreWorkflowManager {
logger.info("Launching the pre workflow for process " + processId + " in gateway " + gateway);
String workflowName = createAndLaunchPreWorkflow(processId, gateway);
logger.info("Completed launching the pre workflow " + workflowName + " for process " + processId + " in gateway " + gateway);
+
+ // updating the process status
+ ProcessStatus status = new ProcessStatus();
+ status.setState(ProcessState.STARTED);
+ status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+ publishProcessStatus(event, status);
subscriber.sendAck(messageContext.getDeliveryTag());
} catch (Exception e) {
logger.error("Failed to launch the pre workflow for process " + processId + " in gateway " + gateway, e);
//subscriber.sendAck(messageContext.getDeliveryTag());
}
+
} else if (messageContext.getType().equals(MessageType.TERMINATEPROCESS)) {
ProcessTerminateEvent event = new ProcessTerminateEvent();
TBase messageEvent = messageContext.getEvent();
@@ -294,4 +298,16 @@ public class PreWorkflowManager {
}
}
}
+
+ private void publishProcessStatus(ProcessSubmitEvent event, ProcessStatus status) throws AiravataException, RegistryException {
+
+ ExperimentCatalog experimentCatalog = RegistryFactory.getExperimentCatalog(event.getGatewayId());
+ experimentCatalog.update(ExperimentCatalogModelType.PROCESS_STATUS, status, event.getProcessId());
+ ProcessIdentifier identifier = new ProcessIdentifier(event.getProcessId(), event.getExperimentId(), event.getGatewayId());
+ ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(status.getState(), identifier);
+ MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
+ AiravataUtils.getId(MessageType.PROCESS.name()), event.getGatewayId());
+ msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ statusPublisher.publish(msgCtx);
+ }
}
--
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.