You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2016/02/08 17:49:20 UTC
[38/50] [abbrv] airavata git commit: Fixed all compilation issues of
workflow-core module
Fixed all compilation issues of workflow-core module
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/ce795581
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/ce795581
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/ce795581
Branch: refs/heads/master
Commit: ce7955813f79812a58bc6395914f3a978b0ae1e9
Parents: e21fae7
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Fri Feb 5 16:29:41 2016 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Fri Feb 5 16:29:41 2016 -0500
----------------------------------------------------------------------
.../cpi/impl/SimpleOrchestratorImpl.java | 3 +-
.../server/OrchestratorServerHandler.java | 75 +++----
modules/workflow/workflow-core/pom.xml | 6 +
.../core/SimpleWorkflowInterpreter.java | 197 ++++++-------------
.../airavata/workflow/core/WorkflowContext.java | 60 ------
.../workflow/core/WorkflowEnactmentService.java | 2 +-
.../airavata/workflow/core/WorkflowFactory.java | 1 -
pom.xml | 1 +
8 files changed, 105 insertions(+), 240 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index ff515ef..c875180 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -269,11 +269,10 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
return processModels;
}
- public String createAndSaveTasks(String gatewayId, ExperimentModel experimentModel, ProcessModel processModel) throws OrchestratorException {
+ public String createAndSaveTasks(String gatewayId, ProcessModel processModel, boolean autoSchedule) throws OrchestratorException {
try {
ExperimentCatalog experimentCatalog = orchestratorContext.getRegistry().getExperimentCatalog();
AppCatalog appCatalog = orchestratorContext.getRegistry().getAppCatalog();
- boolean autoSchedule = experimentModel.getUserConfigurationData().isAiravataAutoSchedule();
ComputationalResourceSchedulingModel resourceSchedule = processModel.getResourceSchedule();
String userGivenQueueName = resourceSchedule.getQueueName();
int userGivenWallTime = resourceSchedule.getWallTimeLimit();
http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index a461ba4..977191e 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -136,45 +136,46 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
ExperimentModel experiment = null;
try {
- List<ProcessModel> processes = orchestrator.createProcesses(experimentId, gatewayId);
- experiment = (ExperimentModel) experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
- for (ProcessModel processModel : processes){
- String taskDag = orchestrator.createAndSaveTasks(gatewayId, experiment, processModel);
- processModel.setTaskDag(taskDag);
- experimentCatalog.update(ExperimentCatalogModelType.PROCESS,processModel, processModel.getProcessId());
- }
- if (experiment == null) {
- log.error(experimentId, "Error retrieving the Experiment by the given experimentID: {} ", experimentId);
- return false;
- }
+ String experimentNodePath = GFacUtils.getExperimentNodePath (experimentId);
+ ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentNodePath);
+ String experimentCancelNode = ZKPaths.makePath(experimentNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+ ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentCancelNode);
- if (!validateProcess(experimentId, processes)) {
- log.error("Validating process fails for given experiment Id : {}", experimentId);
- return false;
- }
- ComputeResourcePreference computeResourcePreference = appCatalog.getGatewayProfile().
+ ComputeResourcePreference computeResourcePreference = appCatalog.getGatewayProfile().
getComputeResourcePreference(gatewayId,
experiment.getUserConfigurationData().getComputationalResourceScheduling().getResourceHostId());
- String token = computeResourcePreference.getResourceSpecificCredentialStoreToken();
- if (token == null || token.isEmpty()){
- // try with gateway profile level token
- GatewayResourceProfile gatewayProfile = appCatalog.getGatewayProfile().getGatewayProfile(gatewayId);
- token = gatewayProfile.getCredentialStoreToken();
- }
- // still the token is empty, then we fail the experiment
- if (token == null || token.isEmpty()){
- log.error("You have not configured credential store token at gateway profile or compute resource preference. Please provide the correct token at gateway profile or compute resource preference.");
- return false;
- }
- String experimentNodePath = GFacUtils.getExperimentNodePath (experimentId);
- ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentNodePath);
- String experimentCancelNode = ZKPaths.makePath(experimentNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
- ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentCancelNode);
+ String token = computeResourcePreference.getResourceSpecificCredentialStoreToken();
+ if (token == null || token.isEmpty()){
+ // try with gateway profile level token
+ GatewayResourceProfile gatewayProfile = appCatalog.getGatewayProfile().getGatewayProfile(gatewayId);
+ token = gatewayProfile.getCredentialStoreToken();
+ }
+ // still the token is empty, then we fail the experiment
+ if (token == null || token.isEmpty()){
+ log.error("You have not configured credential store token at gateway profile or compute resource preference. Please provide the correct token at gateway profile or compute resource preference.");
+ return false;
+ }
+ ExperimentType executionType = experiment.getExperimentType();
+ if (executionType == ExperimentType.SINGLE_APPLICATION) {
+ //its an single application execution experiment
+ List<ProcessModel> processes = orchestrator.createProcesses(experimentId, gatewayId);
+ experiment = (ExperimentModel) experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
+ if (experiment == null) {
+ log.error(experimentId, "Error retrieving the Experiment by the given experimentID: {} ", experimentId);
+ return false;
+ }
+ for (ProcessModel processModel : processes){
+ String taskDag = orchestrator.createAndSaveTasks(gatewayId, processModel, experiment.getUserConfigurationData().isAiravataAutoSchedule());
+ processModel.setTaskDag(taskDag);
+ experimentCatalog.update(ExperimentCatalogModelType.PROCESS,processModel, processModel.getProcessId());
+ }
+
+ if (!validateProcess(experimentId, processes)) {
+ log.error("Validating process fails for given experiment Id : {}", experimentId);
+ return false;
+ }
- ExperimentType executionType = experiment.getExperimentType();
- if (executionType == ExperimentType.SINGLE_APPLICATION) {
- //its an single application execution experiment
- log.debug(experimentId, "Launching single application experiment {}.", experimentId);
+ log.debug(experimentId, "Launching single application experiment {}.", experimentId);
ExperimentStatus status = new ExperimentStatus(ExperimentState.LAUNCHED);
status.setReason("submitted all processes");
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
@@ -184,7 +185,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
} else if (executionType == ExperimentType.WORKFLOW) {
//its a workflow execution experiment
log.debug(experimentId, "Launching workflow experiment {}.", experimentId);
- launchWorkflowExperiment(experimentId, token);
+ launchWorkflowExperiment(experimentId, token, gatewayId);
} else {
log.error(experimentId, "Couldn't identify experiment type, experiment {} is neither single application nor workflow.", experimentId);
throw new TException("Experiment '" + experimentId + "' launch failed. Unable to figureout execution type for application " + experiment.getExecutionId());
@@ -367,7 +368,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
}
}
- private void launchWorkflowExperiment(String experimentId, String airavataCredStoreToken) throws TException {
+ private void launchWorkflowExperiment(String experimentId, String airavataCredStoreToken, String gatewayId) throws TException {
// FIXME
// try {
// WorkflowEnactmentService.getInstance().
http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/modules/workflow/workflow-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/pom.xml b/modules/workflow/workflow-core/pom.xml
index bb1ea79..72990c8 100644
--- a/modules/workflow/workflow-core/pom.xml
+++ b/modules/workflow/workflow-core/pom.xml
@@ -31,6 +31,12 @@
<version>${project.version}</version>
</dependency>
+ <!--Workflow Interpreter dependency-->
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-gfac-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- Airavata default parser dependency -->
<dependency>
<groupId>org.apache.airavata</groupId>
http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java
index 7f8a8a5..cdbf2f2 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java
@@ -22,35 +22,25 @@
package org.apache.airavata.workflow.core;
import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher;
import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
+import org.apache.airavata.model.ComponentState;
+import org.apache.airavata.model.ComponentStatus;
import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.experiment.ExperimentModel;
-import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.messaging.event.ProcessIdentifier;
+import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
import org.apache.airavata.model.status.ProcessState;
-import org.apache.airavata.model.util.ExperimentModelUtil;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.registry.core.experiment.catalog.model.Experiment;
import org.apache.airavata.registry.cpi.*;
import org.apache.airavata.workflow.core.dag.edge.Edge;
-import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode;
-import org.apache.airavata.workflow.core.dag.nodes.NodeState;
-import org.apache.airavata.workflow.core.dag.nodes.InputNode;
-import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode;
-import org.apache.airavata.workflow.core.dag.nodes.OutputNode;
-import org.apache.airavata.workflow.core.dag.port.InPort;
+import org.apache.airavata.workflow.core.dag.nodes.*;
import org.apache.airavata.workflow.core.dag.port.OutPort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -68,12 +58,12 @@ class SimpleWorkflowInterpreter{
private String gatewayName;
private String workflowString;
- private Map<String, WorkflowNode> readyList = new ConcurrentHashMap<String, WorkflowNode>();
- private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<String, WorkflowNode>();
- private Map<String, WorkflowContext> processingQueue = new ConcurrentHashMap<String, WorkflowContext>();
- private Map<String, WorkflowContext> completeList = new HashMap<String, WorkflowContext>();
+ private Map<String, WorkflowNode> readyList = new ConcurrentHashMap<>();
+ private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<>();
+ private Map<String, WorkflowNode> processingQueue = new ConcurrentHashMap<>();
+ private Map<String, WorkflowNode> completeList = new HashMap<>();
private Registry registry;
- private List<OutputNode> completeWorkflowOutputs = new ArrayList<OutputNode>();
+ private List<OutputNode> completeWorkflowOutputs = new ArrayList<>();
private RabbitMQProcessLaunchPublisher publisher;
private RabbitMQStatusConsumer statusConsumer;
private String consumerId;
@@ -120,10 +110,11 @@ class SimpleWorkflowInterpreter{
processReadyList();
}
- private String getWorkflow() throws AppCatalogException {
+ private String getWorkflow() throws AppCatalogException, WorkflowCatalogException {
WorkflowCatalog workflowCatalog = RegistryFactory.getAppCatalog().getWorkflowCatalog();
//FIXME: parse workflowTemplateId or experimentId
- workflowCatalog.getWorkflow("");
+// workflowCatalog.getWorkflow("");
+ return "";
}
// try to remove synchronization tag
@@ -141,15 +132,16 @@ class SimpleWorkflowInterpreter{
OutputNode outputNode = (OutputNode) readyNode;
outputNode.getOutputObject().setValue(outputNode.getInPort().getInputObject().getValue());
addToCompleteOutputNodeList(outputNode);
- continue;
+ } else if (readyNode instanceof InputNode) {
+ // set input object of applications and add applications to ready List.
+ } else if (readyNode instanceof ApplicationNode) {
+ // call orchestrator to create process for the application
+ } else {
+ throw new RuntimeException("Unsupported workflow node type");
}
- WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode);
- TaskDetails process = getProcess(workflowNodeDetails);
- WorkflowContext workflowContext = new WorkflowContext(readyNode, workflowNodeDetails, process);
- addToProcessingQueue(workflowContext);
- publishToProcessQueue(process);
}
- if (processingQueue.isEmpty()) {
+
+ if (processingQueue.isEmpty() && waitingList.isEmpty()) {
try {
saveWorkflowOutputs();
} catch (AppCatalogException e) {
@@ -159,64 +151,16 @@ class SimpleWorkflowInterpreter{
}
private void saveWorkflowOutputs() throws AppCatalogException {
- List<OutputDataObjectType> outputDataObjects = new ArrayList<OutputDataObjectType>();
+ List<OutputDataObjectType> outputDataObjects = new ArrayList<>();
for (OutputNode completeWorkflowOutput : completeWorkflowOutputs) {
outputDataObjects.add(completeWorkflowOutput.getOutputObject());
}
- RegistryFactory.getAppCatalog().getWorkflowCatalog()
- .updateWorkflowOutputs(experiment.getApplicationId(), outputDataObjects);
- }
-
-
- private void publishToProcessQueue(TaskDetails process) throws AiravataException {
- ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
- processSubmitEvent.setCredentialToken(credentialToken);
- processSubmitEvent.setTaskId(process.getTaskID());
- MessageContext messageContext = new MessageContext(processSubmitEvent, MessageType.TASK, process.getTaskID(), null);
- messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- publisher.publish(messageContext);
- }
-
- private TaskDetails getProcess(WorkflowNodeDetails wfNodeDetails) throws RegistryException {
- // create workflow taskDetails from workflowNodeDetails
- TaskDetails taskDetails = ExperimentModelUtil.cloneTaskFromWorkflowNodeDetails(getExperiment(), wfNodeDetails);
- taskDetails.setTaskID(getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK_DETAIL, taskDetails, wfNodeDetails.getNodeInstanceId()).toString());
- return taskDetails;
- }
-
- private WorkflowNodeDetails createWorkflowNodeDetails(WorkflowNode readyNode) throws RegistryException {
- WorkflowNodeDetails wfNodeDetails = ExperimentModelUtil.createWorkflowNode(readyNode.getId(), null);
- ExecutionUnit executionUnit = ExecutionUnit.APPLICATION;
- String executionData = null;
- if (readyNode instanceof ApplicationNode) {
- executionUnit = ExecutionUnit.APPLICATION;
- executionData = ((ApplicationNode) readyNode).getApplicationId();
- setupNodeDetailsInput(((ApplicationNode) readyNode), wfNodeDetails);
- } else if (readyNode instanceof InputNode) {
- executionUnit = ExecutionUnit.INPUT;
- } else if (readyNode instanceof OutputNode) {
- executionUnit = ExecutionUnit.OUTPUT;
- }
- wfNodeDetails.setExecutionUnit(executionUnit);
- wfNodeDetails.setExecutionUnitData(executionData);
- wfNodeDetails.setNodeInstanceId((String) getRegistry().getExperimentCatalog().add(ExpCatChildDataType.WORKFLOW_NODE_DETAIL, wfNodeDetails, getExperiment().getExperimentID()));
- return wfNodeDetails;
+// RegistryFactory.getAppCatalog().getWorkflowCatalog()
+// .updateWorkflowOutputs(experiment.getApplicationId(), outputDataObjects);
}
- private void setupNodeDetailsInput(ApplicationNode readyAppNode, WorkflowNodeDetails wfNodeDetails) {
- if (readyAppNode.isReady()) {
- for (InPort inPort : readyAppNode.getInputPorts()) {
- wfNodeDetails.addToNodeInputs(inPort.getInputObject());
- }
- } else {
- throw new IllegalArgumentException("Application node should be in ready state to set inputs to the " +
- "workflow node details, nodeId = " + readyAppNode.getId());
- }
- }
-
-
private void processWorkflowInputNodes(List<InputNode> inputNodes) {
- Set<WorkflowNode> tempNodeSet = new HashSet<WorkflowNode>();
+ Set<WorkflowNode> tempNodeSet = new HashSet<>();
for (InputNode inputNode : inputNodes) {
if (inputNode.isReady()) {
log.debug("Workflow node : " + inputNode.getId() + " is ready to execute");
@@ -251,16 +195,6 @@ class SimpleWorkflowInterpreter{
return registry;
}
- public Experiment getExperiment() {
- return experiment;
- }
-
- private void updateWorkflowNodeStatus(WorkflowNodeDetails wfNodeDetails, WorkflowNodeState state) throws RegistryException{
- WorkflowNodeStatus status = ExperimentModelUtil.createWorkflowNodeStatus(state);
- wfNodeDetails.setWorkflowNodeStatus(status);
- getRegistry().getExperimentCatalog().update(ExperimentCatalogModelType.WORKFLOW_NODE_STATUS, status, wfNodeDetails.getNodeInstanceId());
- }
-
/**
* Package-Private method.
* Remove the workflow node from waiting queue and add it to the ready queue.
@@ -278,16 +212,16 @@ class SimpleWorkflowInterpreter{
/**
* First remove the node from ready list and then add the WfNodeContainer to the process queue.
* Note that underline data structure of the process queue is a Map.
- * @param workflowContext - has both workflow and correspond workflowNodeDetails and TaskDetails
+ * @param applicationNode - has both workflow and correspond workflowNodeDetails and TaskDetails
*/
- private synchronized void addToProcessingQueue(WorkflowContext workflowContext) {
- readyList.remove(workflowContext.getWorkflowNode().getId());
- processingQueue.put(workflowContext.getTaskDetails().getTaskID(), workflowContext);
+ private synchronized void addToProcessingQueue(ApplicationNode applicationNode) {
+ readyList.remove(applicationNode.getId());
+ processingQueue.put(applicationNode.getId(), applicationNode);
}
- private synchronized void addToCompleteQueue(WorkflowContext workflowContext) {
- processingQueue.remove(workflowContext.getTaskDetails().getTaskID());
- completeList.put(workflowContext.getTaskDetails().getTaskID(), workflowContext);
+ private synchronized void addToCompleteQueue(ApplicationNode applicationNode) {
+ processingQueue.remove(applicationNode.getId());
+ completeList.put(applicationNode.getId(), applicationNode);
}
@@ -301,19 +235,18 @@ class SimpleWorkflowInterpreter{
}
private void setExperiment(String experimentId) throws RegistryException {
- experiment = (Experiment) getRegistry().getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
+ experiment = (ExperimentModel) getRegistry().getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
log.debug("Retrieve Experiment for experiment id : " + experimentId);
}
- synchronized void handleTaskOutputChangeEvent(TaskOutputChangeEvent taskOutputChangeEvent) {
+/* synchronized void handleTaskOutputChangeEvent(ProcessStatusChangeEvent taskOutputChangeEvent) {
String taskId = taskOutputChangeEvent.getTaskIdentity().getTaskId();
log.debug("Task Output changed event received for workflow node : " +
taskOutputChangeEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId);
- WorkflowContext workflowContext = processingQueue.get(taskId);
- Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>();
- if (workflowContext != null) {
- WorkflowNode workflowNode = workflowContext.getWorkflowNode();
+ WorkflowNode workflowNode = processingQueue.get(taskId);
+ Set<WorkflowNode> tempWfNodeSet = new HashSet<>();
+ if (workflowNode != null) {
if (workflowNode instanceof ApplicationNode) {
ApplicationNode applicationNode = (ApplicationNode) workflowNode;
// Workflow node can have one to many output ports and each output port can have one to many links
@@ -331,9 +264,9 @@ class SimpleWorkflowInterpreter{
}
}
}
+ addToCompleteQueue(applicationNode);
+ log.debug("removed task from processing queue : " + taskId);
}
- addToCompleteQueue(workflowContext);
- log.debug("removed task from processing queue : " + taskId);
try {
processReadyList();
} catch (Exception e) {
@@ -341,69 +274,55 @@ class SimpleWorkflowInterpreter{
continueWorkflow = false;
}
}
- }
+ }*/
void handleProcessStatusChangeEvent(ProcessStatusChangeEvent processStatusChangeEvent) {
ProcessState processState = processStatusChangeEvent.getState();
ProcessIdentifier processIdentity = processStatusChangeEvent.getProcessIdentity();
String processId = processIdentity.getProcessId();
- WorkflowContext workflowContext = processingQueue.get(processId);
- if (workflowContext != null) {
- WorkflowNodeState wfNodeState = WorkflowNodeState.INVOKED;
+ ApplicationNode applicationNode = (ApplicationNode) processingQueue.get(processId);
+ if (applicationNode != null) {
+ ComponentState state = applicationNode.getState();
switch (processState) {
case CREATED:
case VALIDATED:
case STARTED:
break;
case CONFIGURING_WORKSPACE:
- wfNodeState = WorkflowNodeState.COMPLETED;
- break;
case PRE_PROCESSING:
- wfNodeState = WorkflowNodeState.INVOKED;
- workflowContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
- break;
case INPUT_DATA_STAGING:
- wfNodeState = WorkflowNodeState.INVOKED;
- workflowContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
- break;
case EXECUTING:
- wfNodeState = WorkflowNodeState.EXECUTING;
- workflowContext.getWorkflowNode().setState(NodeState.EXECUTING);
- break;
case OUTPUT_DATA_STAGING:
- wfNodeState = WorkflowNodeState.COMPLETED;
- workflowContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
- break;
case POST_PROCESSING:
- wfNodeState = WorkflowNodeState.COMPLETED;
- workflowContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
+ state = ComponentState.RUNNING;
break;
case COMPLETED:
- wfNodeState = WorkflowNodeState.COMPLETED;
- workflowContext.getWorkflowNode().setState(NodeState.EXECUTED);
+ state = ComponentState.COMPLETED;
break;
case FAILED:
- wfNodeState = WorkflowNodeState.FAILED;
- workflowContext.getWorkflowNode().setState(NodeState.FAILED);
+ state = ComponentState.FAILED;
break;
case CANCELED:
case CANCELLING:
- wfNodeState = WorkflowNodeState.CANCELED;
- workflowContext.getWorkflowNode().setState(NodeState.FAILED);
+ state = ComponentState.CANCELED;
break;
default:
break;
}
- if (wfNodeState != WorkflowNodeState.UNKNOWN) {
+ if (state != applicationNode.getState()) {
try {
- updateWorkflowNodeStatus(workflowContext.getWfNodeDetails(), wfNodeState);
+ updateWorkflowNodeStatus(applicationNode, new ComponentStatus(state));
} catch (RegistryException e) {
- log.error("Error while updating workflow node status update to the registry. nodeInstanceId :"
- + workflowContext.getWfNodeDetails().getNodeInstanceId() + " status to: "
- + workflowContext.getWfNodeDetails().getWorkflowNodeStatus().toString() , e);
+ log.error("Error! Couldn't update new application state to registry. nodeInstanceId : {} "
+ + applicationNode.getId() + " status to: " + applicationNode.getState().toString() , e);
}
}
}
}
+
+ private void updateWorkflowNodeStatus(ApplicationNode applicationNode, ComponentStatus componentStatus) throws RegistryException {
+ // FIXME: save new workflow node status to registry.
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowContext.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowContext.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowContext.java
deleted file mode 100644
index 47bd9ca..0000000
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowContext.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.core;
-
-import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode;
-
-public class WorkflowContext {
- private WorkflowNode workflowNode;
- private WorkflowNodeDetails wfNodeDetails;
- private TaskDetails taskDetails;
-
- public WorkflowContext(WorkflowNode workflowNode, WorkflowNodeDetails wfNodeDetails, TaskDetails taskDetails) {
- this.workflowNode = workflowNode;
- this.wfNodeDetails = wfNodeDetails;
- this.taskDetails = taskDetails;
- }
-
- public WorkflowNode getWorkflowNode() {
- return workflowNode;
- }
-
- public void setWorkflowNode(WorkflowNode workflowNode) {
- this.workflowNode = workflowNode;
- }
-
- public WorkflowNodeDetails getWfNodeDetails() {
- return wfNodeDetails;
- }
-
- public void setWfNodeDetails(WorkflowNodeDetails wfNodeDetails) {
- this.wfNodeDetails = wfNodeDetails;
- }
-
- public TaskDetails getTaskDetails() {
- return taskDetails;
- }
-
- public void setTaskDetails(TaskDetails taskDetails) {
- this.taskDetails = taskDetails;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
index aaa3073..34ef8a7 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
@@ -142,7 +142,7 @@ public class WorkflowEnactmentService {
TaskIdentifier taskIdentifier = event.getTaskIdentity();
simpleWorkflowInterpreter = getInterpreter(taskIdentifier.getExperimentId());
if (simpleWorkflowInterpreter != null) {
- simpleWorkflowInterpreter.handleTaskOutputChangeEvent(event);
+// simpleWorkflowInterpreter.handleTaskOutputChangeEvent(event);
if (simpleWorkflowInterpreter.isAllDone()) {
workflowMap.remove(taskIdentifier.getExperimentId());
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java
index e06fab5..9392461 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java
@@ -23,7 +23,6 @@ package org.apache.airavata.workflow.core;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.workflow.core.parser.AiravataWorkflowBuilder;
import org.apache.airavata.workflow.core.parser.JsonWorkflowParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c82cc8d..e41e3f1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -557,6 +557,7 @@
<module>modules/credential-store</module>
<module>modules/orchestrator</module>
<module>modules/server</module>
+ <module>modules/workflow</module>
<module>modules/test-suite</module>
<!-- Deprecated Modules-->
<!--<module>modules/integration-tests</module>-->