You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/03/06 22:19:02 UTC
[2/2] airavata git commit: Fixed AIRAVATA-1620.
Fixed AIRAVATA-1620.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/774b092d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/774b092d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/774b092d
Branch: refs/heads/new-workflow-design-rabbitmq
Commit: 774b092d31e41919b39ca3ce9f1edd5af1c30669
Parents: 44d89ee
Author: shamrath <sh...@gmail.com>
Authored: Fri Mar 6 16:18:44 2015 -0500
Committer: shamrath <sh...@gmail.com>
Committed: Fri Mar 6 16:18:44 2015 -0500
----------------------------------------------------------------------
.../server/OrchestratorServerHandler.java | 31 +-
.../engine/SimpleWorkflowInterpreter.java | 280 ++++++++-----------
.../engine/WorkflowEnactmentService.java | 129 ++++++++-
3 files changed, 259 insertions(+), 181 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/774b092d/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index d168c26..fe306d7 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -52,11 +52,20 @@ import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
import org.apache.airavata.model.util.ExecutionType;
-import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
+import org.apache.airavata.model.workspace.experiment.ExperimentStatus;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.model.workspace.experiment.TaskStatus;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
import org.apache.airavata.orchestrator.cpi.OrchestratorService;
import org.apache.airavata.orchestrator.cpi.impl.SimpleOrchestratorImpl;
import org.apache.airavata.orchestrator.cpi.orchestrator_cpi_serviceConstants;
+import org.apache.airavata.orchestrator.util.DataModelUtils;
import org.apache.airavata.orchestrator.util.OrchestratorServerThreadPoolExecutor;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.Registry;
@@ -64,17 +73,25 @@ import org.apache.airavata.registry.cpi.RegistryException;
import org.apache.airavata.registry.cpi.RegistryModelType;
import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.TaskDetailConstants;
import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.WorkflowNodeConstants;
-import org.apache.airavata.orchestrator.util.DataModelUtils;
-import org.apache.airavata.simple.workflow.engine.SimpleWorkflowInterpreter;
import org.apache.airavata.simple.workflow.engine.WorkflowEnactmentService;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
-import org.apache.zookeeper.*;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.io.File;
import java.io.IOException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
public class OrchestratorServerHandler implements OrchestratorService.Iface,
Watcher {
@@ -656,10 +673,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
try {
WorkflowEnactmentService.getInstance().
submitWorkflow(experimentId, airavataCredStoreToken, getGatewayName(), getRabbitMQProcessPublisher());
- } catch (RegistryException e) {
- log.error("Error while launching workflow", e);
} catch (Exception e) {
- log.error("Error while initializing rabbit mq process publisher");
+ log.error("Error while launching workflow", e);
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/774b092d/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
index a052e5c..ee7ff6b 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
@@ -24,8 +24,6 @@ package org.apache.airavata.simple.workflow.engine;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher;
import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
@@ -66,7 +64,10 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-public class SimpleWorkflowInterpreter implements Runnable{
+/**
+ * Package-Private class
+ */
+class SimpleWorkflowInterpreter{
private static final Logger log = LoggerFactory.getLogger(SimpleWorkflowInterpreter.class);
private List<WorkflowInputNode> workflowInputNodes;
@@ -102,8 +103,11 @@ public class SimpleWorkflowInterpreter implements Runnable{
this.publisher = publisher;
}
-
- public void launchWorkflow() throws Exception {
+ /**
+ * Package-Private method.
+ * @throws Exception
+ */
+ void launchWorkflow() throws Exception {
WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance();
WorkflowParser workflowParser = wfFactory.getWorkflowParser(experiment.getExperimentID(), credentialToken);
log.debug("Initialized workflow parser");
@@ -111,15 +115,16 @@ public class SimpleWorkflowInterpreter implements Runnable{
log.debug("Parsed the workflow and got the workflow input nodes");
// process workflow input nodes
processWorkflowInputNodes(getWorkflowInputNodes());
- // initialize the rabbitmq status consumer
- statusConsumer = new RabbitMQStatusConsumer();
- consumerId = statusConsumer.listen(new TaskMessageHandler());
-
processReadyList();
}
// try to remove synchronization tag
- private synchronized void processReadyList() throws RegistryException, AiravataException {
+ /**
+ * Package-Private method.
+ * @throws RegistryException
+ * @throws AiravataException
+ */
+ void processReadyList() throws RegistryException, AiravataException {
for (WorkflowNode readyNode : readyList.values()) {
if (readyNode instanceof WorkflowOutputNode) {
WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) readyNode;
@@ -232,10 +237,11 @@ public class SimpleWorkflowInterpreter implements Runnable{
}
/**
+ * Package-Private method.
* Remove the workflow node from waiting queue and add it to the ready queue.
* @param workflowNode - Workflow Node
*/
- private synchronized void addToReadyQueue(WorkflowNode workflowNode) {
+ synchronized void addToReadyQueue(WorkflowNode workflowNode) {
waitingList.remove(workflowNode.getId());
readyList.put(workflowNode.getId(), workflowNode);
}
@@ -265,31 +271,8 @@ public class SimpleWorkflowInterpreter implements Runnable{
readyList.remove(wfOutputNode.getId());
}
- @Override
- public void run() {
- try {
- log.debug("Launching workflow");
- launchWorkflow();
- while (continueWorkflow && !(waitingList.isEmpty() && readyList.isEmpty())) {
-// processReadyList();
- Thread.sleep(1000);
- }
- if (continueWorkflow) {
- log.info("Successfully launched workflow for experiment : " + getExperiment().getExperimentID());
- } else if (!(waitingList.isEmpty() || readyList.isEmpty())) {
- log.error("Workflow couldn't execute all workflow nodes due to an error");
- }
- } catch (Exception e) {
- log.error("Error launching workflow", e);
- } finally {
- try {
- statusConsumer.stopListen(consumerId);
- log.info("Successfully un-bind status consumer for experiment " + getExperiment().getExperimentID());
- } catch (AiravataException e) {
- log.error("Error while un-binding status consumer: " + consumerId + " for experiment "
- + getExperiment().getExperimentID());
- }
- }
+ boolean isAllDone() {
+ return !continueWorkflow || (waitingList.isEmpty() && readyList.isEmpty());
}
private void setExperiment(String experimentId) throws RegistryException {
@@ -297,147 +280,108 @@ public class SimpleWorkflowInterpreter implements Runnable{
log.debug("Retrieve Experiment for experiment id : " + experimentId);
}
- class TaskMessageHandler implements MessageHandler{
-
- @Override
- public Map<String, Object> getProperties() {
- Map<String, Object> props = new HashMap<String, Object>();
- String gatewayId = "*";
- String experimentId = getExperiment().getExperimentID();
- List<String> routingKeys = new ArrayList<String>();
-// routingKeys.add(gatewayName+ "." + getExperiment().getExperimentID() + ".*");
- routingKeys.add(gatewayId);
- routingKeys.add(gatewayId + "." + experimentId);
- routingKeys.add(gatewayId + "." + experimentId+ ".*");
- routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
- props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
- return props;
- }
-
- @Override
- public void onMessage(MessageContext msgCtx) {
- String message;
- if (msgCtx.getType() == MessageType.TASK) {
- TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
- TaskIdentifier taskIdentifier = event.getTaskIdentity();
- handleTaskStatusChangeEvent(event);
- message = "Received task output change event , expId : " + taskIdentifier.getExperimentId() + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId();
- log.debug(message);
- }else if (msgCtx.getType() == MessageType.TASKOUTPUT) {
- TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent();
- TaskIdentifier taskIdentifier = event.getTaskIdentity();
- handleTaskOutputChangeEvent(event);
- message = "Received task output change event , expId : " + taskIdentifier.getExperimentId() + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId();
- log.debug(message);
- } else {
- // not interesting, ignores
- }
- }
-
- private void handleTaskOutputChangeEvent(TaskOutputChangeEvent taskOutputChangeEvent) {
-
- String taskId = taskOutputChangeEvent.getTaskIdentity().getTaskId();
- log.debug("Task Output changed event received for workflow node : " +
- taskOutputChangeEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId);
- ProcessContext processContext = processingQueue.get(taskId);
- Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>();
- if (processContext != null) {
- WorkflowNode workflowNode = processContext.getWorkflowNode();
- if (workflowNode instanceof ApplicationNode) {
- ApplicationNode applicationNode = (ApplicationNode) workflowNode;
- // Workflow node can have one to many output ports and each output port can have one to many links
- for (OutPort outPort : applicationNode.getOutputPorts()) {
- for (OutputDataObjectType outputDataObjectType : taskOutputChangeEvent.getOutput()) {
- if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) {
- outPort.getOutputObject().setValue(outputDataObjectType.getValue());
- break;
- }
+ synchronized void handleTaskOutputChangeEvent(TaskOutputChangeEvent taskOutputChangeEvent) {
+
+ String taskId = taskOutputChangeEvent.getTaskIdentity().getTaskId();
+ log.debug("Task Output changed event received for workflow node : " +
+ taskOutputChangeEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId);
+ ProcessContext processContext = processingQueue.get(taskId);
+ Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>();
+ if (processContext != null) {
+ WorkflowNode workflowNode = processContext.getWorkflowNode();
+ if (workflowNode instanceof ApplicationNode) {
+ ApplicationNode applicationNode = (ApplicationNode) workflowNode;
+ // Workflow node can have one to many output ports and each output port can have one to many links
+ for (OutPort outPort : applicationNode.getOutputPorts()) {
+ for (OutputDataObjectType outputDataObjectType : taskOutputChangeEvent.getOutput()) {
+ if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) {
+ outPort.getOutputObject().setValue(outputDataObjectType.getValue());
+ break;
}
- for (Edge edge : outPort.getOutEdges()) {
- edge.getToPort().getInputObject().setValue(outPort.getOutputObject().getValue());
- if (edge.getToPort().getNode().isReady()) {
- addToReadyQueue(edge.getToPort().getNode());
- }
+ }
+ for (Edge edge : outPort.getOutEdges()) {
+ edge.getToPort().getInputObject().setValue(outPort.getOutputObject().getValue());
+ if (edge.getToPort().getNode().isReady()) {
+ addToReadyQueue(edge.getToPort().getNode());
}
}
}
- addToCompleteQueue(processContext);
- log.debug("removed task from processing queue : " + taskId);
- try {
- processReadyList();
- } catch (Exception e) {
- log.error("Error while processing ready workflow nodes", e);
- continueWorkflow = false;
- }
+ }
+ addToCompleteQueue(processContext);
+ log.debug("removed task from processing queue : " + taskId);
+ try {
+ processReadyList();
+ } catch (Exception e) {
+ log.error("Error while processing ready workflow nodes", e);
+ continueWorkflow = false;
}
}
+ }
- private void handleTaskStatusChangeEvent(TaskStatusChangeEvent taskStatusChangeEvent) {
- TaskState taskState = taskStatusChangeEvent.getState();
- TaskIdentifier taskIdentity = taskStatusChangeEvent.getTaskIdentity();
- String taskId = taskIdentity.getTaskId();
- ProcessContext processContext = processingQueue.get(taskId);
- if (processContext != null) {
- WorkflowNodeState wfNodeState = WorkflowNodeState.INVOKED;
- switch (taskState) {
- case WAITING:
- break;
- case STARTED:
- break;
- case PRE_PROCESSING:
- wfNodeState = WorkflowNodeState.INVOKED;
- processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
- break;
- case INPUT_DATA_STAGING:
- wfNodeState = WorkflowNodeState.INVOKED;
- processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
- break;
- case EXECUTING:
- wfNodeState = WorkflowNodeState.EXECUTING;
- processContext.getWorkflowNode().setState(NodeState.EXECUTING);
- break;
- case OUTPUT_DATA_STAGING:
- wfNodeState = WorkflowNodeState.COMPLETED;
- processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
- break;
- case POST_PROCESSING:
- wfNodeState = WorkflowNodeState.COMPLETED;
- processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
- break;
- case COMPLETED:
- wfNodeState = WorkflowNodeState.COMPLETED;
- processContext.getWorkflowNode().setState(NodeState.EXECUTED);
- break;
- case FAILED:
- wfNodeState = WorkflowNodeState.FAILED;
- processContext.getWorkflowNode().setState(NodeState.FAILED);
- break;
- case UNKNOWN:
- wfNodeState = WorkflowNodeState.UNKNOWN;
- break;
- case CONFIGURING_WORKSPACE:
- wfNodeState = WorkflowNodeState.COMPLETED;
- break;
- case CANCELED:
- case CANCELING:
- wfNodeState = WorkflowNodeState.CANCELED;
- processContext.getWorkflowNode().setState(NodeState.FAILED);
- break;
- default:
- break;
- }
- if (wfNodeState != WorkflowNodeState.UNKNOWN) {
- try {
- updateWorkflowNodeStatus(processContext.getWfNodeDetails(), wfNodeState);
- } catch (RegistryException e) {
- log.error("Error while updating workflow node status update to the registry. nodeInstanceId :"
- + processContext.getWfNodeDetails().getNodeInstanceId() + " status to: "
- + processContext.getWfNodeDetails().getWorkflowNodeStatus().toString() , e);
- }
+ void handleTaskStatusChangeEvent(TaskStatusChangeEvent taskStatusChangeEvent) {
+ TaskState taskState = taskStatusChangeEvent.getState();
+ TaskIdentifier taskIdentity = taskStatusChangeEvent.getTaskIdentity();
+ String taskId = taskIdentity.getTaskId();
+ ProcessContext processContext = processingQueue.get(taskId);
+ if (processContext != null) {
+ WorkflowNodeState wfNodeState = WorkflowNodeState.INVOKED;
+ switch (taskState) {
+ case WAITING:
+ break;
+ case STARTED:
+ break;
+ case PRE_PROCESSING:
+ wfNodeState = WorkflowNodeState.INVOKED;
+ processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
+ break;
+ case INPUT_DATA_STAGING:
+ wfNodeState = WorkflowNodeState.INVOKED;
+ processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
+ break;
+ case EXECUTING:
+ wfNodeState = WorkflowNodeState.EXECUTING;
+ processContext.getWorkflowNode().setState(NodeState.EXECUTING);
+ break;
+ case OUTPUT_DATA_STAGING:
+ wfNodeState = WorkflowNodeState.COMPLETED;
+ processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
+ break;
+ case POST_PROCESSING:
+ wfNodeState = WorkflowNodeState.COMPLETED;
+ processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
+ break;
+ case COMPLETED:
+ wfNodeState = WorkflowNodeState.COMPLETED;
+ processContext.getWorkflowNode().setState(NodeState.EXECUTED);
+ break;
+ case FAILED:
+ wfNodeState = WorkflowNodeState.FAILED;
+ processContext.getWorkflowNode().setState(NodeState.FAILED);
+ break;
+ case UNKNOWN:
+ wfNodeState = WorkflowNodeState.UNKNOWN;
+ break;
+ case CONFIGURING_WORKSPACE:
+ wfNodeState = WorkflowNodeState.COMPLETED;
+ break;
+ case CANCELED:
+ case CANCELING:
+ wfNodeState = WorkflowNodeState.CANCELED;
+ processContext.getWorkflowNode().setState(NodeState.FAILED);
+ break;
+ default:
+ break;
+ }
+ if (wfNodeState != WorkflowNodeState.UNKNOWN) {
+ try {
+ updateWorkflowNodeStatus(processContext.getWfNodeDetails(), wfNodeState);
+ } catch (RegistryException e) {
+ log.error("Error while updating workflow node status update to the registry. nodeInstanceId :"
+ + processContext.getWfNodeDetails().getNodeInstanceId() + " status to: "
+ + processContext.getWfNodeDetails().getWorkflowNodeStatus().toString() , e);
}
}
-
}
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/774b092d/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowEnactmentService.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowEnactmentService.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowEnactmentService.java
index ec5acfa..c7ab7b9 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowEnactmentService.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowEnactmentService.java
@@ -21,23 +21,46 @@
package org.apache.airavata.simple.workflow.engine;
+import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher;
-import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class WorkflowEnactmentService {
private static WorkflowEnactmentService workflowEnactmentService;
+ private final RabbitMQStatusConsumer statusConsumer;
+ private String consumerId;
private ExecutorService executor;
+ private Map<String,SimpleWorkflowInterpreter> workflowMap;
- private WorkflowEnactmentService () {
+ private WorkflowEnactmentService () throws AiravataException {
executor = Executors.newFixedThreadPool(getThreadPoolSize());
+ workflowMap = new ConcurrentHashMap<String, SimpleWorkflowInterpreter>();
+ statusConsumer = new RabbitMQStatusConsumer();
+ consumerId = statusConsumer.listen(new TaskMessageHandler());
+ // register the shutdown hook to un-bind status consumer.
+ Runtime.getRuntime().addShutdownHook(new EnactmentShutDownHook());
}
- public static WorkflowEnactmentService getInstance(){
+ public static WorkflowEnactmentService getInstance() throws AiravataException {
if (workflowEnactmentService == null) {
synchronized (WorkflowEnactmentService.class) {
if (workflowEnactmentService == null) {
@@ -51,14 +74,110 @@ public class WorkflowEnactmentService {
public void submitWorkflow(String experimentId,
String credentialToken,
String gatewayName,
- RabbitMQProcessPublisher publisher) throws RegistryException {
+ RabbitMQProcessPublisher publisher) throws Exception {
SimpleWorkflowInterpreter simpleWorkflowInterpreter = new SimpleWorkflowInterpreter(
experimentId, credentialToken,gatewayName, publisher);
- executor.execute(simpleWorkflowInterpreter);
+ workflowMap.put(experimentId, simpleWorkflowInterpreter);
+ simpleWorkflowInterpreter.launchWorkflow();
+
}
private int getThreadPoolSize() {
return ServerSettings.getEnactmentThreadPoolSize();
}
+
+ private class TaskMessageHandler implements MessageHandler {
+
+ @Override
+ public Map<String, Object> getProperties() {
+ Map<String, Object> props = new HashMap<String, Object>();
+ String gatewayId = "*";
+ String experimentId = "*";
+ List<String> routingKeys = new ArrayList<String>();
+ routingKeys.add(gatewayId);
+ routingKeys.add(gatewayId + "." + experimentId);
+ routingKeys.add(gatewayId + "." + experimentId+ ".*");
+ routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
+ props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
+ return props;
+ }
+
+ @Override
+ public void onMessage(MessageContext msgCtx) {
+ StatusHandler statusHandler = new StatusHandler(msgCtx);
+ executor.execute(statusHandler);
+ }
+
+
+ }
+
+ private class StatusHandler implements Runnable{
+ private final Logger log = LoggerFactory.getLogger(StatusHandler.class);
+
+ private final MessageContext msgCtx;
+
+ public StatusHandler(MessageContext msgCtx) {
+ this.msgCtx = msgCtx;
+ }
+
+ @Override
+ public void run() {
+ process();
+ }
+
+ private void process() {
+ String message;
+ SimpleWorkflowInterpreter simpleWorkflowInterpreter;
+ if (msgCtx.getType() == MessageType.TASK) {
+ TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
+ TaskIdentifier taskIdentifier = event.getTaskIdentity();
+ simpleWorkflowInterpreter = getInterpreter(taskIdentifier.getExperimentId());
+ if (simpleWorkflowInterpreter != null) {
+ simpleWorkflowInterpreter.handleTaskStatusChangeEvent(event);
+ } else {
+ // This happens when a task status message arrives after the task output message; because the
+ // output change has already been processed, it is OK to ignore this status message.
+ }
+ message = "Received task output change event , expId : " + taskIdentifier.getExperimentId() + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId();
+ log.debug(message);
+ }else if (msgCtx.getType() == MessageType.TASKOUTPUT) {
+ TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent();
+ TaskIdentifier taskIdentifier = event.getTaskIdentity();
+ simpleWorkflowInterpreter = getInterpreter(taskIdentifier.getExperimentId());
+ if (simpleWorkflowInterpreter != null) {
+ simpleWorkflowInterpreter.handleTaskOutputChangeEvent(event);
+ if (simpleWorkflowInterpreter.isAllDone()) {
+ workflowMap.remove(taskIdentifier.getExperimentId());
+ }
+ } else {
+ throw new IllegalArgumentException("Error while processing TaskOutputChangeEvent, " +
+ "There is no registered workflow for experiment Id : " + taskIdentifier.getExperimentId());
+ }
+ message = "Received task output change event , expId : " + taskIdentifier.getExperimentId() + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId();
+ log.debug(message);
+ } else {
+ // Not a message type we are interested in; ignore it.
+ }
+ }
+
+ private SimpleWorkflowInterpreter getInterpreter(String experimentId){
+ return workflowMap.get(experimentId);
+ }
+ }
+
+
+ private class EnactmentShutDownHook extends Thread {
+ private final Logger log = LoggerFactory.getLogger(EnactmentShutDownHook.class);
+ @Override
+ public void run() {
+ super.run();
+ try {
+ statusConsumer.stopListen(consumerId);
+ log.info("Successfully un-binded task status consumer");
+ } catch (AiravataException e) {
+ log.error("Error while un-bind enactment status consumer", e);
+ }
+ }
+ }
}