You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/03/04 20:22:33 UTC
[1/2] airavata git commit: Fixed AIRAVATA-1619. and removed
WorkflowUtil class.
Repository: airavata
Updated Branches:
refs/heads/new-workflow-design-rabbitmq 366ada032 -> 917adad5b
Fixed AIRAVATA-1619. and removed WorkflowUtil class.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/71db390f
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/71db390f
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/71db390f
Branch: refs/heads/new-workflow-design-rabbitmq
Commit: 71db390f49bd4a129ee08e6ae20e87edce0c7115
Parents: 366ada0
Author: shamrath <sh...@gmail.com>
Authored: Wed Mar 4 14:15:25 2015 -0500
Committer: shamrath <sh...@gmail.com>
Committed: Wed Mar 4 14:15:25 2015 -0500
----------------------------------------------------------------------
.../simple/workflow/engine/WorkflowUtil.java | 63 --------------------
.../engine/parser/AiravataWorkflowParser.java | 2 +-
2 files changed, 1 insertion(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/71db390f/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowUtil.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowUtil.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowUtil.java
deleted file mode 100644
index a2b69ae..0000000
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowUtil.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.simple.workflow.engine;
-
-import com.google.common.eventbus.EventBus;
-import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
-import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.messaging.event.TaskIdentifier;
-import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
-import org.apache.airavata.model.workspace.experiment.TaskDetails;
-import org.apache.airavata.model.workspace.experiment.TaskState;
-import org.apache.airavata.persistance.registry.jpa.model.TaskDetail;
-
-public class WorkflowUtil {
-
- public static InputDataObjectType copyValues(InputDataObjectType fromInputObj, InputDataObjectType toInputObj){
- if (toInputObj == null) {
- // TODO : throw an error
- }
- toInputObj.setValue(fromInputObj.getValue());
- if (fromInputObj.getApplicationArgument() != null
- && !fromInputObj.getApplicationArgument().trim().equals("")) {
- toInputObj.setApplicationArgument(fromInputObj.getApplicationArgument());
- }
- if (toInputObj.getType() == null) {
- toInputObj.setType(fromInputObj.getType());
- }
- return fromInputObj;
- }
-
- public static InputDataObjectType copyValues(OutputDataObjectType outputData, InputDataObjectType inputData) {
- inputData.setValue(outputData.getValue());
- return inputData;
- }
-
-
- public static OutputDataObjectType copyValues(InputDataObjectType inputObject, OutputDataObjectType outputObject) {
- if (outputObject == null) {
- outputObject = new OutputDataObjectType();
- }
- outputObject.setValue(inputObject.getValue());
- return outputObject;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/71db390f/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java
index 673fbdc..a430879 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java
@@ -101,7 +101,7 @@ public class AiravataWorkflowParser implements WorkflowParser {
}
for (Node gNode : gNodes) {
wfInputNode = new WorkflowInputNodeImpl(gNode.getID(), gNode.getName());
- wfInputNode.setInputObject(inputDataMap.get(wfInputNode.getName()));
+ wfInputNode.setInputObject(inputDataMap.get(wfInputNode.getId()));
if (wfInputNode.getInputObject() == null) {
// TODO: throw an error and exit.
}
[2/2] airavata git commit: Fixed AIRAVATA-1618,
and optimized the imports.
Posted by sh...@apache.org.
Fixed AIRAVATA-1618, and optimized the imports.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/917adad5
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/917adad5
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/917adad5
Branch: refs/heads/new-workflow-design-rabbitmq
Commit: 917adad5bbec9500a0524eeb7a3ce3763c89d961
Parents: 71db390
Author: shamrath <sh...@gmail.com>
Authored: Wed Mar 4 14:21:08 2015 -0500
Committer: shamrath <sh...@gmail.com>
Committed: Wed Mar 4 14:21:08 2015 -0500
----------------------------------------------------------------------
.../engine/SimpleWorkflowInterpreter.java | 112 ++++++++++---------
.../workflow/engine/WorkflowFactoryImpl.java | 2 -
.../workflow/engine/dag/edge/DirectedEdge.java | 2 +-
.../engine/dag/nodes/ApplicationNodeImpl.java | 7 +-
.../workflow/engine/dag/nodes/NodeState.java | 28 +++--
.../engine/dag/nodes/WorkflowInputNodeImpl.java | 7 +-
.../dag/nodes/WorkflowOutputNodeImpl.java | 7 +-
.../engine/parser/AiravataWorkflowParser.java | 24 ++--
.../workflow/engine/parser/PortContainer.java | 2 +-
.../parser/AiravataWorkflowParserTest.java | 2 +-
10 files changed, 111 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
index 8eb5d5e..a052e5c 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
@@ -55,7 +55,6 @@ import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
-import org.apache.airavata.simple.workflow.engine.parser.AiravataWorkflowParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,7 +77,7 @@ public class SimpleWorkflowInterpreter implements Runnable{
private String gatewayName;
- private Map<String, WorkflowNode> readList = new ConcurrentHashMap<String, WorkflowNode>();
+ private Map<String, WorkflowNode> readyList = new ConcurrentHashMap<String, WorkflowNode>();
private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<String, WorkflowNode>();
private Map<String, ProcessContext> processingQueue = new ConcurrentHashMap<String, ProcessContext>();
private Map<String, ProcessContext> completeList = new HashMap<String, ProcessContext>();
@@ -87,6 +86,7 @@ public class SimpleWorkflowInterpreter implements Runnable{
private RabbitMQProcessPublisher publisher;
private RabbitMQStatusConsumer statusConsumer;
private String consumerId;
+ private boolean continueWorkflow = true;
public SimpleWorkflowInterpreter(String experimentId, String credentialToken, String gatewayName, RabbitMQProcessPublisher publisher) throws RegistryException {
this.gatewayName = gatewayName;
@@ -111,33 +111,27 @@ public class SimpleWorkflowInterpreter implements Runnable{
log.debug("Parsed the workflow and got the workflow input nodes");
// process workflow input nodes
processWorkflowInputNodes(getWorkflowInputNodes());
-
-
+ // initialize the rabbitmq status consumer
statusConsumer = new RabbitMQStatusConsumer();
consumerId = statusConsumer.listen(new TaskMessageHandler());
+
+ processReadyList();
}
// try to remove synchronization tag
- private synchronized void processReadyList() {
- for (WorkflowNode readyNode : readList.values()) {
- try {
- if (readyNode instanceof WorkflowOutputNode) {
- WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) readyNode;
- wfOutputNode.getOutputObject().setValue(wfOutputNode.getInPort().getInputObject().getValue());
- addToCompleteOutputNodeList(wfOutputNode);
- continue;
- }
- WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode);
- TaskDetails process = getProcess(workflowNodeDetails);
- ProcessContext processContext = new ProcessContext(readyNode, workflowNodeDetails, process);
- addToProcessingQueue(processContext);
- publishToProcessQueue(process);
-// publishToProcessQueue(processPack);
- } catch (RegistryException e) {
- // FIXME : handle this exception
- } catch (AiravataException e) {
- log.error("Error while publishing process to the process queue");
+ private synchronized void processReadyList() throws RegistryException, AiravataException {
+ for (WorkflowNode readyNode : readyList.values()) {
+ if (readyNode instanceof WorkflowOutputNode) {
+ WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) readyNode;
+ wfOutputNode.getOutputObject().setValue(wfOutputNode.getInPort().getInputObject().getValue());
+ addToCompleteOutputNodeList(wfOutputNode);
+ continue;
}
+ WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode);
+ TaskDetails process = getProcess(workflowNodeDetails);
+ ProcessContext processContext = new ProcessContext(readyNode, workflowNodeDetails, process);
+ addToProcessingQueue(processContext);
+ publishToProcessQueue(process);
}
}
@@ -149,11 +143,6 @@ public class SimpleWorkflowInterpreter implements Runnable{
MessageContext messageContext = new MessageContext(processSubmitEvent, MessageType.TASK, process.getTaskID(), null);
messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
publisher.publish(messageContext);
-
-
-// Thread thread = new Thread(new TempPublisher(process, eventBus));
-// thread.start();
- //TODO: publish to process queue.
}
private TaskDetails getProcess(WorkflowNodeDetails wfNodeDetails) throws RegistryException {
@@ -171,6 +160,7 @@ public class SimpleWorkflowInterpreter implements Runnable{
if (readyNode instanceof ApplicationNode) {
executionUnit = ExecutionUnit.APPLICATION;
executionData = ((ApplicationNode) readyNode).getApplicationId();
+ setupNodeDetailsInput(((ApplicationNode) readyNode), wfNodeDetails);
} else if (readyNode instanceof WorkflowInputNode) {
executionUnit = ExecutionUnit.INPUT;
} else if (readyNode instanceof WorkflowOutputNode) {
@@ -178,25 +168,19 @@ public class SimpleWorkflowInterpreter implements Runnable{
}
wfNodeDetails.setExecutionUnit(executionUnit);
wfNodeDetails.setExecutionUnitData(executionData);
- setupNodeDetailsInput(readyNode, wfNodeDetails);
wfNodeDetails.setNodeInstanceId((String) getRegistry()
.add(ChildDataType.WORKFLOW_NODE_DETAIL, wfNodeDetails, getExperiment().getExperimentID()));
-// nodeInstanceList.put(node, wfNodeDetails);
return wfNodeDetails;
}
- private void setupNodeDetailsInput(WorkflowNode readyNode, WorkflowNodeDetails wfNodeDetails) {
- if (readyNode instanceof ApplicationNode) {
- ApplicationNode applicationNode = (ApplicationNode) readyNode;
- if (applicationNode.isReady()) {
- for (InPort inPort : applicationNode.getInputPorts()) {
- wfNodeDetails.addToNodeInputs(inPort.getInputObject());
- }
- } else {
- // TODO: handle this scenario properly.
+ private void setupNodeDetailsInput(ApplicationNode readyAppNode, WorkflowNodeDetails wfNodeDetails) {
+ if (readyAppNode.isReady()) {
+ for (InPort inPort : readyAppNode.getInputPorts()) {
+ wfNodeDetails.addToNodeInputs(inPort.getInputObject());
}
} else {
- // TODO: do we support for other type of workflow nodes ?
+ throw new IllegalArgumentException("Application node should be in ready state to set inputs to the " +
+ "workflow node details, nodeId = " + readyAppNode.getId());
}
}
@@ -253,7 +237,7 @@ public class SimpleWorkflowInterpreter implements Runnable{
*/
private synchronized void addToReadyQueue(WorkflowNode workflowNode) {
waitingList.remove(workflowNode.getId());
- readList.put(workflowNode.getId(), workflowNode);
+ readyList.put(workflowNode.getId(), workflowNode);
}
private void addToWaitingQueue(WorkflowNode workflowNode) {
@@ -266,7 +250,7 @@ public class SimpleWorkflowInterpreter implements Runnable{
* @param processContext - has both workflow and correspond workflowNodeDetails and TaskDetails
*/
private synchronized void addToProcessingQueue(ProcessContext processContext) {
- readList.remove(processContext.getWorkflowNode().getId());
+ readyList.remove(processContext.getWorkflowNode().getId());
processingQueue.put(processContext.getTaskDetails().getTaskID(), processContext);
}
@@ -278,7 +262,7 @@ public class SimpleWorkflowInterpreter implements Runnable{
private void addToCompleteOutputNodeList(WorkflowOutputNode wfOutputNode) {
completeWorkflowOutputs.add(wfOutputNode);
- readList.remove(wfOutputNode.getId());
+ readyList.remove(wfOutputNode.getId());
}
@Override
@@ -286,15 +270,25 @@ public class SimpleWorkflowInterpreter implements Runnable{
try {
log.debug("Launching workflow");
launchWorkflow();
- while (!(waitingList.isEmpty() && readList.isEmpty())) {
- processReadyList();
+ while (continueWorkflow && !(waitingList.isEmpty() && readyList.isEmpty())) {
+// processReadyList();
Thread.sleep(1000);
}
- log.info("Successfully launched workflow for experiment : " + getExperiment().getExperimentID());
- statusConsumer.stopListen(consumerId);
- log.info("Successfully un-bind status consumer for experiment " + getExperiment().getExperimentID());
+ if (continueWorkflow) {
+ log.info("Successfully launched workflow for experiment : " + getExperiment().getExperimentID());
+ } else if (!(waitingList.isEmpty() || readyList.isEmpty())) {
+ log.error("Workflow couldn't execute all workflow nodes due to an error");
+ }
} catch (Exception e) {
log.error("Error launching workflow", e);
+ } finally {
+ try {
+ statusConsumer.stopListen(consumerId);
+ log.info("Successfully un-bind status consumer for experiment " + getExperiment().getExperimentID());
+ } catch (AiravataException e) {
+ log.error("Error while un-binding status consumer: " + consumerId + " for experiment "
+ + getExperiment().getExperimentID());
+ }
}
}
@@ -369,6 +363,12 @@ public class SimpleWorkflowInterpreter implements Runnable{
}
addToCompleteQueue(processContext);
log.debug("removed task from processing queue : " + taskId);
+ try {
+ processReadyList();
+ } catch (Exception e) {
+ log.error("Error while processing ready workflow nodes", e);
+ continueWorkflow = false;
+ }
}
}
@@ -378,39 +378,49 @@ public class SimpleWorkflowInterpreter implements Runnable{
String taskId = taskIdentity.getTaskId();
ProcessContext processContext = processingQueue.get(taskId);
if (processContext != null) {
- WorkflowNodeState wfNodeState = WorkflowNodeState.UNKNOWN;
+ WorkflowNodeState wfNodeState = WorkflowNodeState.INVOKED;
switch (taskState) {
case WAITING:
break;
case STARTED:
break;
case PRE_PROCESSING:
+ wfNodeState = WorkflowNodeState.INVOKED;
processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
break;
case INPUT_DATA_STAGING:
+ wfNodeState = WorkflowNodeState.INVOKED;
processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
break;
case EXECUTING:
+ wfNodeState = WorkflowNodeState.EXECUTING;
processContext.getWorkflowNode().setState(NodeState.EXECUTING);
break;
case OUTPUT_DATA_STAGING:
+ wfNodeState = WorkflowNodeState.COMPLETED;
processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
break;
case POST_PROCESSING:
+ wfNodeState = WorkflowNodeState.COMPLETED;
processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
break;
case COMPLETED:
+ wfNodeState = WorkflowNodeState.COMPLETED;
processContext.getWorkflowNode().setState(NodeState.EXECUTED);
break;
case FAILED:
+ wfNodeState = WorkflowNodeState.FAILED;
processContext.getWorkflowNode().setState(NodeState.FAILED);
break;
case UNKNOWN:
+ wfNodeState = WorkflowNodeState.UNKNOWN;
break;
case CONFIGURING_WORKSPACE:
+ wfNodeState = WorkflowNodeState.COMPLETED;
break;
case CANCELED:
case CANCELING:
+ wfNodeState = WorkflowNodeState.CANCELED;
processContext.getWorkflowNode().setState(NodeState.FAILED);
break;
default:
@@ -420,7 +430,9 @@ public class SimpleWorkflowInterpreter implements Runnable{
try {
updateWorkflowNodeStatus(processContext.getWfNodeDetails(), wfNodeState);
} catch (RegistryException e) {
- // TODO: handle this.
+ log.error("Error while updating workflow node status update to the registry. nodeInstanceId :"
+ + processContext.getWfNodeDetails().getNodeInstanceId() + " status to: "
+ + processContext.getWfNodeDetails().getWorkflowNodeStatus().toString() , e);
}
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java
index 23fc4c2..e70f062 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java
@@ -23,13 +23,11 @@ package org.apache.airavata.simple.workflow.engine;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.registry.cpi.RegistryException;
import org.apache.airavata.simple.workflow.engine.parser.AiravataWorkflowParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
/**
* Singleton class, only one instance can exist in runtime.
http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java
index 3bc380d..ae7498a 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java
@@ -21,8 +21,8 @@
package org.apache.airavata.simple.workflow.engine.dag.edge;
-import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
+import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
public class DirectedEdge implements Edge {
http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java
index 52b0595..1233a9d 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java
@@ -72,8 +72,11 @@ public class ApplicationNodeImpl implements ApplicationNode {
@Override
public void setState(NodeState newState) {
- // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE
- myState = newState;
+ if (newState.getLevel() > myState.getLevel()) {
+ myState = newState;
+ } else {
+ throw new IllegalStateException("Node state can't be reversed. currentState : " + myState.toString() + " , newState " + newState.toString());
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java
index 333fcb2..edbeec5 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java
@@ -22,13 +22,23 @@
package org.apache.airavata.simple.workflow.engine.dag.nodes;
public enum NodeState {
- WAITING, // waiting on inputs
- READY, // all inputs are available and ready to execute
- QUEUED, //
- PRE_PROCESSING, //
- EXECUTING, // task has been submitted , not yet finish
- EXECUTED, // task executed
- POST_PROCESSING, //
- FAILED,
- COMPLETE // all works done
+ WAITING(0), // waiting on inputs
+ READY(1), // all inputs are available and ready to execute
+ QUEUED(2), //
+ PRE_PROCESSING(3), //
+ EXECUTING(4), // task has been submitted , not yet finish
+ EXECUTED(5), // task executed
+ POST_PROCESSING(6), //
+ FAILED(7),
+ COMPLETE(8); // all works done
+
+ private int level;
+
+ NodeState(int level) {
+ this.level = level;
+ }
+
+ public int getLevel() {
+ return level;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
index b3dfa62..7ba8908 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
@@ -62,8 +62,11 @@ public class WorkflowInputNodeImpl implements WorkflowInputNode {
@Override
public void setState(NodeState newState) {
- // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE
- myState = newState;
+ if (newState.getLevel() > myState.getLevel()) {
+ myState = newState;
+ } else {
+ throw new IllegalStateException("Node state can't be reversed. currentState : " + myState.toString() + " , newState " + newState.toString());
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java
index 5924212..6c80517 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java
@@ -63,8 +63,11 @@ public class WorkflowOutputNodeImpl implements WorkflowOutputNode {
@Override
public void setState(NodeState newState) {
- // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE
- myState = newState;
+ if (newState.getLevel() > myState.getLevel()) {
+ myState = newState;
+ } else {
+ throw new IllegalStateException("Node state can't be reversed. currentState : " + myState.toString() + " , newState " + newState.toString());
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java
index a430879..f7d53be 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java
@@ -31,9 +31,19 @@ import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.registry.cpi.RegistryException;
import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.airavata.simple.workflow.engine.WorkflowParser;
+import org.apache.airavata.simple.workflow.engine.dag.edge.DirectedEdge;
+import org.apache.airavata.simple.workflow.engine.dag.edge.Edge;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode;
import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNodeImpl;
import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode;
import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNodeImpl;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNodeImpl;
+import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
+import org.apache.airavata.simple.workflow.engine.dag.port.InputPortIml;
+import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
import org.apache.airavata.simple.workflow.engine.dag.port.OutPortImpl;
import org.apache.airavata.workflow.model.component.ComponentException;
import org.apache.airavata.workflow.model.component.system.ConstantComponent;
@@ -49,16 +59,6 @@ import org.apache.airavata.workflow.model.graph.system.SystemDataPort;
import org.apache.airavata.workflow.model.graph.ws.WSNode;
import org.apache.airavata.workflow.model.graph.ws.WSPort;
import org.apache.airavata.workflow.model.wf.Workflow;
-import org.apache.airavata.simple.workflow.engine.WorkflowParser;
-import org.apache.airavata.simple.workflow.engine.dag.edge.DirectedEdge;
-import org.apache.airavata.simple.workflow.engine.dag.edge.Edge;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNodeImpl;
-import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
-import org.apache.airavata.simple.workflow.engine.dag.port.InputPortIml;
-import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
import java.util.ArrayList;
import java.util.HashMap;
@@ -103,7 +103,7 @@ public class AiravataWorkflowParser implements WorkflowParser {
wfInputNode = new WorkflowInputNodeImpl(gNode.getID(), gNode.getName());
wfInputNode.setInputObject(inputDataMap.get(wfInputNode.getId()));
if (wfInputNode.getInputObject() == null) {
- // TODO: throw an error and exit.
+ throw new RuntimeException("Workflow Input object is not set, workflow node id: " + wfInputNode.getId());
}
portContainers.addAll(processOutPorts(gNode, wfInputNode));
wfInputNodes.add(wfInputNode);
@@ -139,7 +139,7 @@ public class AiravataWorkflowParser implements WorkflowParser {
} else if (wfNode instanceof ApplicationNode) {
wfApplicationNode = (ApplicationNode) wfNode;
} else {
- // TODO : handle this scenario
+ throw new IllegalArgumentException("Only support for ApplicationNode implementation, but found other type for node implementation");
}
inPort.setNode(wfApplicationNode);
wfApplicationNode.addInPort(inPort);
http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/PortContainer.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/PortContainer.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/PortContainer.java
index db3dda5..4ddb8b9 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/PortContainer.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/PortContainer.java
@@ -21,8 +21,8 @@
package org.apache.airavata.simple.workflow.engine.parser;
-import org.apache.airavata.workflow.model.graph.DataPort;
import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
+import org.apache.airavata.workflow.model.graph.DataPort;
public class PortContainer {
http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java b/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java
index 6443806..d843abe 100644
--- a/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java
+++ b/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java
@@ -26,9 +26,9 @@ import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
import org.apache.airavata.model.workspace.experiment.Experiment;
import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode;
import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
import org.apache.airavata.workflow.model.wf.Workflow;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;