You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/03/26 18:08:15 UTC
[16/50] [abbrv] airavata git commit: Renamed the wrong package name
Renamed the wrong package name
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6bfb9563
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6bfb9563
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6bfb9563
Branch: refs/heads/master
Commit: 6bfb9563a049b943c2ab2287f468a38f8eaf9b78
Parents: 8835097
Author: shamrath <sh...@gmail.com>
Authored: Mon Feb 23 16:51:21 2015 -0500
Committer: shamrath <sh...@gmail.com>
Committed: Mon Feb 23 16:51:21 2015 -0500
----------------------------------------------------------------------
.../simple/workflow/engine/ProcessPack.java | 62 +++
.../engine/SimpleWorkflowInterpreter.java | 470 +++++++++++++++++++
.../simple/workflow/engine/WorkflowFactory.java | 31 ++
.../workflow/engine/WorkflowFactoryImpl.java | 66 +++
.../simple/workflow/engine/WorkflowParser.java | 32 ++
.../simple/workflow/engine/WorkflowUtil.java | 63 +++
.../workflow/engine/dag/edge/DirectedEdge.java | 52 ++
.../simple/workflow/engine/dag/edge/Edge.java | 43 ++
.../engine/dag/nodes/ApplicationNode.java | 41 ++
.../engine/dag/nodes/ApplicationNodeImpl.java | 113 +++++
.../workflow/engine/dag/nodes/NodeState.java | 34 ++
.../workflow/engine/dag/nodes/NodeType.java | 28 ++
.../engine/dag/nodes/WorkflowInputNode.java | 37 ++
.../engine/dag/nodes/WorkflowInputNodeImpl.java | 96 ++++
.../workflow/engine/dag/nodes/WorkflowNode.java | 38 ++
.../engine/dag/nodes/WorkflowOutputNode.java | 37 ++
.../dag/nodes/WorkflowOutputNodeImpl.java | 97 ++++
.../simple/workflow/engine/dag/port/InPort.java | 41 ++
.../workflow/engine/dag/port/InputPortIml.java | 90 ++++
.../workflow/engine/dag/port/OutPort.java | 39 ++
.../workflow/engine/dag/port/OutPortImpl.java | 83 ++++
.../simple/workflow/engine/dag/port/Port.java | 36 ++
.../engine/parser/AiravataDefaultParser.java | 293 ++++++++++++
.../workflow/engine/parser/PortContainer.java | 53 +++
.../simple/workflow/engine/ProcessPack.java | 62 ---
.../engine/SimpleWorkflowInterpreter.java | 470 -------------------
.../simple/workflow/engine/WorkflowFactory.java | 31 --
.../workflow/engine/WorkflowFactoryImpl.java | 66 ---
.../simple/workflow/engine/WorkflowParser.java | 32 --
.../simple/workflow/engine/WorkflowUtil.java | 63 ---
.../workflow/engine/dag/edge/DirectedEdge.java | 52 --
.../simple/workflow/engine/dag/edge/Edge.java | 43 --
.../engine/dag/nodes/ApplicationNode.java | 41 --
.../engine/dag/nodes/ApplicationNodeImpl.java | 113 -----
.../workflow/engine/dag/nodes/NodeState.java | 34 --
.../workflow/engine/dag/nodes/NodeType.java | 28 --
.../engine/dag/nodes/WorkflowInputNode.java | 37 --
.../engine/dag/nodes/WorkflowInputNodeImpl.java | 96 ----
.../workflow/engine/dag/nodes/WorkflowNode.java | 38 --
.../engine/dag/nodes/WorkflowOutputNode.java | 37 --
.../dag/nodes/WorkflowOutputNodeImpl.java | 97 ----
.../simple/workflow/engine/dag/port/InPort.java | 41 --
.../workflow/engine/dag/port/InputPortIml.java | 90 ----
.../workflow/engine/dag/port/OutPort.java | 39 --
.../workflow/engine/dag/port/OutPortImpl.java | 83 ----
.../simple/workflow/engine/dag/port/Port.java | 36 --
.../engine/parser/AiravataDefaultParser.java | 293 ------------
.../workflow/engine/parser/PortContainer.java | 53 ---
.../simple/workflow/engine/WorkflowDAGTest.java | 46 ++
.../parser/AiravataDefaultParserTest.java | 119 +++++
.../simple/workflow/engine/WorkflowDAGTest.java | 46 --
.../parser/AiravataDefaultParserTest.java | 119 -----
52 files changed, 2140 insertions(+), 2140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java
new file mode 100644
index 0000000..b58b947
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine;
+
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
+
+public class ProcessPack {
+ private WorkflowNode workflowNode;
+ private WorkflowNodeDetails wfNodeDetails;
+ private TaskDetails taskDetails;
+
+ public ProcessPack(WorkflowNode workflowNode, WorkflowNodeDetails wfNodeDetails, TaskDetails taskDetails) {
+ this.workflowNode = workflowNode;
+ this.wfNodeDetails = wfNodeDetails;
+ this.taskDetails = taskDetails;
+ }
+
+ public WorkflowNode getWorkflowNode() {
+ return workflowNode;
+ }
+
+ public void setWorkflowNode(WorkflowNode workflowNode) {
+ this.workflowNode = workflowNode;
+ }
+
+ public WorkflowNodeDetails getWfNodeDetails() {
+ return wfNodeDetails;
+ }
+
+ public void setWfNodeDetails(WorkflowNodeDetails wfNodeDetails) {
+ this.wfNodeDetails = wfNodeDetails;
+ }
+
+ public TaskDetails getTaskDetails() {
+ return taskDetails;
+ }
+
+ public void setTaskDetails(TaskDetails taskDetails) {
+ this.taskDetails = taskDetails;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
new file mode 100644
index 0000000..6dcb8bd
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
@@ -0,0 +1,470 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.util.ExperimentModelUtil;
+import org.apache.airavata.model.workspace.experiment.ExecutionUnit;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.airavata.simple.workflow.engine.dag.edge.Edge;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.NodeState;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
+import org.apache.airavata.simple.workflow.engine.parser.AiravataDefaultParser;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
+import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
+import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SimpleWorkflowInterpreter implements Runnable{
+
+ private static final Logger log = LoggerFactory.getLogger(SimpleWorkflowInterpreter.class);
+
+ private List<WorkflowInputNode> workflowInputNodes;
+
+ private Experiment experiment;
+
+ private String credentialToken;
+
+ private Map<String, WorkflowNode> readList = new ConcurrentHashMap<String, WorkflowNode>();
+ private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<String, WorkflowNode>();
+ private Map<String, ProcessPack> processingQueue = new ConcurrentHashMap<String, ProcessPack>();
+ private Map<String, ProcessPack> completeList = new HashMap<String, ProcessPack>();
+ private Registry registry;
+ private EventBus eventBus = new EventBus();
+ private List<WorkflowOutputNode> completeWorkflowOutputs = new ArrayList<WorkflowOutputNode>();
+
+ public SimpleWorkflowInterpreter(String experimentId, String credentialToken) throws RegistryException {
+ setExperiment(experimentId);
+ this.credentialToken = credentialToken;
+ }
+
+ public SimpleWorkflowInterpreter(Experiment experiment, String credentialStoreToken) {
+ // read the workflow file and build the topology to a DAG. Then execute that dag
+ // get workflowInputNode list and start processing
+ // next() will return ready task and block the thread if no task in ready state.
+ this.experiment = experiment;
+ this.credentialToken = credentialStoreToken;
+ }
+
+
+ public void launchWorkflow() throws Exception {
+ // process workflow input nodes
+// WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance();
+// WorkflowParser workflowParser = wfFactory.getWorkflowParser(experiment.getExperimentID(), credentialToken);
+ WorkflowParser workflowParser = new AiravataDefaultParser(experiment, credentialToken);
+ log.debug("Initialized workflow parser");
+ setWorkflowInputNodes(workflowParser.parse());
+ log.debug("Parsed the workflow and got the workflow input nodes");
+ processWorkflowInputNodes(getWorkflowInputNodes());
+ }
+
+ // try to remove synchronization tag
+ private synchronized void processReadyList() {
+ for (WorkflowNode readyNode : readList.values()) {
+ try {
+ if (readyNode instanceof WorkflowOutputNode) {
+ WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) readyNode;
+ wfOutputNode.getOutputObject().setValue(wfOutputNode.getInPort().getInputObject().getValue());
+ addToCompleteOutputNodeList(wfOutputNode);
+ continue;
+ }
+ WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode);
+ TaskDetails process = getProcess(workflowNodeDetails);
+ ProcessPack processPack = new ProcessPack(readyNode, workflowNodeDetails, process);
+ addToProcessingQueue(processPack);
+// publishToProcessQueue(process);
+ publishToProcessQueue(processPack);
+ } catch (RegistryException e) {
+ // FIXME : handle this exception
+ }
+ }
+ }
+
+
+ private void publishToProcessQueue(TaskDetails process) {
+ Thread thread = new Thread(new TempPublisher(process, eventBus));
+ thread.start();
+ //TODO: publish to process queue.
+ }
+
+ // TODO : remove this test method
+ private void publishToProcessQueue(ProcessPack process) {
+ WorkflowNode workflowNode = process.getWorkflowNode();
+ if (workflowNode instanceof ApplicationNode) {
+ ApplicationNode applicationNode = (ApplicationNode) workflowNode;
+ List<InPort> inputPorts = applicationNode.getInputPorts();
+ if (applicationNode.getName().equals("Add")) {
+ applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf(
+ Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) + Integer.parseInt(inputPorts.get(1).getInputObject().getValue())));
+ } else if (applicationNode.getName().equals("Multiply")) {
+ applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf(
+ Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) * Integer.parseInt(inputPorts.get(1).getInputObject().getValue())));
+ } else if (applicationNode.getName().equals("Subtract")) {
+ applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf(
+ Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) - Integer.parseInt(inputPorts.get(1).getInputObject().getValue())));
+ } else {
+ throw new RuntimeException("Invalid Application name");
+ }
+
+ for (Edge edge : applicationNode.getOutputPorts().get(0).getOutEdges()) {
+ WorkflowUtil.copyValues(applicationNode.getOutputPorts().get(0).getOutputObject(), edge.getToPort().getInputObject());
+ if (edge.getToPort().getNode().isReady()) {
+ addToReadyQueue(edge.getToPort().getNode());
+ } else {
+ addToWaitingQueue(edge.getToPort().getNode());
+ }
+ }
+ } else if (workflowNode instanceof WorkflowOutputNode) {
+ WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) workflowNode;
+ throw new RuntimeException("Workflow output node in processing queue");
+ }
+
+ processingQueue.remove(process.getTaskDetails().getTaskID());
+ }
+
+ private TaskDetails getProcess(WorkflowNodeDetails wfNodeDetails) throws RegistryException {
+ // create workflow taskDetails from workflowNodeDetails
+ TaskDetails taskDetails = ExperimentModelUtil.cloneTaskFromWorkflowNodeDetails(getExperiment(), wfNodeDetails);
+ taskDetails.setTaskID(getRegistry()
+ .add(ChildDataType.TASK_DETAIL, taskDetails, wfNodeDetails.getNodeInstanceId()).toString());
+ return taskDetails;
+ }
+
+ private WorkflowNodeDetails createWorkflowNodeDetails(WorkflowNode readyNode) throws RegistryException {
+ WorkflowNodeDetails wfNodeDetails = ExperimentModelUtil.createWorkflowNode(readyNode.getId(), null);
+ ExecutionUnit executionUnit = ExecutionUnit.APPLICATION;
+ String executionData = null;
+ if (readyNode instanceof ApplicationNode) {
+ executionUnit = ExecutionUnit.APPLICATION;
+ executionData = ((ApplicationNode) readyNode).getApplicationId();
+ } else if (readyNode instanceof WorkflowInputNode) {
+ executionUnit = ExecutionUnit.INPUT;
+ } else if (readyNode instanceof WorkflowOutputNode) {
+ executionUnit = ExecutionUnit.OUTPUT;
+ }
+ wfNodeDetails.setExecutionUnit(executionUnit);
+ wfNodeDetails.setExecutionUnitData(executionData);
+ setupNodeDetailsInput(readyNode, wfNodeDetails);
+ wfNodeDetails.setNodeInstanceId((String) getRegistry()
+ .add(ChildDataType.WORKFLOW_NODE_DETAIL, wfNodeDetails, getExperiment().getExperimentID()));
+// nodeInstanceList.put(node, wfNodeDetails);
+ return wfNodeDetails;
+ }
+
+ private void setupNodeDetailsInput(WorkflowNode readyNode, WorkflowNodeDetails wfNodeDetails) {
+ if (readyNode instanceof ApplicationNode) {
+ ApplicationNode applicationNode = (ApplicationNode) readyNode;
+ if (applicationNode.isReady()) {
+ for (InPort inPort : applicationNode.getInputPorts()) {
+ wfNodeDetails.addToNodeInputs(inPort.getInputObject());
+ }
+ } else {
+ // TODO: handle this scenario properly.
+ }
+ } else {
+ // TODO: do we support for other type of workflow nodes ?
+ }
+ }
+
+
+ private void processWorkflowInputNodes(List<WorkflowInputNode> wfInputNodes) {
+ Set<WorkflowNode> tempNodeSet = new HashSet<WorkflowNode>();
+ for (WorkflowInputNode wfInputNode : wfInputNodes) {
+ if (wfInputNode.isReady()) {
+ log.debug("Workflow node : " + wfInputNode.getId() + " is ready to execute");
+ for (Edge edge : wfInputNode.getOutPort().getOutEdges()) {
+ edge.getToPort().getInputObject().setValue(wfInputNode.getInputObject().getValue());
+ if (edge.getToPort().getNode().isReady()) {
+ addToReadyQueue(edge.getToPort().getNode());
+ log.debug("Added workflow node : " + edge.getToPort().getNode().getId() + " to the readyQueue");
+ } else {
+ addToWaitingQueue(edge.getToPort().getNode());
+ log.debug("Added workflow node " + edge.getToPort().getNode().getId() + " to the waitingQueue");
+
+ }
+ }
+ }
+ }
+ }
+
+
+ public List<WorkflowInputNode> getWorkflowInputNodes() throws Exception {
+ return workflowInputNodes;
+ }
+
+ public void setWorkflowInputNodes(List<WorkflowInputNode> workflowInputNodes) {
+ this.workflowInputNodes = workflowInputNodes;
+ }
+
+
+ private List<WorkflowInputNode> parseWorkflowDescription(){
+ return null;
+ }
+
+
+ private Registry getRegistry() throws RegistryException {
+ if (registry==null){
+ registry = RegistryFactory.getDefaultRegistry();
+ }
+ return registry;
+ }
+
+ public Experiment getExperiment() {
+ return experiment;
+ }
+
+ private void updateWorkflowNodeStatus(WorkflowNodeDetails wfNodeDetails, WorkflowNodeState state) throws RegistryException{
+ WorkflowNodeStatus status = ExperimentModelUtil.createWorkflowNodeStatus(state);
+ wfNodeDetails.setWorkflowNodeStatus(status);
+ getRegistry().update(RegistryModelType.WORKFLOW_NODE_STATUS, status, wfNodeDetails.getNodeInstanceId());
+ }
+
+ @Subscribe
+ public void taskOutputChanged(TaskOutputChangeEvent taskOutputEvent){
+ String taskId = taskOutputEvent.getTaskIdentity().getTaskId();
+ log.debug("Task Output changed event received for workflow node : " +
+ taskOutputEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId);
+ ProcessPack processPack = processingQueue.get(taskId);
+ Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>();
+ if (processPack != null) {
+ WorkflowNode workflowNode = processPack.getWorkflowNode();
+ if (workflowNode instanceof ApplicationNode) {
+ ApplicationNode applicationNode = (ApplicationNode) workflowNode;
+ // Workflow node can have one to many output ports and each output port can have one to many links
+ for (OutPort outPort : applicationNode.getOutputPorts()) {
+ for (OutputDataObjectType outputDataObjectType : taskOutputEvent.getOutput()) {
+ if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) {
+ outPort.getOutputObject().setValue(outputDataObjectType.getValue());
+ break;
+ }
+ }
+ for (Edge edge : outPort.getOutEdges()) {
+ WorkflowUtil.copyValues(outPort.getOutputObject(), edge.getToPort().getInputObject());
+ if (edge.getToPort().getNode().isReady()) {
+ addToReadyQueue(edge.getToPort().getNode());
+ }
+ }
+ }
+ }
+ processingQueue.remove(taskId);
+ log.debug("removed task from processing queue : " + taskId);
+ }
+
+ }
+
+ @Subscribe
+ public void taskStatusChanged(TaskStatusChangeEvent taskStatus){
+ String taskId = taskStatus.getTaskIdentity().getTaskId();
+ ProcessPack processPack = processingQueue.get(taskId);
+ if (processPack != null) {
+ WorkflowNodeState wfNodeState = WorkflowNodeState.UNKNOWN;
+ switch (taskStatus.getState()) {
+ case WAITING:
+ break;
+ case STARTED:
+ break;
+ case PRE_PROCESSING:
+ processPack.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
+ break;
+ case INPUT_DATA_STAGING:
+ processPack.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
+ break;
+ case EXECUTING:
+ processPack.getWorkflowNode().setState(NodeState.EXECUTING);
+ break;
+ case OUTPUT_DATA_STAGING:
+ processPack.getWorkflowNode().setState(NodeState.POST_PROCESSING);
+ break;
+ case POST_PROCESSING:
+ processPack.getWorkflowNode().setState(NodeState.POST_PROCESSING);
+ break;
+ case COMPLETED:
+ processPack.getWorkflowNode().setState(NodeState.EXECUTED);
+ break;
+ case FAILED:
+ processPack.getWorkflowNode().setState(NodeState.FAILED);
+ break;
+ case UNKNOWN:
+ break;
+ case CONFIGURING_WORKSPACE:
+ break;
+ case CANCELED:
+ case CANCELING:
+ processPack.getWorkflowNode().setState(NodeState.FAILED);
+ break;
+ default:
+ break;
+ }
+ if (wfNodeState != WorkflowNodeState.UNKNOWN) {
+ try {
+ updateWorkflowNodeStatus(processPack.getWfNodeDetails(), wfNodeState);
+ } catch (RegistryException e) {
+ // TODO: handle this.
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Remove the workflow node from waiting queue and add it to the ready queue.
+ * @param workflowNode - Workflow Node
+ */
+ private synchronized void addToReadyQueue(WorkflowNode workflowNode) {
+ waitingList.remove(workflowNode.getId());
+ readList.put(workflowNode.getId(), workflowNode);
+ }
+
+ private void addToWaitingQueue(WorkflowNode workflowNode) {
+ waitingList.put(workflowNode.getId(), workflowNode);
+ }
+
+ /**
+ * First remove the node from ready list and then add the WfNodeContainer to the process queue.
+ * Note that underline data structure of the process queue is a Map.
+ * @param processPack - has both workflow and correspond workflowNodeDetails and TaskDetails
+ */
+ private synchronized void addToProcessingQueue(ProcessPack processPack) {
+ readList.remove(processPack.getWorkflowNode().getId());
+ processingQueue.put(processPack.getTaskDetails().getTaskID(), processPack);
+ }
+
+ private synchronized void addToCompleteQueue(ProcessPack processPack) {
+ processingQueue.remove(processPack.getTaskDetails().getTaskID());
+ completeList.put(processPack.getTaskDetails().getTaskID(), processPack);
+ }
+
+
+ private void addToCompleteOutputNodeList(WorkflowOutputNode wfOutputNode) {
+ completeWorkflowOutputs.add(wfOutputNode);
+ readList.remove(wfOutputNode.getId());
+ }
+
+ @Override
+ public void run() {
+ // TODO: Auto generated method body.
+ try {
+ log.debug("Launching workflow");
+ launchWorkflow();
+ while (!(waitingList.isEmpty() && readList.isEmpty())) {
+ processReadyList();
+ Thread.sleep(1000);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void setExperiment(String experimentId) throws RegistryException {
+ experiment = (Experiment) getRegistry().get(RegistryModelType.EXPERIMENT, experimentId);
+ log.debug("Retrieve Experiment for experiment id : " + experimentId);
+ }
+
+
+ class TempPublisher implements Runnable {
+ private TaskDetails tempTaskDetails;
+ private EventBus tempEventBus;
+
+ public TempPublisher(TaskDetails tempTaskDetails, EventBus tempEventBus) {
+ this.tempTaskDetails = tempTaskDetails;
+ this.tempEventBus = tempEventBus;
+ }
+
+ @Override
+ public void run() {
+ try {
+ TaskIdentifier identifier = new TaskIdentifier(tempTaskDetails.getTaskID(), null, null, null);
+ TaskStatusChangeEvent statusChangeEvent = new TaskStatusChangeEvent(TaskState.PRE_PROCESSING, identifier);
+ tempEventBus.post(statusChangeEvent);
+ Thread.sleep(1000);
+ statusChangeEvent = new TaskStatusChangeEvent(TaskState.WAITING, identifier);
+ tempEventBus.post(statusChangeEvent);
+ Thread.sleep(1000);
+ statusChangeEvent = new TaskStatusChangeEvent(TaskState.INPUT_DATA_STAGING, identifier);
+ tempEventBus.post(statusChangeEvent);
+ Thread.sleep(1000);
+ statusChangeEvent = new TaskStatusChangeEvent(TaskState.STARTED, identifier);
+ tempEventBus.post(statusChangeEvent);
+ Thread.sleep(1000);
+ statusChangeEvent = new TaskStatusChangeEvent(TaskState.EXECUTING, identifier);
+ tempEventBus.post(statusChangeEvent);
+ Thread.sleep(1000);
+ statusChangeEvent = new TaskStatusChangeEvent(TaskState.POST_PROCESSING, identifier);
+ tempEventBus.post(statusChangeEvent);
+ Thread.sleep(1000);
+ statusChangeEvent = new TaskStatusChangeEvent(TaskState.OUTPUT_DATA_STAGING, identifier);
+ tempEventBus.post(statusChangeEvent);
+ Thread.sleep(1000);
+ statusChangeEvent = new TaskStatusChangeEvent(TaskState.COMPLETED, identifier);
+ tempEventBus.post(statusChangeEvent);
+ Thread.sleep(1000);
+
+ List<InputDataObjectType> applicationInputs = tempTaskDetails.getApplicationInputs();
+ List<OutputDataObjectType> applicationOutputs = tempTaskDetails.getApplicationOutputs();
+ log.info("************** Task output change event fired for application id :" + tempTaskDetails.getApplicationId());
+ if (tempTaskDetails.getApplicationId().equals("Add") || tempTaskDetails.getApplicationId().equals("Add_2")) {
+ applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) +
+ Integer.parseInt(applicationInputs.get(1).getValue())) + "");
+ } else if (tempTaskDetails.getApplicationId().equals("Subtract")) {
+ applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) -
+ Integer.parseInt(applicationInputs.get(1).getValue())) + "");
+ } else if (tempTaskDetails.getApplicationId().equals("Multiply")) {
+ applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) *
+ Integer.parseInt(applicationInputs.get(1).getValue())) + "");
+ }
+ TaskOutputChangeEvent taskOutputChangeEvent = new TaskOutputChangeEvent(applicationOutputs, identifier);
+ eventBus.post(taskOutputChangeEvent);
+
+ } catch (InterruptedException e) {
+ log.error("Thread was interrupted while sleeping");
+ }
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactory.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactory.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactory.java
new file mode 100644
index 0000000..3de90f2
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactory.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine;
+
+/**
+ * All classes implement this WorkflowFactory interface, should be abstract or singleton.
+ */
+public interface WorkflowFactory {
+
+ public WorkflowParser getWorkflowParser(String experimentId, String credentialToken);
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java
new file mode 100644
index 0000000..116a10d
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine;
+
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.simple.workflow.engine.parser.AiravataDefaultParser;
+
+/**
+ * Singleton class, only one instance can exist in runtime.
+ */
+public class WorkflowFactoryImpl implements WorkflowFactory {
+
+ private static WorkflowFactoryImpl workflowFactoryImpl;
+
+ private WorkflowParser workflowParser;
+
+ private static final String synch = "sync";
+
+ private WorkflowFactoryImpl(){
+
+ }
+
+ public static WorkflowFactoryImpl getInstance() {
+ if (workflowFactoryImpl == null) {
+ synchronized (synch) {
+ if (workflowFactoryImpl == null) {
+ workflowFactoryImpl = new WorkflowFactoryImpl();
+ }
+ }
+ }
+ return workflowFactoryImpl;
+ }
+
+
+ @Override
+ public WorkflowParser getWorkflowParser(String experimentId, String credentialToken) {
+ if (workflowParser == null) {
+ try {
+ workflowParser = new AiravataDefaultParser(experimentId, credentialToken);
+ } catch (RegistryException e) {
+ // TODO : handle this scenario
+ }
+ }
+ return workflowParser;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowParser.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowParser.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowParser.java
new file mode 100644
index 0000000..6c4d6f2
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowParser.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine;
+
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode;
+
+import java.util.List;
+
+public interface WorkflowParser {
+
+ public List<WorkflowInputNode> parse() throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowUtil.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowUtil.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowUtil.java
new file mode 100644
index 0000000..a2b69ae
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowUtil.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine;
+
+import com.google.common.eventbus.EventBus;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.persistance.registry.jpa.model.TaskDetail;
+
+public class WorkflowUtil {
+
+ public static InputDataObjectType copyValues(InputDataObjectType fromInputObj, InputDataObjectType toInputObj){
+ if (toInputObj == null) {
+ // TODO : throw an error
+ }
+ toInputObj.setValue(fromInputObj.getValue());
+ if (fromInputObj.getApplicationArgument() != null
+ && !fromInputObj.getApplicationArgument().trim().equals("")) {
+ toInputObj.setApplicationArgument(fromInputObj.getApplicationArgument());
+ }
+ if (toInputObj.getType() == null) {
+ toInputObj.setType(fromInputObj.getType());
+ }
+ return fromInputObj;
+ }
+
+ public static InputDataObjectType copyValues(OutputDataObjectType outputData, InputDataObjectType inputData) {
+ inputData.setValue(outputData.getValue());
+ return inputData;
+ }
+
+
+ public static OutputDataObjectType copyValues(InputDataObjectType inputObject, OutputDataObjectType outputObject) {
+ if (outputObject == null) {
+ outputObject = new OutputDataObjectType();
+ }
+ outputObject.setValue(inputObject.getValue());
+ return outputObject;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java
new file mode 100644
index 0000000..3bc380d
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.edge;
+
+import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
+import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
+
+
+public class DirectedEdge implements Edge {
+
+ private InPort inPort;
+ private OutPort outPort;
+
+ @Override
+ public InPort getToPort() {
+ return inPort;
+ }
+
+ @Override
+ public void setToPort(InPort inPort) {
+ this.inPort = inPort;
+ }
+
+ @Override
+ public OutPort getFromPort() {
+ return outPort;
+ }
+
+ @Override
+ public void setFromPort(OutPort outPort) {
+ this.outPort = outPort;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/Edge.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/Edge.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/Edge.java
new file mode 100644
index 0000000..e8bce2e
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/Edge.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.edge;
+
+import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
+import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
+
+/**
+ * Edge is a link to one node to another, basically edge should have outPort of a workflow node ,
+ * which is starting point and inPort of a workflow node, which is end point of the edge.
+ */
+
+public interface Edge {
+
+ public InPort getToPort();
+
+ public void setToPort(InPort inPort);
+
+ public OutPort getFromPort();
+
+ public void setFromPort(OutPort outPort);
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNode.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNode.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNode.java
new file mode 100644
index 0000000..37efded
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNode.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.nodes;
+
+import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
+import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
+
+import java.util.List;
+
+public interface ApplicationNode extends WorkflowNode {
+
+ public String getApplicationId();
+
+ public void addInPort(InPort inPort);
+
+ public List<InPort> getInputPorts();
+
+ public void addOutPort(OutPort outPort);
+
+ public List<OutPort> getOutputPorts();
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java
new file mode 100644
index 0000000..52b0595
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.nodes;
+
+import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
+import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ApplicationNodeImpl implements ApplicationNode {
+
+ private final String nodeId;
+ private NodeState myState = NodeState.WAITING;
+ private String applicationId;
+ private List<InPort> inPorts = new ArrayList<InPort>();
+ private List<OutPort> outPorts = new ArrayList<OutPort>();
+ private String applicationName;
+
+// public ApplicationNodeImpl(String nodeId) {
+// this(nodeId, null);
+// }
+//
+// public ApplicationNodeImpl(String nodeId, String applicationId) {
+// this(nodeId, null, applicationId);
+// }
+
+ public ApplicationNodeImpl(String nodeId, String applicationName, String applicationId) {
+ this.nodeId = nodeId;
+ this.applicationName = applicationName;
+ this.applicationId = applicationId;
+ }
+
+ @Override
+ public String getId() {
+ return this.nodeId;
+ }
+
+ @Override
+ public String getName() {
+ return applicationName;
+ }
+
+ @Override
+ public NodeType getType() {
+ return NodeType.APPLICATION;
+ }
+
+ @Override
+ public NodeState getState() {
+ return myState;
+ }
+
+ @Override
+ public void setState(NodeState newState) {
+ // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE
+ myState = newState;
+ }
+
+ @Override
+ public boolean isReady() {
+ for (InPort inPort : getInputPorts()) {
+ if (!inPort.isReady()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public String getApplicationId() {
+ return this.applicationId;
+ }
+
+ @Override
+ public void addInPort(InPort inPort) {
+ this.inPorts.add(inPort);
+ }
+
+ @Override
+ public List<InPort> getInputPorts() {
+ return this.inPorts;
+ }
+
+ @Override
+ public void addOutPort(OutPort outPort) {
+ this.outPorts.add(outPort);
+ }
+
+ @Override
+ public List<OutPort> getOutputPorts() {
+ return this.outPorts;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java
new file mode 100644
index 0000000..333fcb2
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.nodes;
+
+public enum NodeState {
+ WAITING, // waiting on inputs
+ READY, // all inputs are available and ready to execute
+ QUEUED, //
+ PRE_PROCESSING, //
+ EXECUTING, // task has been submitted , not yet finish
+ EXECUTED, // task executed
+ POST_PROCESSING, //
+ FAILED,
+ COMPLETE // all works done
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeType.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeType.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeType.java
new file mode 100644
index 0000000..95710fb
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeType.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.nodes;
+
+public enum NodeType {
+ APPLICATION,
+ WORKFLOW_INPUT,
+ WORKFLOW_OUTPUT
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNode.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNode.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNode.java
new file mode 100644
index 0000000..9ac800a
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNode.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.nodes;
+
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
+
+public interface WorkflowInputNode extends WorkflowNode {
+
+ public InputDataObjectType getInputObject();
+
+ public void setInputObject(InputDataObjectType inputObject);
+
+ public OutPort getOutPort();
+
+ public void setOutPort(OutPort outPort);
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
new file mode 100644
index 0000000..b3dfa62
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.nodes;
+
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
+
+public class WorkflowInputNodeImpl implements WorkflowInputNode {
+
+ private NodeState myState = NodeState.READY;
+ private final String nodeId;
+ private String nodeName;
+ private OutPort outPort;
+ private InputDataObjectType inputDataObjectType;
+ private String name;
+
+ public WorkflowInputNodeImpl(String nodeId) {
+ this(nodeId, null);
+ }
+
+ public WorkflowInputNodeImpl(String nodeId, String nodeName) {
+ this.nodeId = nodeId;
+ this.nodeName = nodeName;
+ }
+
+ @Override
+ public String getId() {
+ return this.nodeId;
+ }
+
+ @Override
+ public String getName() {
+ return this.nodeName;
+ }
+
+ @Override
+ public NodeType getType() {
+ return NodeType.WORKFLOW_INPUT;
+ }
+
+ @Override
+ public NodeState getState() {
+ return myState;
+ }
+
+ @Override
+ public void setState(NodeState newState) {
+ // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE
+ myState = newState;
+ }
+
+ @Override
+ public boolean isReady() {
+ return (inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals(""))
+ || !inputDataObjectType.isIsRequired();
+ }
+
+ @Override
+ public InputDataObjectType getInputObject() {
+ return this.inputDataObjectType;
+ }
+
+ @Override
+ public void setInputObject(InputDataObjectType inputObject) {
+ this.inputDataObjectType = inputObject;
+ }
+
+ @Override
+ public OutPort getOutPort() {
+ return this.outPort;
+ }
+
+ @Override
+ public void setOutPort(OutPort outPort) {
+ this.outPort = outPort;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowNode.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowNode.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowNode.java
new file mode 100644
index 0000000..efcf9c7
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowNode.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.nodes;
+
+public interface WorkflowNode {
+
+ public String getId();
+
+ public String getName();
+
+ public NodeType getType();
+
+ public NodeState getState();
+
+ public void setState(NodeState newState);
+
+ public boolean isReady();
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNode.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNode.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNode.java
new file mode 100644
index 0000000..14e4519
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNode.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.nodes;
+
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
+
+public interface WorkflowOutputNode extends WorkflowNode {
+
+ public OutputDataObjectType getOutputObject();
+
+ public void setOutputObject(OutputDataObjectType outputObject);
+
+ public InPort getInPort();
+
+ public void setInPort(InPort inPort);
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java
new file mode 100644
index 0000000..5924212
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.nodes;
+
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
+
+public class WorkflowOutputNodeImpl implements WorkflowOutputNode {
+
+ private NodeState myState = NodeState.WAITING;
+ private final String nodeId;
+ private String nodeName;
+ private OutputDataObjectType outputDataObjectType;
+ private InPort inPort;
+
+ public WorkflowOutputNodeImpl(String nodeId) {
+ this(nodeId, null);
+ }
+
+ public WorkflowOutputNodeImpl(String nodeId, String nodeName) {
+ this.nodeId = nodeId;
+ this.nodeName = nodeName;
+ }
+
+ @Override
+ public String getId() {
+ return this.nodeId;
+ }
+
+ @Override
+ public String getName() {
+ return this.nodeName;
+ }
+
+ @Override
+ public NodeType getType() {
+ return NodeType.WORKFLOW_OUTPUT;
+ }
+
+ @Override
+ public NodeState getState() {
+ return myState;
+ }
+
+ @Override
+ public void setState(NodeState newState) {
+ // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE
+ myState = newState;
+ }
+
+ @Override
+ public boolean isReady() {
+ return !(inPort.getInputObject() == null || inPort.getInputObject().getValue() == null
+ || inPort.getInputObject().getValue().equals(""));
+ }
+
+ @Override
+ public OutputDataObjectType getOutputObject() {
+ return this.outputDataObjectType;
+ }
+
+ @Override
+ public void setOutputObject(OutputDataObjectType outputObject) {
+ this.outputDataObjectType = outputObject;
+ }
+
+ @Override
+ public InPort getInPort() {
+ return this.inPort;
+ }
+
+ @Override
+ public void setInPort(InPort inPort) {
+ this.inPort = inPort;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InPort.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InPort.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InPort.java
new file mode 100644
index 0000000..bb4a112
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InPort.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.port;
+
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.simple.workflow.engine.dag.edge.Edge;
+
+public interface InPort extends Port {
+
+ public void setInputObject(InputDataObjectType inputObject);
+
+ public InputDataObjectType getInputObject();
+
+ public Edge getEdge();
+
+ public void addEdge(Edge edge);
+
+ public String getDefaultValue();
+
+ public void setDefaultValue(String defaultValue);
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InputPortIml.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InputPortIml.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InputPortIml.java
new file mode 100644
index 0000000..c78dc86
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InputPortIml.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.port;
+
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.simple.workflow.engine.dag.edge.Edge;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
+
+public class InputPortIml implements InPort {
+
+ private InputDataObjectType inputDataObjectType;
+ private boolean ready = false;
+ private String portId;
+ private Edge edge;
+ private WorkflowNode node;
+ private String defaultValue;
+
+ public InputPortIml(String portId) {
+ this.portId = portId;
+ }
+
+ @Override
+ public void setInputObject(InputDataObjectType inputObject) {
+ this.inputDataObjectType = inputObject;
+ ready = (inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals(""))
+ || !inputDataObjectType.isIsRequired();
+ }
+
+ @Override
+ public InputDataObjectType getInputObject() {
+ return this.inputDataObjectType;
+ }
+
+ @Override
+ public Edge getEdge() {
+ return this.edge;
+ }
+
+ @Override
+ public void addEdge(Edge edge) {
+ this.edge = edge;
+ }
+
+ @Override
+ public String getDefaultValue() {
+ return defaultValue;
+ }
+
+ public void setDefaultValue(String defaultValue) {
+ this.defaultValue = defaultValue;
+ }
+
+ @Override
+ public boolean isReady() {
+ return getInputObject() != null && inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals("");
+ }
+
+ @Override
+ public WorkflowNode getNode() {
+ return this.node;
+ }
+
+ @Override
+ public void setNode(WorkflowNode workflowNode) {
+ this.node = workflowNode;
+ }
+
+ @Override
+ public String getId() {
+ return this.portId;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPort.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPort.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPort.java
new file mode 100644
index 0000000..0332f81
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPort.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.port;
+
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.simple.workflow.engine.dag.edge.Edge;
+
+import java.util.List;
+
+public interface OutPort extends Port {
+
+ public void setOutputObject(OutputDataObjectType outputObject);
+
+ public OutputDataObjectType getOutputObject();
+
+ public List<Edge> getOutEdges();
+
+ public void addEdge(Edge edge);
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPortImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPortImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPortImpl.java
new file mode 100644
index 0000000..4e26cb3
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPortImpl.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.port;
+
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.simple.workflow.engine.dag.edge.Edge;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class OutPortImpl implements OutPort {
+
+ private OutputDataObjectType outputDataObjectType;
+ private List<Edge> outEdges = new ArrayList<Edge>();
+ private boolean isSatisfy = false;
+ private String portId;
+ private WorkflowNode node;
+
+ public OutPortImpl(String portId) {
+ this.portId = portId;
+ }
+
+ @Override
+ public void setOutputObject(OutputDataObjectType outputObject) {
+ this.outputDataObjectType = outputObject;
+ }
+
+ @Override
+ public OutputDataObjectType getOutputObject() {
+ return this.outputDataObjectType;
+ }
+
+ @Override
+ public List<Edge> getOutEdges() {
+ return this.outEdges;
+ }
+
+ @Override
+ public void addEdge(Edge edge) {
+ this.outEdges.add(edge);
+ }
+
+ @Override
+ public boolean isReady() {
+ return this.outputDataObjectType.getValue() != null
+ && !this.outputDataObjectType.getValue().equals("");
+ }
+
+ @Override
+ public WorkflowNode getNode() {
+ return this.node;
+ }
+
+ @Override
+ public void setNode(WorkflowNode workflowNode) {
+ this.node = workflowNode;
+ }
+
+ @Override
+ public String getId() {
+ return portId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/Port.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/Port.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/Port.java
new file mode 100644
index 0000000..2b27ea0
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/Port.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.port;
+
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
+
+public interface Port {
+
+ public boolean isReady();
+
+ public WorkflowNode getNode();
+
+ public void setNode(WorkflowNode workflowNode);
+
+ public String getId();
+
+}