You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/03/26 18:08:43 UTC
[44/50] [abbrv] airavata git commit: Renamed simple-workflow module
to workflow and created a new workflow-core module which will keep all the
core code
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/pom.xml
----------------------------------------------------------------------
diff --git a/modules/workflow/pom.xml b/modules/workflow/pom.xml
new file mode 100644
index 0000000..4fb8e09
--- /dev/null
+++ b/modules/workflow/pom.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata</artifactId>
+ <version>0.15-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>workflow</artifactId>
+ <packaging>pom</packaging>
+ <version>0.15-SNAPSHOT</version>
+ <name>Airavata Workflow</name>
+ <modules>
+ <module>workflow-core</module>
+ </modules>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/pom.xml b/modules/workflow/workflow-core/pom.xml
new file mode 100644
index 0000000..1cb8e0d
--- /dev/null
+++ b/modules/workflow/workflow-core/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>workflow</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <version>0.15-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>workflow-core</artifactId>
+ <name>Airavata Workflow Core</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-data-models</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-registry-cpi</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-model-utils</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-jpa-registry</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- Airavata default parser dependency -->
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-workflow-model-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>app-catalog-data</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>app-catalog-cpi</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- Messaging dependency -->
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-messaging-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>18.0</version>
+ </dependency>
+
+ <!--test-->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/ProcessContext.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/ProcessContext.java
new file mode 100644
index 0000000..e0c2651
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/ProcessContext.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core;
+
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode;
+
+public class ProcessContext {
+ private WorkflowNode workflowNode;
+ private WorkflowNodeDetails wfNodeDetails;
+ private TaskDetails taskDetails;
+
+ public ProcessContext(WorkflowNode workflowNode, WorkflowNodeDetails wfNodeDetails, TaskDetails taskDetails) {
+ this.workflowNode = workflowNode;
+ this.wfNodeDetails = wfNodeDetails;
+ this.taskDetails = taskDetails;
+ }
+
+ public WorkflowNode getWorkflowNode() {
+ return workflowNode;
+ }
+
+ public void setWorkflowNode(WorkflowNode workflowNode) {
+ this.workflowNode = workflowNode;
+ }
+
+ public WorkflowNodeDetails getWfNodeDetails() {
+ return wfNodeDetails;
+ }
+
+ public void setWfNodeDetails(WorkflowNodeDetails wfNodeDetails) {
+ this.wfNodeDetails = wfNodeDetails;
+ }
+
+ public TaskDetails getTaskDetails() {
+ return taskDetails;
+ }
+
+ public void setTaskDetails(TaskDetails taskDetails) {
+ this.taskDetails = taskDetails;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java
new file mode 100644
index 0000000..a674aad
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java
@@ -0,0 +1,400 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher;
+import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.util.ExperimentModelUtil;
+import org.apache.airavata.model.workspace.experiment.ExecutionUnit;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.airavata.workflow.core.dag.edge.Edge;
+import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode;
+import org.apache.airavata.workflow.core.dag.nodes.NodeState;
+import org.apache.airavata.workflow.core.dag.nodes.WorkflowInputNode;
+import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode;
+import org.apache.airavata.workflow.core.dag.nodes.WorkflowOutputNode;
+import org.apache.airavata.workflow.core.dag.port.InPort;
+import org.apache.airavata.workflow.core.dag.port.OutPort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Package-Private class
+ */
+class SimpleWorkflowInterpreter{
+
+ private static final Logger log = LoggerFactory.getLogger(SimpleWorkflowInterpreter.class);
+ private List<WorkflowInputNode> workflowInputNodes;
+
+ private Experiment experiment;
+
+ private String credentialToken;
+
+ private String gatewayName;
+
+ private Map<String, WorkflowNode> readyList = new ConcurrentHashMap<String, WorkflowNode>();
+ private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<String, WorkflowNode>();
+ private Map<String, ProcessContext> processingQueue = new ConcurrentHashMap<String, ProcessContext>();
+ private Map<String, ProcessContext> completeList = new HashMap<String, ProcessContext>();
+ private Registry registry;
+ private List<WorkflowOutputNode> completeWorkflowOutputs = new ArrayList<WorkflowOutputNode>();
+ private RabbitMQProcessPublisher publisher;
+ private RabbitMQStatusConsumer statusConsumer;
+ private String consumerId;
+ private boolean continueWorkflow = true;
+
+ public SimpleWorkflowInterpreter(String experimentId, String credentialToken, String gatewayName, RabbitMQProcessPublisher publisher) throws RegistryException {
+ this.gatewayName = gatewayName;
+ setExperiment(experimentId);
+ this.credentialToken = credentialToken;
+ this.publisher = publisher;
+ }
+
+ public SimpleWorkflowInterpreter(Experiment experiment, String credentialStoreToken, String gatewayName, RabbitMQProcessPublisher publisher) {
+ this.gatewayName = gatewayName;
+ this.experiment = experiment;
+ this.credentialToken = credentialStoreToken;
+ this.publisher = publisher;
+ }
+
+ /**
+ * Package-Private method.
+ * @throws Exception
+ */
+ void launchWorkflow() throws Exception {
+ WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance();
+ WorkflowParser workflowParser = wfFactory.getWorkflowParser(experiment.getExperimentID(), credentialToken);
+ log.debug("Initialized workflow parser");
+ setWorkflowInputNodes(workflowParser.parse());
+ log.debug("Parsed the workflow and got the workflow input nodes");
+ // process workflow input nodes
+ processWorkflowInputNodes(getWorkflowInputNodes());
+ if (readyList.isEmpty()) {
+ StringBuilder sb = new StringBuilder();
+ for (WorkflowInputNode workflowInputNode : workflowInputNodes) {
+ sb.append(", ");
+ sb.append(workflowInputNode.getInputObject().getName());
+ sb.append("=");
+ sb.append(workflowInputNode.getInputObject().getValue());
+ }
+ throw new AiravataException("No workflow application node is in ready state to run with experiment inputs" + sb.toString());
+ }
+ processReadyList();
+ }
+
+ // try to remove synchronization tag
+ /**
+ * Package-Private method.
+ * @throws RegistryException
+ * @throws AiravataException
+ */
+ void processReadyList() throws RegistryException, AiravataException {
+ if (readyList.isEmpty() && processingQueue.isEmpty() && !waitingList.isEmpty()) {
+ throw new AiravataException("No workflow application node is in ready state to run");
+ }
+ for (WorkflowNode readyNode : readyList.values()) {
+ if (readyNode instanceof WorkflowOutputNode) {
+ WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) readyNode;
+ wfOutputNode.getOutputObject().setValue(wfOutputNode.getInPort().getInputObject().getValue());
+ addToCompleteOutputNodeList(wfOutputNode);
+ continue;
+ }
+ WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode);
+ TaskDetails process = getProcess(workflowNodeDetails);
+ ProcessContext processContext = new ProcessContext(readyNode, workflowNodeDetails, process);
+ addToProcessingQueue(processContext);
+ publishToProcessQueue(process);
+ }
+ }
+
+
+ private void publishToProcessQueue(TaskDetails process) throws AiravataException {
+ ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
+ processSubmitEvent.setCredentialToken(credentialToken);
+ processSubmitEvent.setTaskId(process.getTaskID());
+ MessageContext messageContext = new MessageContext(processSubmitEvent, MessageType.TASK, process.getTaskID(), null);
+ messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ publisher.publish(messageContext);
+ }
+
+ private TaskDetails getProcess(WorkflowNodeDetails wfNodeDetails) throws RegistryException {
+ // create workflow taskDetails from workflowNodeDetails
+ TaskDetails taskDetails = ExperimentModelUtil.cloneTaskFromWorkflowNodeDetails(getExperiment(), wfNodeDetails);
+ taskDetails.setTaskID(getRegistry()
+ .add(ChildDataType.TASK_DETAIL, taskDetails, wfNodeDetails.getNodeInstanceId()).toString());
+ return taskDetails;
+ }
+
+ private WorkflowNodeDetails createWorkflowNodeDetails(WorkflowNode readyNode) throws RegistryException {
+ WorkflowNodeDetails wfNodeDetails = ExperimentModelUtil.createWorkflowNode(readyNode.getId(), null);
+ ExecutionUnit executionUnit = ExecutionUnit.APPLICATION;
+ String executionData = null;
+ if (readyNode instanceof ApplicationNode) {
+ executionUnit = ExecutionUnit.APPLICATION;
+ executionData = ((ApplicationNode) readyNode).getApplicationId();
+ setupNodeDetailsInput(((ApplicationNode) readyNode), wfNodeDetails);
+ } else if (readyNode instanceof WorkflowInputNode) {
+ executionUnit = ExecutionUnit.INPUT;
+ } else if (readyNode instanceof WorkflowOutputNode) {
+ executionUnit = ExecutionUnit.OUTPUT;
+ }
+ wfNodeDetails.setExecutionUnit(executionUnit);
+ wfNodeDetails.setExecutionUnitData(executionData);
+ wfNodeDetails.setNodeInstanceId((String) getRegistry()
+ .add(ChildDataType.WORKFLOW_NODE_DETAIL, wfNodeDetails, getExperiment().getExperimentID()));
+ return wfNodeDetails;
+ }
+
+ private void setupNodeDetailsInput(ApplicationNode readyAppNode, WorkflowNodeDetails wfNodeDetails) {
+ if (readyAppNode.isReady()) {
+ for (InPort inPort : readyAppNode.getInputPorts()) {
+ wfNodeDetails.addToNodeInputs(inPort.getInputObject());
+ }
+ } else {
+ throw new IllegalArgumentException("Application node should be in ready state to set inputs to the " +
+ "workflow node details, nodeId = " + readyAppNode.getId());
+ }
+ }
+
+
+ private void processWorkflowInputNodes(List<WorkflowInputNode> wfInputNodes) {
+ Set<WorkflowNode> tempNodeSet = new HashSet<WorkflowNode>();
+ for (WorkflowInputNode wfInputNode : wfInputNodes) {
+ if (wfInputNode.isReady()) {
+ log.debug("Workflow node : " + wfInputNode.getId() + " is ready to execute");
+ for (Edge edge : wfInputNode.getOutPort().getOutEdges()) {
+ edge.getToPort().getInputObject().setValue(wfInputNode.getInputObject().getValue());
+ if (edge.getToPort().getNode().isReady()) {
+ addToReadyQueue(edge.getToPort().getNode());
+ log.debug("Added workflow node : " + edge.getToPort().getNode().getId() + " to the readyQueue");
+ } else {
+ addToWaitingQueue(edge.getToPort().getNode());
+ log.debug("Added workflow node " + edge.getToPort().getNode().getId() + " to the waitingQueue");
+
+ }
+ }
+ }
+ }
+ }
+
+
+ public List<WorkflowInputNode> getWorkflowInputNodes() throws Exception {
+ return workflowInputNodes;
+ }
+
+ public void setWorkflowInputNodes(List<WorkflowInputNode> workflowInputNodes) {
+ this.workflowInputNodes = workflowInputNodes;
+ }
+
+ private Registry getRegistry() throws RegistryException {
+ if (registry==null){
+ registry = RegistryFactory.getDefaultRegistry();
+ }
+ return registry;
+ }
+
+ public Experiment getExperiment() {
+ return experiment;
+ }
+
+ private void updateWorkflowNodeStatus(WorkflowNodeDetails wfNodeDetails, WorkflowNodeState state) throws RegistryException{
+ WorkflowNodeStatus status = ExperimentModelUtil.createWorkflowNodeStatus(state);
+ wfNodeDetails.setWorkflowNodeStatus(status);
+ getRegistry().update(RegistryModelType.WORKFLOW_NODE_STATUS, status, wfNodeDetails.getNodeInstanceId());
+ }
+
+ /**
+ * Package-Private method.
+ * Remove the workflow node from waiting queue and add it to the ready queue.
+ * @param workflowNode - Workflow Node
+ */
+ synchronized void addToReadyQueue(WorkflowNode workflowNode) {
+ waitingList.remove(workflowNode.getId());
+ readyList.put(workflowNode.getId(), workflowNode);
+ }
+
+ private void addToWaitingQueue(WorkflowNode workflowNode) {
+ waitingList.put(workflowNode.getId(), workflowNode);
+ }
+
+ /**
+ * First remove the node from ready list and then add the WfNodeContainer to the process queue.
+ * Note that underline data structure of the process queue is a Map.
+ * @param processContext - has both workflow and correspond workflowNodeDetails and TaskDetails
+ */
+ private synchronized void addToProcessingQueue(ProcessContext processContext) {
+ readyList.remove(processContext.getWorkflowNode().getId());
+ processingQueue.put(processContext.getTaskDetails().getTaskID(), processContext);
+ }
+
+ private synchronized void addToCompleteQueue(ProcessContext processContext) {
+ processingQueue.remove(processContext.getTaskDetails().getTaskID());
+ completeList.put(processContext.getTaskDetails().getTaskID(), processContext);
+ }
+
+
+ private void addToCompleteOutputNodeList(WorkflowOutputNode wfOutputNode) {
+ completeWorkflowOutputs.add(wfOutputNode);
+ readyList.remove(wfOutputNode.getId());
+ }
+
+ boolean isAllDone() {
+ return !continueWorkflow || (waitingList.isEmpty() && readyList.isEmpty() && processingQueue.isEmpty());
+ }
+
+ private void setExperiment(String experimentId) throws RegistryException {
+ experiment = (Experiment) getRegistry().get(RegistryModelType.EXPERIMENT, experimentId);
+ log.debug("Retrieve Experiment for experiment id : " + experimentId);
+ }
+
+ synchronized void handleTaskOutputChangeEvent(TaskOutputChangeEvent taskOutputChangeEvent) {
+
+ String taskId = taskOutputChangeEvent.getTaskIdentity().getTaskId();
+ log.debug("Task Output changed event received for workflow node : " +
+ taskOutputChangeEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId);
+ ProcessContext processContext = processingQueue.get(taskId);
+ Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>();
+ if (processContext != null) {
+ WorkflowNode workflowNode = processContext.getWorkflowNode();
+ if (workflowNode instanceof ApplicationNode) {
+ ApplicationNode applicationNode = (ApplicationNode) workflowNode;
+ // Workflow node can have one to many output ports and each output port can have one to many links
+ for (OutPort outPort : applicationNode.getOutputPorts()) {
+ for (OutputDataObjectType outputDataObjectType : taskOutputChangeEvent.getOutput()) {
+ if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) {
+ outPort.getOutputObject().setValue(outputDataObjectType.getValue());
+ break;
+ }
+ }
+ for (Edge edge : outPort.getOutEdges()) {
+ edge.getToPort().getInputObject().setValue(outPort.getOutputObject().getValue());
+ if (edge.getToPort().getNode().isReady()) {
+ addToReadyQueue(edge.getToPort().getNode());
+ }
+ }
+ }
+ }
+ addToCompleteQueue(processContext);
+ log.debug("removed task from processing queue : " + taskId);
+ try {
+ processReadyList();
+ } catch (Exception e) {
+ log.error("Error while processing ready workflow nodes", e);
+ continueWorkflow = false;
+ }
+ }
+ }
+
+ void handleTaskStatusChangeEvent(TaskStatusChangeEvent taskStatusChangeEvent) {
+ TaskState taskState = taskStatusChangeEvent.getState();
+ TaskIdentifier taskIdentity = taskStatusChangeEvent.getTaskIdentity();
+ String taskId = taskIdentity.getTaskId();
+ ProcessContext processContext = processingQueue.get(taskId);
+ if (processContext != null) {
+ WorkflowNodeState wfNodeState = WorkflowNodeState.INVOKED;
+ switch (taskState) {
+ case WAITING:
+ break;
+ case STARTED:
+ break;
+ case PRE_PROCESSING:
+ wfNodeState = WorkflowNodeState.INVOKED;
+ processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
+ break;
+ case INPUT_DATA_STAGING:
+ wfNodeState = WorkflowNodeState.INVOKED;
+ processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
+ break;
+ case EXECUTING:
+ wfNodeState = WorkflowNodeState.EXECUTING;
+ processContext.getWorkflowNode().setState(NodeState.EXECUTING);
+ break;
+ case OUTPUT_DATA_STAGING:
+ wfNodeState = WorkflowNodeState.COMPLETED;
+ processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
+ break;
+ case POST_PROCESSING:
+ wfNodeState = WorkflowNodeState.COMPLETED;
+ processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
+ break;
+ case COMPLETED:
+ wfNodeState = WorkflowNodeState.COMPLETED;
+ processContext.getWorkflowNode().setState(NodeState.EXECUTED);
+ break;
+ case FAILED:
+ wfNodeState = WorkflowNodeState.FAILED;
+ processContext.getWorkflowNode().setState(NodeState.FAILED);
+ break;
+ case UNKNOWN:
+ wfNodeState = WorkflowNodeState.UNKNOWN;
+ break;
+ case CONFIGURING_WORKSPACE:
+ wfNodeState = WorkflowNodeState.COMPLETED;
+ break;
+ case CANCELED:
+ case CANCELING:
+ wfNodeState = WorkflowNodeState.CANCELED;
+ processContext.getWorkflowNode().setState(NodeState.FAILED);
+ break;
+ default:
+ break;
+ }
+ if (wfNodeState != WorkflowNodeState.UNKNOWN) {
+ try {
+ updateWorkflowNodeStatus(processContext.getWfNodeDetails(), wfNodeState);
+ } catch (RegistryException e) {
+ log.error("Error while updating workflow node status update to the registry. nodeInstanceId :"
+ + processContext.getWfNodeDetails().getNodeInstanceId() + " status to: "
+ + processContext.getWfNodeDetails().getWorkflowNodeStatus().toString() , e);
+ }
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
new file mode 100644
index 0000000..7795296
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
@@ -0,0 +1,183 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher;
+import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class WorkflowEnactmentService {
+
+ private static WorkflowEnactmentService workflowEnactmentService;
+ private final RabbitMQStatusConsumer statusConsumer;
+ private String consumerId;
+ private ExecutorService executor;
+ private Map<String,SimpleWorkflowInterpreter> workflowMap;
+
+ private WorkflowEnactmentService () throws AiravataException {
+ executor = Executors.newFixedThreadPool(getThreadPoolSize());
+ workflowMap = new ConcurrentHashMap<String, SimpleWorkflowInterpreter>();
+ statusConsumer = new RabbitMQStatusConsumer();
+ consumerId = statusConsumer.listen(new TaskMessageHandler());
+ // register the shutdown hook to un-bind status consumer.
+ Runtime.getRuntime().addShutdownHook(new EnactmentShutDownHook());
+ }
+
+ public static WorkflowEnactmentService getInstance() throws AiravataException {
+ if (workflowEnactmentService == null) {
+ synchronized (WorkflowEnactmentService.class) {
+ if (workflowEnactmentService == null) {
+ workflowEnactmentService = new WorkflowEnactmentService();
+ }
+ }
+ }
+ return workflowEnactmentService;
+ }
+
+ public void submitWorkflow(String experimentId,
+ String credentialToken,
+ String gatewayName,
+ RabbitMQProcessPublisher publisher) throws Exception {
+
+ SimpleWorkflowInterpreter simpleWorkflowInterpreter = new SimpleWorkflowInterpreter(
+ experimentId, credentialToken,gatewayName, publisher);
+ workflowMap.put(experimentId, simpleWorkflowInterpreter);
+ simpleWorkflowInterpreter.launchWorkflow();
+
+ }
+
+ private int getThreadPoolSize() {
+ return ServerSettings.getEnactmentThreadPoolSize();
+ }
+
+ private class TaskMessageHandler implements MessageHandler {
+
+ @Override
+ public Map<String, Object> getProperties() {
+ Map<String, Object> props = new HashMap<String, Object>();
+ String gatewayId = "*";
+ String experimentId = "*";
+ List<String> routingKeys = new ArrayList<String>();
+ routingKeys.add(gatewayId);
+ routingKeys.add(gatewayId + "." + experimentId);
+ routingKeys.add(gatewayId + "." + experimentId+ ".*");
+ routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
+ props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
+ return props;
+ }
+
+ @Override
+ public void onMessage(MessageContext msgCtx) {
+ StatusHandler statusHandler = new StatusHandler(msgCtx);
+ executor.execute(statusHandler);
+ }
+
+
+ }
+
+ private class StatusHandler implements Runnable{
+ private final Logger log = LoggerFactory.getLogger(StatusHandler.class);
+
+ private final MessageContext msgCtx;
+
+ public StatusHandler(MessageContext msgCtx) {
+ this.msgCtx = msgCtx;
+ }
+
+ @Override
+ public void run() {
+ process();
+ }
+
+ private void process() {
+ String message;
+ SimpleWorkflowInterpreter simpleWorkflowInterpreter;
+ if (msgCtx.getType() == MessageType.TASK) {
+ TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
+ TaskIdentifier taskIdentifier = event.getTaskIdentity();
+ simpleWorkflowInterpreter = getInterpreter(taskIdentifier.getExperimentId());
+ if (simpleWorkflowInterpreter != null) {
+ simpleWorkflowInterpreter.handleTaskStatusChangeEvent(event);
+ } else {
+ // this happens when Task status messages comes after the Taskoutput messages,as we have worked on
+ // output changes it is ok to ignore this.
+ }
+ message = "Received task output change event , expId : " + taskIdentifier.getExperimentId() + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId();
+ log.debug(message);
+ }else if (msgCtx.getType() == MessageType.TASKOUTPUT) {
+ TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent();
+ TaskIdentifier taskIdentifier = event.getTaskIdentity();
+ simpleWorkflowInterpreter = getInterpreter(taskIdentifier.getExperimentId());
+ if (simpleWorkflowInterpreter != null) {
+ simpleWorkflowInterpreter.handleTaskOutputChangeEvent(event);
+ if (simpleWorkflowInterpreter.isAllDone()) {
+ workflowMap.remove(taskIdentifier.getExperimentId());
+ }
+ } else {
+ throw new IllegalArgumentException("Error while processing TaskOutputChangeEvent, " +
+ "There is no registered workflow for experiment Id : " + taskIdentifier.getExperimentId());
+ }
+ message = "Received task output change event , expId : " + taskIdentifier.getExperimentId() + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId();
+ log.debug(message);
+ } else {
+ // not interested, ignores
+ }
+ }
+
+ private SimpleWorkflowInterpreter getInterpreter(String experimentId){
+ return workflowMap.get(experimentId);
+ }
+ }
+
+
+ private class EnactmentShutDownHook extends Thread {
+ private final Logger log = LoggerFactory.getLogger(EnactmentShutDownHook.class);
+ @Override
+ public void run() {
+ super.run();
+ try {
+ statusConsumer.stopListen(consumerId);
+ log.info("Successfully un-binded task status consumer");
+ } catch (AiravataException e) {
+ log.error("Error while un-bind enactment status consumer", e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java
new file mode 100644
index 0000000..ee89dd9
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core;
+
+/**
+ * All classes implement this WorkflowFactory interface, should be abstract or singleton.
+ */
+public interface WorkflowFactory {
+
+ public WorkflowParser getWorkflowParser(String experimentId, String credentialToken) throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactoryImpl.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactoryImpl.java
new file mode 100644
index 0000000..1df2084
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactoryImpl.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.workflow.core.parser.AiravataWorkflowParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+
+/**
+ * Singleton class, only one instance can exist in runtime.
+ */
+public class WorkflowFactoryImpl implements WorkflowFactory {
+
+ private static final Logger log = LoggerFactory.getLogger(WorkflowFactoryImpl.class);
+
+ private static WorkflowFactoryImpl workflowFactoryImpl;
+
+ private WorkflowFactoryImpl(){
+
+ }
+
+ public static WorkflowFactoryImpl getInstance() {
+ if (workflowFactoryImpl == null) {
+ synchronized (WorkflowFactory.class) {
+ if (workflowFactoryImpl == null) {
+ workflowFactoryImpl = new WorkflowFactoryImpl();
+ }
+ }
+ }
+ return workflowFactoryImpl;
+ }
+
+
+ @Override
+ public WorkflowParser getWorkflowParser(String experimentId, String credentialToken) throws Exception {
+ WorkflowParser workflowParser = null;
+ try {
+ String wfParserClassName = ServerSettings.getWorkflowParser();
+ Class<?> aClass = Class.forName(wfParserClassName);
+ Constructor<?> constructor = aClass.getConstructor(String.class, String.class);
+ workflowParser = (WorkflowParser) constructor.newInstance(experimentId, credentialToken);
+ } catch (ApplicationSettingsException e) {
+ log.info("A custom workflow parser is not defined, Use default Airavata workflow parser");
+ }
+ if (workflowParser == null) {
+ workflowParser = new AiravataWorkflowParser(experimentId, credentialToken);
+ }
+ return workflowParser;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowParser.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowParser.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowParser.java
new file mode 100644
index 0000000..8d284dd
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowParser.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core;
+
+import org.apache.airavata.workflow.core.dag.nodes.WorkflowInputNode;
+
+import java.util.List;
+
+public interface WorkflowParser {
+
+ public List<WorkflowInputNode> parse() throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/DirectedEdge.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/DirectedEdge.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/DirectedEdge.java
new file mode 100644
index 0000000..91118cc
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/DirectedEdge.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.edge;
+
+import org.apache.airavata.workflow.core.dag.port.InPort;
+import org.apache.airavata.workflow.core.dag.port.OutPort;
+
+
+public class DirectedEdge implements Edge {
+
+ private InPort inPort;
+ private OutPort outPort;
+
+ @Override
+ public InPort getToPort() {
+ return inPort;
+ }
+
+ @Override
+ public void setToPort(InPort inPort) {
+ this.inPort = inPort;
+ }
+
+ @Override
+ public OutPort getFromPort() {
+ return outPort;
+ }
+
+ @Override
+ public void setFromPort(OutPort outPort) {
+ this.outPort = outPort;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/Edge.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/Edge.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/Edge.java
new file mode 100644
index 0000000..ee11371
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/Edge.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.edge;
+
+import org.apache.airavata.workflow.core.dag.port.InPort;
+import org.apache.airavata.workflow.core.dag.port.OutPort;
+
+/**
+ * Edge is a link to one node to another, basically edge should have outPort of a workflow node ,
+ * which is starting point and inPort of a workflow node, which is end point of the edge.
+ */
+
+public interface Edge {
+
+ public InPort getToPort();
+
+ public void setToPort(InPort inPort);
+
+ public OutPort getFromPort();
+
+ public void setFromPort(OutPort outPort);
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNode.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNode.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNode.java
new file mode 100644
index 0000000..d775bf4
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNode.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.nodes;
+
+import org.apache.airavata.workflow.core.dag.port.InPort;
+import org.apache.airavata.workflow.core.dag.port.OutPort;
+
+import java.util.List;
+
+public interface ApplicationNode extends WorkflowNode {
+
+ public String getApplicationId();
+
+ public void addInPort(InPort inPort);
+
+ public List<InPort> getInputPorts();
+
+ public void addOutPort(OutPort outPort);
+
+ public List<OutPort> getOutputPorts();
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNodeImpl.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNodeImpl.java
new file mode 100644
index 0000000..ad7bd63
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNodeImpl.java
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.nodes;
+
+import org.apache.airavata.workflow.core.dag.port.InPort;
+import org.apache.airavata.workflow.core.dag.port.OutPort;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ApplicationNodeImpl implements ApplicationNode {
+
+ private final String nodeId;
+ private NodeState myState = NodeState.WAITING;
+ private String applicationId;
+ private List<InPort> inPorts = new ArrayList<InPort>();
+ private List<OutPort> outPorts = new ArrayList<OutPort>();
+ private String applicationName;
+
+// public ApplicationNodeImpl(String nodeId) {
+// this(nodeId, null);
+// }
+//
+// public ApplicationNodeImpl(String nodeId, String applicationId) {
+// this(nodeId, null, applicationId);
+// }
+
+ public ApplicationNodeImpl(String nodeId, String applicationName, String applicationId) {
+ this.nodeId = nodeId;
+ this.applicationName = applicationName;
+ this.applicationId = applicationId;
+ }
+
+ @Override
+ public String getId() {
+ return this.nodeId;
+ }
+
+ @Override
+ public String getName() {
+ return applicationName;
+ }
+
+ @Override
+ public NodeType getType() {
+ return NodeType.APPLICATION;
+ }
+
+ @Override
+ public NodeState getState() {
+ return myState;
+ }
+
+ @Override
+ public void setState(NodeState newState) {
+ if (newState.getLevel() > myState.getLevel()) {
+ myState = newState;
+ } else {
+ throw new IllegalStateException("Node state can't be reversed. currentState : " + myState.toString() + " , newState " + newState.toString());
+ }
+ }
+
+ @Override
+ public boolean isReady() {
+ for (InPort inPort : getInputPorts()) {
+ if (!inPort.isReady()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public String getApplicationId() {
+ return this.applicationId;
+ }
+
+ @Override
+ public void addInPort(InPort inPort) {
+ this.inPorts.add(inPort);
+ }
+
+ @Override
+ public List<InPort> getInputPorts() {
+ return this.inPorts;
+ }
+
+ @Override
+ public void addOutPort(OutPort outPort) {
+ this.outPorts.add(outPort);
+ }
+
+ @Override
+ public List<OutPort> getOutputPorts() {
+ return this.outPorts;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeState.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeState.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeState.java
new file mode 100644
index 0000000..df2c87a
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeState.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.nodes;
+
+public enum NodeState {
+ WAITING(0), // waiting on inputs
+ READY(1), // all inputs are available and ready to execute
+ QUEUED(2), //
+ PRE_PROCESSING(3), //
+ EXECUTING(4), // task has been submitted , not yet finish
+ EXECUTED(5), // task executed
+ POST_PROCESSING(6), //
+ FAILED(7),
+ COMPLETE(8); // all works done
+
+ private int level;
+
+ NodeState(int level) {
+ this.level = level;
+ }
+
+ public int getLevel() {
+ return level;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeType.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeType.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeType.java
new file mode 100644
index 0000000..04e4c07
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeType.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.nodes;
+
+public enum NodeType {
+ APPLICATION,
+ WORKFLOW_INPUT,
+ WORKFLOW_OUTPUT
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNode.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNode.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNode.java
new file mode 100644
index 0000000..83bcacf
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNode.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.nodes;
+
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.workflow.core.dag.port.OutPort;
+
+public interface WorkflowInputNode extends WorkflowNode {
+
+ public InputDataObjectType getInputObject();
+
+ public void setInputObject(InputDataObjectType inputObject);
+
+ public OutPort getOutPort();
+
+ public void setOutPort(OutPort outPort);
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNodeImpl.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNodeImpl.java
new file mode 100644
index 0000000..2364b03
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNodeImpl.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.airavata.workflow.core.dag.nodes;
+
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.workflow.core.dag.port.OutPort;
+
+public class WorkflowInputNodeImpl implements WorkflowInputNode {
+
+ private NodeState myState = NodeState.READY;
+ private final String nodeId;
+ private String nodeName;
+ private OutPort outPort;
+ private InputDataObjectType inputDataObjectType;
+ private String name;
+
+ public WorkflowInputNodeImpl(String nodeId) {
+ this(nodeId, null);
+ }
+
+ public WorkflowInputNodeImpl(String nodeId, String nodeName) {
+ this.nodeId = nodeId;
+ this.nodeName = nodeName;
+ }
+
+ @Override
+ public String getId() {
+ return this.nodeId;
+ }
+
+ @Override
+ public String getName() {
+ return this.nodeName;
+ }
+
+ @Override
+ public NodeType getType() {
+ return NodeType.WORKFLOW_INPUT;
+ }
+
+ @Override
+ public NodeState getState() {
+ return myState;
+ }
+
+ @Override
+ public void setState(NodeState newState) {
+ if (newState.getLevel() > myState.getLevel()) {
+ myState = newState;
+ } else {
+ throw new IllegalStateException("Node state can't be reversed. currentState : " + myState.toString() + " , newState " + newState.toString());
+ }
+ }
+
+ @Override
+ public boolean isReady() {
+ return (inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals(""))
+ || !inputDataObjectType.isIsRequired();
+ }
+
+ @Override
+ public InputDataObjectType getInputObject() {
+ return this.inputDataObjectType;
+ }
+
+ @Override
+ public void setInputObject(InputDataObjectType inputObject) {
+ this.inputDataObjectType = inputObject;
+ }
+
+ @Override
+ public OutPort getOutPort() {
+ return this.outPort;
+ }
+
+ @Override
+ public void setOutPort(OutPort outPort) {
+ this.outPort = outPort;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowNode.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowNode.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowNode.java
new file mode 100644
index 0000000..e86a740
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowNode.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.nodes;
+
+public interface WorkflowNode {
+
+ public String getId();
+
+ public String getName();
+
+ public NodeType getType();
+
+ public NodeState getState();
+
+ public void setState(NodeState newState);
+
+ public boolean isReady();
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNode.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNode.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNode.java
new file mode 100644
index 0000000..340ac30
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNode.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.nodes;
+
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.workflow.core.dag.port.InPort;
+
+public interface WorkflowOutputNode extends WorkflowNode {
+
+ public OutputDataObjectType getOutputObject();
+
+ public void setOutputObject(OutputDataObjectType outputObject);
+
+ public InPort getInPort();
+
+ public void setInPort(InPort inPort);
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNodeImpl.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNodeImpl.java
new file mode 100644
index 0000000..294109f
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNodeImpl.java
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.nodes;
+
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.workflow.core.dag.port.InPort;
+
+public class WorkflowOutputNodeImpl implements WorkflowOutputNode {
+
+ private NodeState myState = NodeState.WAITING;
+ private final String nodeId;
+ private String nodeName;
+ private OutputDataObjectType outputDataObjectType;
+ private InPort inPort;
+
+ public WorkflowOutputNodeImpl(String nodeId) {
+ this(nodeId, null);
+ }
+
+ public WorkflowOutputNodeImpl(String nodeId, String nodeName) {
+ this.nodeId = nodeId;
+ this.nodeName = nodeName;
+ }
+
+ @Override
+ public String getId() {
+ return this.nodeId;
+ }
+
+ @Override
+ public String getName() {
+ return this.nodeName;
+ }
+
+ @Override
+ public NodeType getType() {
+ return NodeType.WORKFLOW_OUTPUT;
+ }
+
+ @Override
+ public NodeState getState() {
+ return myState;
+ }
+
+ @Override
+ public void setState(NodeState newState) {
+ if (newState.getLevel() > myState.getLevel()) {
+ myState = newState;
+ } else {
+ throw new IllegalStateException("Node state can't be reversed. currentState : " + myState.toString() + " , newState " + newState.toString());
+ }
+ }
+
+ @Override
+ public boolean isReady() {
+ return !(inPort.getInputObject() == null || inPort.getInputObject().getValue() == null
+ || inPort.getInputObject().getValue().equals(""));
+ }
+
+ @Override
+ public OutputDataObjectType getOutputObject() {
+ return this.outputDataObjectType;
+ }
+
+ @Override
+ public void setOutputObject(OutputDataObjectType outputObject) {
+ this.outputDataObjectType = outputObject;
+ }
+
+ @Override
+ public InPort getInPort() {
+ return this.inPort;
+ }
+
+ @Override
+ public void setInPort(InPort inPort) {
+ this.inPort = inPort;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InPort.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InPort.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InPort.java
new file mode 100644
index 0000000..fb8dfa7
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InPort.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.port;
+
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.workflow.core.dag.edge.Edge;
+
+public interface InPort extends Port {
+
+ public void setInputObject(InputDataObjectType inputObject);
+
+ public InputDataObjectType getInputObject();
+
+ public Edge getEdge();
+
+ public void addEdge(Edge edge);
+
+ public String getDefaultValue();
+
+ public void setDefaultValue(String defaultValue);
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InputPortIml.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InputPortIml.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InputPortIml.java
new file mode 100644
index 0000000..43a41be
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InputPortIml.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.airavata.workflow.core.dag.port;
+
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.workflow.core.dag.edge.Edge;
+import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode;
+
+public class InputPortIml implements InPort {
+
+ private InputDataObjectType inputDataObjectType;
+ private boolean ready = false;
+ private String portId;
+ private Edge edge;
+ private WorkflowNode node;
+ private String defaultValue;
+
+ public InputPortIml(String portId) {
+ this.portId = portId;
+ }
+
+ @Override
+ public void setInputObject(InputDataObjectType inputObject) {
+ this.inputDataObjectType = inputObject;
+ ready = (inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals(""))
+ || !inputDataObjectType.isIsRequired();
+ }
+
+ @Override
+ public InputDataObjectType getInputObject() {
+ return this.inputDataObjectType;
+ }
+
+ @Override
+ public Edge getEdge() {
+ return this.edge;
+ }
+
+ @Override
+ public void addEdge(Edge edge) {
+ this.edge = edge;
+ }
+
+ @Override
+ public String getDefaultValue() {
+ return defaultValue;
+ }
+
+ public void setDefaultValue(String defaultValue) {
+ this.defaultValue = defaultValue;
+ }
+
+ @Override
+ public boolean isReady() {
+ return getInputObject() != null && (!inputDataObjectType.isIsRequired() ||
+ (inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals("")));
+ }
+
+ @Override
+ public WorkflowNode getNode() {
+ return this.node;
+ }
+
+ @Override
+ public void setNode(WorkflowNode workflowNode) {
+ this.node = workflowNode;
+ }
+
+ @Override
+ public String getId() {
+ return this.portId;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPort.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPort.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPort.java
new file mode 100644
index 0000000..d9aa004
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPort.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.port;
+
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.workflow.core.dag.edge.Edge;
+
+import java.util.List;
+
+public interface OutPort extends Port {
+
+ public void setOutputObject(OutputDataObjectType outputObject);
+
+ public OutputDataObjectType getOutputObject();
+
+ public List<Edge> getOutEdges();
+
+ public void addEdge(Edge edge);
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPortImpl.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPortImpl.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPortImpl.java
new file mode 100644
index 0000000..9c3f86a
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPortImpl.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.port;
+
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.workflow.core.dag.edge.Edge;
+import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class OutPortImpl implements OutPort {
+
+ private OutputDataObjectType outputDataObjectType;
+ private List<Edge> outEdges = new ArrayList<Edge>();
+ private boolean isSatisfy = false;
+ private String portId;
+ private WorkflowNode node;
+
+ public OutPortImpl(String portId) {
+ this.portId = portId;
+ }
+
+ @Override
+ public void setOutputObject(OutputDataObjectType outputObject) {
+ this.outputDataObjectType = outputObject;
+ }
+
+ @Override
+ public OutputDataObjectType getOutputObject() {
+ return this.outputDataObjectType;
+ }
+
+ @Override
+ public List<Edge> getOutEdges() {
+ return this.outEdges;
+ }
+
+ @Override
+ public void addEdge(Edge edge) {
+ this.outEdges.add(edge);
+ }
+
+ @Override
+ public boolean isReady() {
+ return this.outputDataObjectType.getValue() != null
+ && !this.outputDataObjectType.getValue().equals("");
+ }
+
+ @Override
+ public WorkflowNode getNode() {
+ return this.node;
+ }
+
+ @Override
+ public void setNode(WorkflowNode workflowNode) {
+ this.node = workflowNode;
+ }
+
+ @Override
+ public String getId() {
+ return portId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/Port.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/Port.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/Port.java
new file mode 100644
index 0000000..e3756cf
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/Port.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.port;
+
+import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode;
+
+public interface Port {
+
+ public boolean isReady();
+
+ public WorkflowNode getNode();
+
+ public void setNode(WorkflowNode workflowNode);
+
+ public String getId();
+
+}