You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/03/26 18:08:19 UTC
[20/50] [abbrv] airavata git commit: Fixed AIRAVATA-1591 ,
AIRAVATA-1592 , AIRAVATA-1593
http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
index 6dcb8bd..edfa306 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
@@ -21,10 +21,16 @@
package org.apache.airavata.simple.workflow.engine;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher;
+import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
import org.apache.airavata.model.messaging.event.TaskIdentifier;
import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
@@ -45,11 +51,11 @@ import org.apache.airavata.simple.workflow.engine.dag.edge.Edge;
import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode;
import org.apache.airavata.simple.workflow.engine.dag.nodes.NodeState;
import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
-import org.apache.airavata.simple.workflow.engine.parser.AiravataDefaultParser;
import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
+import org.apache.airavata.simple.workflow.engine.parser.AiravataWorkflowParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,32 +70,36 @@ import java.util.concurrent.ConcurrentHashMap;
public class SimpleWorkflowInterpreter implements Runnable{
private static final Logger log = LoggerFactory.getLogger(SimpleWorkflowInterpreter.class);
-
private List<WorkflowInputNode> workflowInputNodes;
private Experiment experiment;
private String credentialToken;
+ private String gatewayName;
+
private Map<String, WorkflowNode> readList = new ConcurrentHashMap<String, WorkflowNode>();
private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<String, WorkflowNode>();
- private Map<String, ProcessPack> processingQueue = new ConcurrentHashMap<String, ProcessPack>();
- private Map<String, ProcessPack> completeList = new HashMap<String, ProcessPack>();
+ private Map<String, ProcessContext> processingQueue = new ConcurrentHashMap<String, ProcessContext>();
+ private Map<String, ProcessContext> completeList = new HashMap<String, ProcessContext>();
private Registry registry;
- private EventBus eventBus = new EventBus();
private List<WorkflowOutputNode> completeWorkflowOutputs = new ArrayList<WorkflowOutputNode>();
+ private RabbitMQProcessPublisher publisher;
+ private RabbitMQStatusConsumer statusConsumer;
+ private String consumerId;
- public SimpleWorkflowInterpreter(String experimentId, String credentialToken) throws RegistryException {
+ public SimpleWorkflowInterpreter(String experimentId, String credentialToken, String gatewayName, RabbitMQProcessPublisher publisher) throws RegistryException {
+ this.gatewayName = gatewayName;
setExperiment(experimentId);
this.credentialToken = credentialToken;
+ this.publisher = publisher;
}
- public SimpleWorkflowInterpreter(Experiment experiment, String credentialStoreToken) {
- // read the workflow file and build the topology to a DAG. Then execute that dag
- // get workflowInputNode list and start processing
- // next() will return ready task and block the thread if no task in ready state.
+ public SimpleWorkflowInterpreter(Experiment experiment, String credentialStoreToken, String gatewayName, RabbitMQProcessPublisher publisher) {
+ this.gatewayName = gatewayName;
this.experiment = experiment;
this.credentialToken = credentialStoreToken;
+ this.publisher = publisher;
}
@@ -97,11 +107,15 @@ public class SimpleWorkflowInterpreter implements Runnable{
// process workflow input nodes
// WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance();
// WorkflowParser workflowParser = wfFactory.getWorkflowParser(experiment.getExperimentID(), credentialToken);
- WorkflowParser workflowParser = new AiravataDefaultParser(experiment, credentialToken);
+ WorkflowParser workflowParser = new AiravataWorkflowParser(experiment, credentialToken);
log.debug("Initialized workflow parser");
setWorkflowInputNodes(workflowParser.parse());
log.debug("Parsed the workflow and got the workflow input nodes");
processWorkflowInputNodes(getWorkflowInputNodes());
+
+
+ statusConsumer = new RabbitMQStatusConsumer();
+ consumerId = statusConsumer.listen(new TaskMessageHandler());
}
// try to remove synchronization tag
@@ -116,56 +130,31 @@ public class SimpleWorkflowInterpreter implements Runnable{
}
WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode);
TaskDetails process = getProcess(workflowNodeDetails);
- ProcessPack processPack = new ProcessPack(readyNode, workflowNodeDetails, process);
- addToProcessingQueue(processPack);
-// publishToProcessQueue(process);
- publishToProcessQueue(processPack);
+ ProcessContext processContext = new ProcessContext(readyNode, workflowNodeDetails, process);
+ addToProcessingQueue(processContext);
+ publishToProcessQueue(process);
+// publishToProcessQueue(processPack);
} catch (RegistryException e) {
// FIXME : handle this exception
+ } catch (AiravataException e) {
+ log.error("Error while publishing process to the process queue");
}
}
}
- private void publishToProcessQueue(TaskDetails process) {
- Thread thread = new Thread(new TempPublisher(process, eventBus));
- thread.start();
- //TODO: publish to process queue.
- }
+ private void publishToProcessQueue(TaskDetails process) throws AiravataException {
+ ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
+ processSubmitEvent.setCredentialToken(credentialToken);
+ processSubmitEvent.setTaskId(process.getTaskID());
+ MessageContext messageContext = new MessageContext(processSubmitEvent, MessageType.TASK, process.getTaskID(), null);
+ messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ publisher.publish(messageContext);
- // TODO : remove this test method
- private void publishToProcessQueue(ProcessPack process) {
- WorkflowNode workflowNode = process.getWorkflowNode();
- if (workflowNode instanceof ApplicationNode) {
- ApplicationNode applicationNode = (ApplicationNode) workflowNode;
- List<InPort> inputPorts = applicationNode.getInputPorts();
- if (applicationNode.getName().equals("Add")) {
- applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf(
- Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) + Integer.parseInt(inputPorts.get(1).getInputObject().getValue())));
- } else if (applicationNode.getName().equals("Multiply")) {
- applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf(
- Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) * Integer.parseInt(inputPorts.get(1).getInputObject().getValue())));
- } else if (applicationNode.getName().equals("Subtract")) {
- applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf(
- Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) - Integer.parseInt(inputPorts.get(1).getInputObject().getValue())));
- } else {
- throw new RuntimeException("Invalid Application name");
- }
- for (Edge edge : applicationNode.getOutputPorts().get(0).getOutEdges()) {
- WorkflowUtil.copyValues(applicationNode.getOutputPorts().get(0).getOutputObject(), edge.getToPort().getInputObject());
- if (edge.getToPort().getNode().isReady()) {
- addToReadyQueue(edge.getToPort().getNode());
- } else {
- addToWaitingQueue(edge.getToPort().getNode());
- }
- }
- } else if (workflowNode instanceof WorkflowOutputNode) {
- WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) workflowNode;
- throw new RuntimeException("Workflow output node in processing queue");
- }
-
- processingQueue.remove(process.getTaskDetails().getTaskID());
+// Thread thread = new Thread(new TempPublisher(process, eventBus));
+// thread.start();
+ //TODO: publish to process queue.
}
private TaskDetails getProcess(WorkflowNodeDetails wfNodeDetails) throws RegistryException {
@@ -242,12 +231,6 @@ public class SimpleWorkflowInterpreter implements Runnable{
this.workflowInputNodes = workflowInputNodes;
}
-
- private List<WorkflowInputNode> parseWorkflowDescription(){
- return null;
- }
-
-
private Registry getRegistry() throws RegistryException {
if (registry==null){
registry = RegistryFactory.getDefaultRegistry();
@@ -265,93 +248,6 @@ public class SimpleWorkflowInterpreter implements Runnable{
getRegistry().update(RegistryModelType.WORKFLOW_NODE_STATUS, status, wfNodeDetails.getNodeInstanceId());
}
- @Subscribe
- public void taskOutputChanged(TaskOutputChangeEvent taskOutputEvent){
- String taskId = taskOutputEvent.getTaskIdentity().getTaskId();
- log.debug("Task Output changed event received for workflow node : " +
- taskOutputEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId);
- ProcessPack processPack = processingQueue.get(taskId);
- Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>();
- if (processPack != null) {
- WorkflowNode workflowNode = processPack.getWorkflowNode();
- if (workflowNode instanceof ApplicationNode) {
- ApplicationNode applicationNode = (ApplicationNode) workflowNode;
- // Workflow node can have one to many output ports and each output port can have one to many links
- for (OutPort outPort : applicationNode.getOutputPorts()) {
- for (OutputDataObjectType outputDataObjectType : taskOutputEvent.getOutput()) {
- if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) {
- outPort.getOutputObject().setValue(outputDataObjectType.getValue());
- break;
- }
- }
- for (Edge edge : outPort.getOutEdges()) {
- WorkflowUtil.copyValues(outPort.getOutputObject(), edge.getToPort().getInputObject());
- if (edge.getToPort().getNode().isReady()) {
- addToReadyQueue(edge.getToPort().getNode());
- }
- }
- }
- }
- processingQueue.remove(taskId);
- log.debug("removed task from processing queue : " + taskId);
- }
-
- }
-
- @Subscribe
- public void taskStatusChanged(TaskStatusChangeEvent taskStatus){
- String taskId = taskStatus.getTaskIdentity().getTaskId();
- ProcessPack processPack = processingQueue.get(taskId);
- if (processPack != null) {
- WorkflowNodeState wfNodeState = WorkflowNodeState.UNKNOWN;
- switch (taskStatus.getState()) {
- case WAITING:
- break;
- case STARTED:
- break;
- case PRE_PROCESSING:
- processPack.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
- break;
- case INPUT_DATA_STAGING:
- processPack.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
- break;
- case EXECUTING:
- processPack.getWorkflowNode().setState(NodeState.EXECUTING);
- break;
- case OUTPUT_DATA_STAGING:
- processPack.getWorkflowNode().setState(NodeState.POST_PROCESSING);
- break;
- case POST_PROCESSING:
- processPack.getWorkflowNode().setState(NodeState.POST_PROCESSING);
- break;
- case COMPLETED:
- processPack.getWorkflowNode().setState(NodeState.EXECUTED);
- break;
- case FAILED:
- processPack.getWorkflowNode().setState(NodeState.FAILED);
- break;
- case UNKNOWN:
- break;
- case CONFIGURING_WORKSPACE:
- break;
- case CANCELED:
- case CANCELING:
- processPack.getWorkflowNode().setState(NodeState.FAILED);
- break;
- default:
- break;
- }
- if (wfNodeState != WorkflowNodeState.UNKNOWN) {
- try {
- updateWorkflowNodeStatus(processPack.getWfNodeDetails(), wfNodeState);
- } catch (RegistryException e) {
- // TODO: handle this.
- }
- }
- }
-
- }
-
/**
* Remove the workflow node from waiting queue and add it to the ready queue.
* @param workflowNode - Workflow Node
@@ -368,16 +264,16 @@ public class SimpleWorkflowInterpreter implements Runnable{
/**
* First remove the node from ready list and then add the WfNodeContainer to the process queue.
* Note that underline data structure of the process queue is a Map.
- * @param processPack - has both workflow and correspond workflowNodeDetails and TaskDetails
+ * @param processContext - has both workflow and correspond workflowNodeDetails and TaskDetails
*/
- private synchronized void addToProcessingQueue(ProcessPack processPack) {
- readList.remove(processPack.getWorkflowNode().getId());
- processingQueue.put(processPack.getTaskDetails().getTaskID(), processPack);
+ private synchronized void addToProcessingQueue(ProcessContext processContext) {
+ readList.remove(processContext.getWorkflowNode().getId());
+ processingQueue.put(processContext.getTaskDetails().getTaskID(), processContext);
}
- private synchronized void addToCompleteQueue(ProcessPack processPack) {
- processingQueue.remove(processPack.getTaskDetails().getTaskID());
- completeList.put(processPack.getTaskDetails().getTaskID(), processPack);
+ private synchronized void addToCompleteQueue(ProcessContext processContext) {
+ processingQueue.remove(processContext.getTaskDetails().getTaskID());
+ completeList.put(processContext.getTaskDetails().getTaskID(), processContext);
}
@@ -388,7 +284,6 @@ public class SimpleWorkflowInterpreter implements Runnable{
@Override
public void run() {
- // TODO: Auto generated method body.
try {
log.debug("Launching workflow");
launchWorkflow();
@@ -396,8 +291,11 @@ public class SimpleWorkflowInterpreter implements Runnable{
processReadyList();
Thread.sleep(1000);
}
+ log.info("Successfully launched workflow for experiment : " + getExperiment().getExperimentID());
+ statusConsumer.stopListen(consumerId);
+ log.info("Successfully un-bind status consumer for experiment " + getExperiment().getExperimentID());
} catch (Exception e) {
- e.printStackTrace();
+ //TODO - handle this.
}
}
@@ -406,65 +304,129 @@ public class SimpleWorkflowInterpreter implements Runnable{
log.debug("Retrieve Experiment for experiment id : " + experimentId);
}
+ class TaskMessageHandler implements MessageHandler{
- class TempPublisher implements Runnable {
- private TaskDetails tempTaskDetails;
- private EventBus tempEventBus;
-
- public TempPublisher(TaskDetails tempTaskDetails, EventBus tempEventBus) {
- this.tempTaskDetails = tempTaskDetails;
- this.tempEventBus = tempEventBus;
+ @Override
+ public Map<String, Object> getProperties() {
+ Map<String, Object> props = new HashMap<String, Object>();
+ String gatewayId = "*";
+ String experimentId = getExperiment().getExperimentID();
+ List<String> routingKeys = new ArrayList<String>();
+// routingKeys.add(gatewayName+ "." + getExperiment().getExperimentID() + ".*");
+ routingKeys.add(gatewayId);
+ routingKeys.add(gatewayId + "." + experimentId);
+ routingKeys.add(gatewayId + "." + experimentId+ ".*");
+ routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
+ props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
+ return props;
}
@Override
- public void run() {
- try {
- TaskIdentifier identifier = new TaskIdentifier(tempTaskDetails.getTaskID(), null, null, null);
- TaskStatusChangeEvent statusChangeEvent = new TaskStatusChangeEvent(TaskState.PRE_PROCESSING, identifier);
- tempEventBus.post(statusChangeEvent);
- Thread.sleep(1000);
- statusChangeEvent = new TaskStatusChangeEvent(TaskState.WAITING, identifier);
- tempEventBus.post(statusChangeEvent);
- Thread.sleep(1000);
- statusChangeEvent = new TaskStatusChangeEvent(TaskState.INPUT_DATA_STAGING, identifier);
- tempEventBus.post(statusChangeEvent);
- Thread.sleep(1000);
- statusChangeEvent = new TaskStatusChangeEvent(TaskState.STARTED, identifier);
- tempEventBus.post(statusChangeEvent);
- Thread.sleep(1000);
- statusChangeEvent = new TaskStatusChangeEvent(TaskState.EXECUTING, identifier);
- tempEventBus.post(statusChangeEvent);
- Thread.sleep(1000);
- statusChangeEvent = new TaskStatusChangeEvent(TaskState.POST_PROCESSING, identifier);
- tempEventBus.post(statusChangeEvent);
- Thread.sleep(1000);
- statusChangeEvent = new TaskStatusChangeEvent(TaskState.OUTPUT_DATA_STAGING, identifier);
- tempEventBus.post(statusChangeEvent);
- Thread.sleep(1000);
- statusChangeEvent = new TaskStatusChangeEvent(TaskState.COMPLETED, identifier);
- tempEventBus.post(statusChangeEvent);
- Thread.sleep(1000);
+ public void onMessage(MessageContext msgCtx) {
+ String message;
+ if (msgCtx.getType() == MessageType.TASK) {
+ TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
+ TaskIdentifier taskIdentifier = event.getTaskIdentity();
+ handleTaskStatusChangeEvent(event);
+ message = "Received task output change event , expId : " + taskIdentifier.getExperimentId() + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId();
+ log.debug(message);
+ }else if (msgCtx.getType() == MessageType.TASKOUTPUT) {
+ TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent();
+ TaskIdentifier taskIdentifier = event.getTaskIdentity();
+ handleTaskOutputChangeEvent(event);
+ message = "Received task output change event , expId : " + taskIdentifier.getExperimentId() + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId();
+ log.debug(message);
+ } else {
+ // not interesting, ignores
+ }
+ }
- List<InputDataObjectType> applicationInputs = tempTaskDetails.getApplicationInputs();
- List<OutputDataObjectType> applicationOutputs = tempTaskDetails.getApplicationOutputs();
- log.info("************** Task output change event fired for application id :" + tempTaskDetails.getApplicationId());
- if (tempTaskDetails.getApplicationId().equals("Add") || tempTaskDetails.getApplicationId().equals("Add_2")) {
- applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) +
- Integer.parseInt(applicationInputs.get(1).getValue())) + "");
- } else if (tempTaskDetails.getApplicationId().equals("Subtract")) {
- applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) -
- Integer.parseInt(applicationInputs.get(1).getValue())) + "");
- } else if (tempTaskDetails.getApplicationId().equals("Multiply")) {
- applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) *
- Integer.parseInt(applicationInputs.get(1).getValue())) + "");
+ private void handleTaskOutputChangeEvent(TaskOutputChangeEvent taskOutputChangeEvent) {
+
+ String taskId = taskOutputChangeEvent.getTaskIdentity().getTaskId();
+ log.debug("Task Output changed event received for workflow node : " +
+ taskOutputChangeEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId);
+ ProcessContext processContext = processingQueue.get(taskId);
+ Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>();
+ if (processContext != null) {
+ WorkflowNode workflowNode = processContext.getWorkflowNode();
+ if (workflowNode instanceof ApplicationNode) {
+ ApplicationNode applicationNode = (ApplicationNode) workflowNode;
+ // Workflow node can have one to many output ports and each output port can have one to many links
+ for (OutPort outPort : applicationNode.getOutputPorts()) {
+ for (OutputDataObjectType outputDataObjectType : taskOutputChangeEvent.getOutput()) {
+ if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) {
+ outPort.getOutputObject().setValue(outputDataObjectType.getValue());
+ break;
+ }
+ }
+ for (Edge edge : outPort.getOutEdges()) {
+ edge.getToPort().getInputObject().setValue(outPort.getOutputObject().getValue());
+ if (edge.getToPort().getNode().isReady()) {
+ addToReadyQueue(edge.getToPort().getNode());
+ }
+ }
+ }
}
- TaskOutputChangeEvent taskOutputChangeEvent = new TaskOutputChangeEvent(applicationOutputs, identifier);
- eventBus.post(taskOutputChangeEvent);
+ addToCompleteQueue(processContext);
+ log.debug("removed task from processing queue : " + taskId);
+ }
+ }
- } catch (InterruptedException e) {
- log.error("Thread was interrupted while sleeping");
+ private void handleTaskStatusChangeEvent(TaskStatusChangeEvent taskStatusChangeEvent) {
+ TaskState taskState = taskStatusChangeEvent.getState();
+ TaskIdentifier taskIdentity = taskStatusChangeEvent.getTaskIdentity();
+ String taskId = taskIdentity.getTaskId();
+ ProcessContext processContext = processingQueue.get(taskId);
+ if (processContext != null) {
+ WorkflowNodeState wfNodeState = WorkflowNodeState.UNKNOWN;
+ switch (taskState) {
+ case WAITING:
+ break;
+ case STARTED:
+ break;
+ case PRE_PROCESSING:
+ processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
+ break;
+ case INPUT_DATA_STAGING:
+ processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
+ break;
+ case EXECUTING:
+ processContext.getWorkflowNode().setState(NodeState.EXECUTING);
+ break;
+ case OUTPUT_DATA_STAGING:
+ processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
+ break;
+ case POST_PROCESSING:
+ processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
+ break;
+ case COMPLETED:
+ processContext.getWorkflowNode().setState(NodeState.EXECUTED);
+ break;
+ case FAILED:
+ processContext.getWorkflowNode().setState(NodeState.FAILED);
+ break;
+ case UNKNOWN:
+ break;
+ case CONFIGURING_WORKSPACE:
+ break;
+ case CANCELED:
+ case CANCELING:
+ processContext.getWorkflowNode().setState(NodeState.FAILED);
+ break;
+ default:
+ break;
+ }
+ if (wfNodeState != WorkflowNodeState.UNKNOWN) {
+ try {
+ updateWorkflowNodeStatus(processContext.getWfNodeDetails(), wfNodeState);
+ } catch (RegistryException e) {
+ // TODO: handle this.
+ }
+ }
}
}
}
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java
index 116a10d..b12260d 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java
@@ -22,7 +22,7 @@
package org.apache.airavata.simple.workflow.engine;
import org.apache.airavata.registry.cpi.RegistryException;
-import org.apache.airavata.simple.workflow.engine.parser.AiravataDefaultParser;
+import org.apache.airavata.simple.workflow.engine.parser.AiravataWorkflowParser;
/**
* Singleton class, only one instance can exist in runtime.
@@ -55,7 +55,7 @@ public class WorkflowFactoryImpl implements WorkflowFactory {
public WorkflowParser getWorkflowParser(String experimentId, String credentialToken) {
if (workflowParser == null) {
try {
- workflowParser = new AiravataDefaultParser(experimentId, credentialToken);
+ workflowParser = new AiravataWorkflowParser(experimentId, credentialToken);
} catch (RegistryException e) {
// TODO : handle this scenario
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParser.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParser.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParser.java
deleted file mode 100644
index 644eda6..0000000
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParser.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.simple.workflow.engine.parser;
-
-import org.airavata.appcatalog.cpi.AppCatalogException;
-import org.airavata.appcatalog.cpi.WorkflowCatalog;
-import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
-import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
-import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.workspace.experiment.Experiment;
-import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
-import org.apache.airavata.registry.cpi.Registry;
-import org.apache.airavata.registry.cpi.RegistryException;
-import org.apache.airavata.registry.cpi.RegistryModelType;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNodeImpl;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNodeImpl;
-import org.apache.airavata.simple.workflow.engine.dag.port.OutPortImpl;
-import org.apache.airavata.workflow.model.component.ComponentException;
-import org.apache.airavata.workflow.model.component.system.ConstantComponent;
-import org.apache.airavata.workflow.model.component.system.InputComponent;
-import org.apache.airavata.workflow.model.component.system.S3InputComponent;
-import org.apache.airavata.workflow.model.graph.DataEdge;
-import org.apache.airavata.workflow.model.graph.DataPort;
-import org.apache.airavata.workflow.model.graph.GraphException;
-import org.apache.airavata.workflow.model.graph.Node;
-import org.apache.airavata.workflow.model.graph.impl.NodeImpl;
-import org.apache.airavata.workflow.model.graph.system.OutputNode;
-import org.apache.airavata.workflow.model.graph.system.SystemDataPort;
-import org.apache.airavata.workflow.model.graph.ws.WSNode;
-import org.apache.airavata.workflow.model.graph.ws.WSPort;
-import org.apache.airavata.workflow.model.wf.Workflow;
-import org.apache.airavata.simple.workflow.engine.WorkflowParser;
-import org.apache.airavata.simple.workflow.engine.dag.edge.DirectedEdge;
-import org.apache.airavata.simple.workflow.engine.dag.edge.Edge;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNodeImpl;
-import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
-import org.apache.airavata.simple.workflow.engine.dag.port.InputPortIml;
-import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class AiravataDefaultParser implements WorkflowParser {
-
- private String credentialToken ;
- private Workflow workflow;
-
-
- private Experiment experiment;
- private Map<String, WorkflowNode> wfNodes = new HashMap<String, WorkflowNode>();
-
-
- public AiravataDefaultParser(String experimentId, String credentialToken) throws RegistryException {
- this.experiment = getExperiment(experimentId);
- this.credentialToken = credentialToken;
- }
-
- public AiravataDefaultParser(Experiment experiment, String credentialToken) {
- this.credentialToken = credentialToken;
- this.experiment = experiment;
- }
-
- @Override
- public List<WorkflowInputNode> parse() throws RegistryException, AppCatalogException,
- ComponentException, GraphException {
- return parseWorkflow(getWorkflowFromExperiment(experiment));
- }
-
- public List<WorkflowInputNode> parseWorkflow(Workflow workflow) {
- List<Node> gNodes = getInputNodes(workflow);
- List<WorkflowInputNode> wfInputNodes = new ArrayList<WorkflowInputNode>();
- List<PortContainer> portContainers = new ArrayList<PortContainer>();
- List<InputDataObjectType> experimentInputs = experiment.getExperimentInputs();
- Map<String,InputDataObjectType> inputDataMap=new HashMap<String, InputDataObjectType>();
- WorkflowInputNode wfInputNode = null;
- for (InputDataObjectType dataObjectType : experimentInputs) {
- inputDataMap.put(dataObjectType.getName(), dataObjectType);
- }
- for (Node gNode : gNodes) {
- wfInputNode = new WorkflowInputNodeImpl(gNode.getID(), gNode.getName());
- wfInputNode.setInputObject(inputDataMap.get(wfInputNode.getName()));
- if (wfInputNode.getInputObject() == null) {
- // TODO: throw an error and exit.
- }
- portContainers.addAll(processOutPorts(gNode, wfInputNode));
- wfInputNodes.add(wfInputNode);
- }
-
- // while port container is not empty iterate graph and build the workflow DAG.
- buildModel(portContainers);
-
- return wfInputNodes;
- }
-
- private void buildModel(List<PortContainer> portContainerList) {
- // end condition of recursive call.
- if (portContainerList == null || portContainerList.isEmpty()) {
- return ;
- }
- DataPort dataPort = null;
- InPort inPort = null;
- ApplicationNode wfApplicationNode = null;
- WorkflowOutputNode wfOutputNode = null;
- List<PortContainer> nextPortContainerList = new ArrayList<PortContainer>();
- for (PortContainer portContainer : portContainerList) {
- dataPort = portContainer.getDataPort();
- inPort = portContainer.getInPort();
- Node node = dataPort.getNode();
- if (node instanceof WSNode) {
- WSNode wsNode = (WSNode) node;
- WorkflowNode wfNode = wfNodes.get(wsNode.getID());
- if (wfNode == null) {
- wfApplicationNode = createApplicationNode(wsNode);
- wfNodes.put(wfApplicationNode.getId(), wfApplicationNode);
- nextPortContainerList.addAll(processOutPorts(wsNode, wfApplicationNode));
- } else if (wfNode instanceof ApplicationNode) {
- wfApplicationNode = (ApplicationNode) wfNode;
- } else {
- // TODO : handle this scenario
- }
- inPort.setNode(wfApplicationNode);
- wfApplicationNode.addInPort(inPort);
-
- }else if (node instanceof OutputNode) {
- OutputNode oNode = (OutputNode) node;
- wfOutputNode = createWorkflowOutputNode(oNode);
- wfOutputNode.setInPort(inPort);
- inPort.setNode(wfOutputNode);
- wfNodes.put(wfOutputNode.getId(), wfOutputNode);
- }
- }
- buildModel(nextPortContainerList);
-
- }
-
- private WorkflowOutputNode createWorkflowOutputNode(OutputNode oNode) {
- WorkflowOutputNodeImpl workflowOutputNode = new WorkflowOutputNodeImpl(oNode.getID(), oNode.getName());
- OutputDataObjectType outputDataObjectType = new OutputDataObjectType();
- outputDataObjectType.setType(oNode.getParameterType());
- workflowOutputNode.setOutputObject(outputDataObjectType);
- return workflowOutputNode;
- }
-
- private ApplicationNode createApplicationNode(WSNode wsNode) {
- ApplicationNode applicationNode = new ApplicationNodeImpl(wsNode.getID(),
- wsNode.getComponent().getApplication().getName(),
- wsNode.getComponent().getApplication().getApplicationId());
- return applicationNode;
- }
-
- private List<PortContainer> processOutPorts(Node node, WorkflowNode wfNode) {
- OutPort outPort ;
- Edge edge;
- InPort inPort = null;
- List<PortContainer> portContainers = new ArrayList<PortContainer>();
- for (DataPort dataPort : node.getOutputPorts()) {
- outPort = createOutPort(dataPort);
- for (DataEdge dataEdge : dataPort.getEdges()) {
- edge = new DirectedEdge();
- edge.setFromPort(outPort);
- outPort.addEdge(edge);
- inPort = createInPort(dataEdge.getToPort());
- edge.setToPort(inPort);
- inPort.addEdge(edge);
- portContainers.add(new PortContainer(dataEdge.getToPort(), inPort));
- }
- outPort.setNode(wfNode);
- if (wfNode instanceof WorkflowInputNode) {
- WorkflowInputNode workflowInputNode = (WorkflowInputNode) wfNode;
- workflowInputNode.setOutPort(outPort);
- } else if (wfNode instanceof ApplicationNode) {
- ApplicationNode applicationNode = ((ApplicationNode) wfNode);
- applicationNode.addOutPort(outPort);
- }
- }
- return portContainers;
- }
-
- private OutPort createOutPort(DataPort dataPort) {
- OutPortImpl outPort = new OutPortImpl(dataPort.getID());
- OutputDataObjectType outputDataObjectType = new OutputDataObjectType();
- if (dataPort instanceof WSPort) {
- WSPort wsPort = (WSPort) dataPort;
- outputDataObjectType.setName(wsPort.getFromNode().getName());
- outputDataObjectType.setType(wsPort.getType());
- }else if (dataPort instanceof SystemDataPort) {
- SystemDataPort sysPort = (SystemDataPort) dataPort;
- outputDataObjectType.setName(sysPort.getFromNode().getName());
- outputDataObjectType.setType(sysPort.getType());
- }
-
- outPort.setOutputObject(outputDataObjectType);
- return outPort;
- }
-
- private InPort createInPort(DataPort toPort) {
- InPort inPort = new InputPortIml(toPort.getID());
- InputDataObjectType inputDataObjectType = new InputDataObjectType();
- if (toPort instanceof WSPort) {
- WSPort wsPort = (WSPort) toPort;
- inputDataObjectType.setName(wsPort.getName());
- inputDataObjectType.setType(wsPort.getType());
- inputDataObjectType.setApplicationArgument(wsPort.getComponentPort().getApplicationArgument());
- inputDataObjectType.setIsRequired(!wsPort.getComponentPort().isOptional());
- inputDataObjectType.setInputOrder(wsPort.getComponentPort().getInputOrder());
-
- inPort.setDefaultValue(wsPort.getComponentPort().getDefaultValue());
- }else if (toPort instanceof SystemDataPort) {
- SystemDataPort sysPort = (SystemDataPort) toPort;
- inputDataObjectType.setName(sysPort.getName());
- inputDataObjectType.setType(sysPort.getType());
- }
- inPort.setInputObject(inputDataObjectType);
- return inPort;
- }
-
- private InputDataObjectType getInputDataObject(DataPort dataPort) {
- InputDataObjectType inputDataObject = new InputDataObjectType();
- inputDataObject.setName(dataPort.getName());
- if (dataPort instanceof WSPort) {
- WSPort port = (WSPort) dataPort;
- inputDataObject.setInputOrder(port.getComponentPort().getInputOrder());
- inputDataObject.setApplicationArgument(port.getComponentPort().getApplicationArgument() == null ?
- "" : port.getComponentPort().getApplicationArgument());
- inputDataObject.setType(dataPort.getType());
- }
- return inputDataObject;
- }
-
- private OutputDataObjectType getOutputDataObject(InputDataObjectType inputObject) {
- OutputDataObjectType outputDataObjectType = new OutputDataObjectType();
- outputDataObjectType.setApplicationArgument(inputObject.getApplicationArgument());
- outputDataObjectType.setName(inputObject.getName());
- outputDataObjectType.setType(inputObject.getType());
- outputDataObjectType.setValue(inputObject.getValue());
- return outputDataObjectType;
- }
-
- private Experiment getExperiment(String experimentId) throws RegistryException {
- Registry registry = RegistryFactory.getDefaultRegistry();
- return (Experiment)registry.get(RegistryModelType.EXPERIMENT, experimentId);
- }
-
- private Workflow getWorkflowFromExperiment(Experiment experiment) throws RegistryException, AppCatalogException, GraphException, ComponentException {
- WorkflowCatalog workflowCatalog = getWorkflowCatalog();
- return new Workflow(workflowCatalog.getWorkflow(experiment.getApplicationId()).getGraph());
- }
-
- private WorkflowCatalog getWorkflowCatalog() throws AppCatalogException {
- return AppCatalogFactory.getAppCatalog().getWorkflowCatalog();
- }
-
- private ArrayList<Node> getInputNodes(Workflow wf) {
- ArrayList<Node> list = new ArrayList<Node>();
- List<NodeImpl> nodes = wf.getGraph().getNodes();
- for (Node node : nodes) {
- String name = node.getComponent().getName();
- if (InputComponent.NAME.equals(name) || ConstantComponent.NAME.equals(name) || S3InputComponent.NAME.equals(name)) {
- list.add(node);
- }
- }
- return list;
- }
-
- public Map<String, WorkflowNode> getWfNodes() {
- return wfNodes;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java
new file mode 100644
index 0000000..673fbdc
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java
@@ -0,0 +1,291 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.parser;
+
+import org.airavata.appcatalog.cpi.AppCatalogException;
+import org.airavata.appcatalog.cpi.WorkflowCatalog;
+import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNodeImpl;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNodeImpl;
+import org.apache.airavata.simple.workflow.engine.dag.port.OutPortImpl;
+import org.apache.airavata.workflow.model.component.ComponentException;
+import org.apache.airavata.workflow.model.component.system.ConstantComponent;
+import org.apache.airavata.workflow.model.component.system.InputComponent;
+import org.apache.airavata.workflow.model.component.system.S3InputComponent;
+import org.apache.airavata.workflow.model.graph.DataEdge;
+import org.apache.airavata.workflow.model.graph.DataPort;
+import org.apache.airavata.workflow.model.graph.GraphException;
+import org.apache.airavata.workflow.model.graph.Node;
+import org.apache.airavata.workflow.model.graph.impl.NodeImpl;
+import org.apache.airavata.workflow.model.graph.system.OutputNode;
+import org.apache.airavata.workflow.model.graph.system.SystemDataPort;
+import org.apache.airavata.workflow.model.graph.ws.WSNode;
+import org.apache.airavata.workflow.model.graph.ws.WSPort;
+import org.apache.airavata.workflow.model.wf.Workflow;
+import org.apache.airavata.simple.workflow.engine.WorkflowParser;
+import org.apache.airavata.simple.workflow.engine.dag.edge.DirectedEdge;
+import org.apache.airavata.simple.workflow.engine.dag.edge.Edge;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNodeImpl;
+import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
+import org.apache.airavata.simple.workflow.engine.dag.port.InputPortIml;
+import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AiravataWorkflowParser implements WorkflowParser {
+
+ private String credentialToken ;
+
+ private Experiment experiment;
+ private Map<String, WorkflowNode> wfNodes = new HashMap<String, WorkflowNode>();
+
+
+ public AiravataWorkflowParser(String experimentId, String credentialToken) throws RegistryException {
+ this.experiment = getExperiment(experimentId);
+ this.credentialToken = credentialToken;
+ }
+
+ public AiravataWorkflowParser(Experiment experiment, String credentialToken) {
+ this.credentialToken = credentialToken;
+ this.experiment = experiment;
+ }
+
+ @Override
+ public List<WorkflowInputNode> parse() throws RegistryException, AppCatalogException,
+ ComponentException, GraphException {
+ return parseWorkflow(getWorkflowFromExperiment(experiment));
+ }
+
+ public List<WorkflowInputNode> parseWorkflow(Workflow workflow) {
+ List<Node> gNodes = getInputNodes(workflow);
+ List<WorkflowInputNode> wfInputNodes = new ArrayList<WorkflowInputNode>();
+ List<PortContainer> portContainers = new ArrayList<PortContainer>();
+ List<InputDataObjectType> experimentInputs = experiment.getExperimentInputs();
+ Map<String,InputDataObjectType> inputDataMap=new HashMap<String, InputDataObjectType>();
+ WorkflowInputNode wfInputNode = null;
+ for (InputDataObjectType dataObjectType : experimentInputs) {
+ inputDataMap.put(dataObjectType.getName(), dataObjectType);
+ }
+ for (Node gNode : gNodes) {
+ wfInputNode = new WorkflowInputNodeImpl(gNode.getID(), gNode.getName());
+ wfInputNode.setInputObject(inputDataMap.get(wfInputNode.getName()));
+ if (wfInputNode.getInputObject() == null) {
+ // TODO: throw an error and exit.
+ }
+ portContainers.addAll(processOutPorts(gNode, wfInputNode));
+ wfInputNodes.add(wfInputNode);
+ }
+
+ // while port container is not empty iterate graph and build the workflow DAG.
+ buildModel(portContainers);
+
+ return wfInputNodes;
+ }
+
+ private void buildModel(List<PortContainer> portContainerList) {
+ // end condition of recursive call.
+ if (portContainerList == null || portContainerList.isEmpty()) {
+ return ;
+ }
+ DataPort dataPort = null;
+ InPort inPort = null;
+ ApplicationNode wfApplicationNode = null;
+ WorkflowOutputNode wfOutputNode = null;
+ List<PortContainer> nextPortContainerList = new ArrayList<PortContainer>();
+ for (PortContainer portContainer : portContainerList) {
+ dataPort = portContainer.getDataPort();
+ inPort = portContainer.getInPort();
+ Node node = dataPort.getNode();
+ if (node instanceof WSNode) {
+ WSNode wsNode = (WSNode) node;
+ WorkflowNode wfNode = wfNodes.get(wsNode.getID());
+ if (wfNode == null) {
+ wfApplicationNode = createApplicationNode(wsNode);
+ wfNodes.put(wfApplicationNode.getId(), wfApplicationNode);
+ nextPortContainerList.addAll(processOutPorts(wsNode, wfApplicationNode));
+ } else if (wfNode instanceof ApplicationNode) {
+ wfApplicationNode = (ApplicationNode) wfNode;
+ } else {
+ // TODO : handle this scenario
+ }
+ inPort.setNode(wfApplicationNode);
+ wfApplicationNode.addInPort(inPort);
+
+ }else if (node instanceof OutputNode) {
+ OutputNode oNode = (OutputNode) node;
+ wfOutputNode = createWorkflowOutputNode(oNode);
+ wfOutputNode.setInPort(inPort);
+ inPort.setNode(wfOutputNode);
+ wfNodes.put(wfOutputNode.getId(), wfOutputNode);
+ }
+ }
+ buildModel(nextPortContainerList);
+
+ }
+
+ private WorkflowOutputNode createWorkflowOutputNode(OutputNode oNode) {
+ WorkflowOutputNodeImpl workflowOutputNode = new WorkflowOutputNodeImpl(oNode.getID(), oNode.getName());
+ OutputDataObjectType outputDataObjectType = new OutputDataObjectType();
+ outputDataObjectType.setType(oNode.getParameterType());
+ workflowOutputNode.setOutputObject(outputDataObjectType);
+ return workflowOutputNode;
+ }
+
+ private ApplicationNode createApplicationNode(WSNode wsNode) {
+ ApplicationNode applicationNode = new ApplicationNodeImpl(wsNode.getID(),
+ wsNode.getComponent().getApplication().getName(),
+ wsNode.getComponent().getApplication().getApplicationId());
+ return applicationNode;
+ }
+
+ private List<PortContainer> processOutPorts(Node node, WorkflowNode wfNode) {
+ OutPort outPort ;
+ Edge edge;
+ InPort inPort = null;
+ List<PortContainer> portContainers = new ArrayList<PortContainer>();
+ for (DataPort dataPort : node.getOutputPorts()) {
+ outPort = createOutPort(dataPort);
+ for (DataEdge dataEdge : dataPort.getEdges()) {
+ edge = new DirectedEdge();
+ edge.setFromPort(outPort);
+ outPort.addEdge(edge);
+ inPort = createInPort(dataEdge.getToPort());
+ edge.setToPort(inPort);
+ inPort.addEdge(edge);
+ portContainers.add(new PortContainer(dataEdge.getToPort(), inPort));
+ }
+ outPort.setNode(wfNode);
+ if (wfNode instanceof WorkflowInputNode) {
+ WorkflowInputNode workflowInputNode = (WorkflowInputNode) wfNode;
+ workflowInputNode.setOutPort(outPort);
+ } else if (wfNode instanceof ApplicationNode) {
+ ApplicationNode applicationNode = ((ApplicationNode) wfNode);
+ applicationNode.addOutPort(outPort);
+ }
+ }
+ return portContainers;
+ }
+
+ private OutPort createOutPort(DataPort dataPort) {
+ OutPortImpl outPort = new OutPortImpl(dataPort.getID());
+ OutputDataObjectType outputDataObjectType = new OutputDataObjectType();
+ if (dataPort instanceof WSPort) {
+ WSPort wsPort = (WSPort) dataPort;
+ outputDataObjectType.setName(wsPort.getComponentPort().getName());
+ outputDataObjectType.setType(wsPort.getType());
+ }else if (dataPort instanceof SystemDataPort) {
+ SystemDataPort sysPort = (SystemDataPort) dataPort;
+ outputDataObjectType.setName(sysPort.getFromNode().getName());
+ outputDataObjectType.setType(sysPort.getType());
+ }
+
+ outPort.setOutputObject(outputDataObjectType);
+ return outPort;
+ }
+
+ private InPort createInPort(DataPort toPort) {
+ InPort inPort = new InputPortIml(toPort.getID());
+ InputDataObjectType inputDataObjectType = new InputDataObjectType();
+ if (toPort instanceof WSPort) {
+ WSPort wsPort = (WSPort) toPort;
+ inputDataObjectType.setName(wsPort.getName());
+ inputDataObjectType.setType(wsPort.getType());
+ inputDataObjectType.setApplicationArgument(wsPort.getComponentPort().getApplicationArgument());
+ inputDataObjectType.setIsRequired(!wsPort.getComponentPort().isOptional());
+ inputDataObjectType.setInputOrder(wsPort.getComponentPort().getInputOrder());
+
+ inPort.setDefaultValue(wsPort.getComponentPort().getDefaultValue());
+ }else if (toPort instanceof SystemDataPort) {
+ SystemDataPort sysPort = (SystemDataPort) toPort;
+ inputDataObjectType.setName(sysPort.getName());
+ inputDataObjectType.setType(sysPort.getType());
+ }
+ inPort.setInputObject(inputDataObjectType);
+ return inPort;
+ }
+
+ private InputDataObjectType getInputDataObject(DataPort dataPort) {
+ InputDataObjectType inputDataObject = new InputDataObjectType();
+ inputDataObject.setName(dataPort.getName());
+ if (dataPort instanceof WSPort) {
+ WSPort port = (WSPort) dataPort;
+ inputDataObject.setInputOrder(port.getComponentPort().getInputOrder());
+ inputDataObject.setApplicationArgument(port.getComponentPort().getApplicationArgument() == null ?
+ "" : port.getComponentPort().getApplicationArgument());
+ inputDataObject.setType(dataPort.getType());
+ }
+ return inputDataObject;
+ }
+
+ private OutputDataObjectType getOutputDataObject(InputDataObjectType inputObject) {
+ OutputDataObjectType outputDataObjectType = new OutputDataObjectType();
+ outputDataObjectType.setApplicationArgument(inputObject.getApplicationArgument());
+ outputDataObjectType.setName(inputObject.getName());
+ outputDataObjectType.setType(inputObject.getType());
+ outputDataObjectType.setValue(inputObject.getValue());
+ return outputDataObjectType;
+ }
+
+ private Experiment getExperiment(String experimentId) throws RegistryException {
+ Registry registry = RegistryFactory.getDefaultRegistry();
+ return (Experiment)registry.get(RegistryModelType.EXPERIMENT, experimentId);
+ }
+
+ private Workflow getWorkflowFromExperiment(Experiment experiment) throws RegistryException, AppCatalogException, GraphException, ComponentException {
+ WorkflowCatalog workflowCatalog = getWorkflowCatalog();
+ return new Workflow(workflowCatalog.getWorkflow(experiment.getApplicationId()).getGraph());
+ }
+
+ private WorkflowCatalog getWorkflowCatalog() throws AppCatalogException {
+ return AppCatalogFactory.getAppCatalog().getWorkflowCatalog();
+ }
+
+ private ArrayList<Node> getInputNodes(Workflow wf) {
+ ArrayList<Node> list = new ArrayList<Node>();
+ List<NodeImpl> nodes = wf.getGraph().getNodes();
+ for (Node node : nodes) {
+ String name = node.getComponent().getName();
+ if (InputComponent.NAME.equals(name) || ConstantComponent.NAME.equals(name) || S3InputComponent.NAME.equals(name)) {
+ list.add(node);
+ }
+ }
+ return list;
+ }
+
+ public Map<String, WorkflowNode> getWfNodes() {
+ return wfNodes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParserTest.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParserTest.java b/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParserTest.java
deleted file mode 100644
index e9b3e55..0000000
--- a/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParserTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.simple.workflow.engine.parser;
-
-import org.apache.airavata.model.appcatalog.appinterface.DataType;
-import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
-import org.apache.airavata.model.workspace.experiment.Experiment;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
-import org.apache.airavata.workflow.model.wf.Workflow;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class AiravataDefaultParserTest {
-
- @Before
- public void setUp() throws Exception {
-
- }
-
- @After
- public void tearDown() throws Exception {
-
- }
-
- @Test
- public void testWorkflowParse() throws Exception {
- Assert.assertNotNull("Test file (ComplexMathWorkflow.awf) is missing", getClass().getResource("/ComplexMathWorkflow.awf"));
- InputStreamReader isr = new InputStreamReader(this.getClass().getResourceAsStream("/ComplexMathWorkflow.awf"));
- BufferedReader br = new BufferedReader(isr);
- StringBuffer sb = new StringBuffer();
- String nextLine = br.readLine();
- while (nextLine != null) {
- sb.append(nextLine);
- nextLine = br.readLine();
- }
- Workflow workflow = new Workflow(sb.toString());
- Experiment experiment = new Experiment();
- InputDataObjectType x = new InputDataObjectType();
- x.setValue("6");
- x.setType(DataType.STRING);
- x.setName("x");
-
- InputDataObjectType y = new InputDataObjectType();
- y.setValue("8");
- y.setType(DataType.STRING);
- y.setName("y");
-
- InputDataObjectType z = new InputDataObjectType();
- z.setValue("10");
- z.setType(DataType.STRING);
- z.setName("y_2");
-
- List<InputDataObjectType> inputs = new ArrayList<InputDataObjectType>();
- inputs.add(x);
- inputs.add(y);
- inputs.add(z);
- experiment.setExperimentInputs(inputs);
- // create parser
- AiravataDefaultParser parser = new AiravataDefaultParser(experiment, "testCredentialId");
- List<WorkflowInputNode> workflowInputNodes = parser.parseWorkflow(workflow);
- Assert.assertNotNull(workflowInputNodes);
- Assert.assertEquals(3, workflowInputNodes.size());
- for (WorkflowInputNode workflowInputNode : workflowInputNodes) {
- Assert.assertNotNull(workflowInputNode.getOutPort());
- Assert.assertNotNull(workflowInputNode.getInputObject());
- }
-
- Map<String, WorkflowNode> wfNodes = parser.getWfNodes();
- for (String wfId : wfNodes.keySet()) {
- WorkflowNode wfNode = wfNodes.get(wfId);
- if (wfNode instanceof ApplicationNode) {
- ApplicationNode node = (ApplicationNode) wfNode;
- Assert.assertEquals(2, node.getInputPorts().size());
- Assert.assertNotNull(node.getInputPorts().get(0).getInputObject());
- Assert.assertNotNull(node.getInputPorts().get(1).getInputObject());
- Assert.assertNotNull(node.getInputPorts().get(0).getEdge());
- Assert.assertNotNull(node.getInputPorts().get(1).getEdge());
-
- Assert.assertEquals(1, node.getOutputPorts().size());
- Assert.assertEquals(1, node.getOutputPorts().get(0).getOutEdges().size());
- Assert.assertNotNull(node.getOutputPorts().get(0).getOutEdges().get(0));
- } else if (wfNode instanceof WorkflowOutputNode) {
- WorkflowOutputNode workflowOutputNode = (WorkflowOutputNode) wfNode;
- Assert.assertNotNull(workflowOutputNode.getInPort());
- }
- }
-
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java b/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java
new file mode 100644
index 0000000..6443806
--- /dev/null
+++ b/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java
@@ -0,0 +1,119 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.parser;
+
+import org.apache.airavata.model.appcatalog.appinterface.DataType;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
+import org.apache.airavata.workflow.model.wf.Workflow;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class AiravataWorkflowParserTest {
+
+ @Before
+ public void setUp() throws Exception {
+
+ }
+
+ @After
+ public void tearDown() throws Exception {
+
+ }
+
+ @Test
+ public void testWorkflowParse() throws Exception {
+ Assert.assertNotNull("Test file (ComplexMathWorkflow.awf) is missing", getClass().getResource("/ComplexMathWorkflow.awf"));
+ InputStreamReader isr = new InputStreamReader(this.getClass().getResourceAsStream("/ComplexMathWorkflow.awf"));
+ BufferedReader br = new BufferedReader(isr);
+ StringBuffer sb = new StringBuffer();
+ String nextLine = br.readLine();
+ while (nextLine != null) {
+ sb.append(nextLine);
+ nextLine = br.readLine();
+ }
+ Workflow workflow = new Workflow(sb.toString());
+ Experiment experiment = new Experiment();
+ InputDataObjectType x = new InputDataObjectType();
+ x.setValue("6");
+ x.setType(DataType.STRING);
+ x.setName("x");
+
+ InputDataObjectType y = new InputDataObjectType();
+ y.setValue("8");
+ y.setType(DataType.STRING);
+ y.setName("y");
+
+ InputDataObjectType z = new InputDataObjectType();
+ z.setValue("10");
+ z.setType(DataType.STRING);
+ z.setName("y_2");
+
+ List<InputDataObjectType> inputs = new ArrayList<InputDataObjectType>();
+ inputs.add(x);
+ inputs.add(y);
+ inputs.add(z);
+ experiment.setExperimentInputs(inputs);
+ // create parser
+ AiravataWorkflowParser parser = new AiravataWorkflowParser(experiment, "testCredentialId");
+ List<WorkflowInputNode> workflowInputNodes = parser.parseWorkflow(workflow);
+ Assert.assertNotNull(workflowInputNodes);
+ Assert.assertEquals(3, workflowInputNodes.size());
+ for (WorkflowInputNode workflowInputNode : workflowInputNodes) {
+ Assert.assertNotNull(workflowInputNode.getOutPort());
+ Assert.assertNotNull(workflowInputNode.getInputObject());
+ }
+
+ Map<String, WorkflowNode> wfNodes = parser.getWfNodes();
+ for (String wfId : wfNodes.keySet()) {
+ WorkflowNode wfNode = wfNodes.get(wfId);
+ if (wfNode instanceof ApplicationNode) {
+ ApplicationNode node = (ApplicationNode) wfNode;
+ Assert.assertEquals(2, node.getInputPorts().size());
+ Assert.assertNotNull(node.getInputPorts().get(0).getInputObject());
+ Assert.assertNotNull(node.getInputPorts().get(1).getInputObject());
+ Assert.assertNotNull(node.getInputPorts().get(0).getEdge());
+ Assert.assertNotNull(node.getInputPorts().get(1).getEdge());
+
+ Assert.assertEquals(1, node.getOutputPorts().size());
+ Assert.assertEquals(1, node.getOutputPorts().get(0).getOutEdges().size());
+ Assert.assertNotNull(node.getOutputPorts().get(0).getOutEdges().get(0));
+ } else if (wfNode instanceof WorkflowOutputNode) {
+ WorkflowOutputNode workflowOutputNode = (WorkflowOutputNode) wfNode;
+ Assert.assertNotNull(workflowOutputNode.getInPort());
+ }
+ }
+
+ }
+}
\ No newline at end of file