You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/02/08 17:16:34 UTC
airavata git commit: Added Apache License headers and remove
deprecated test methods from json workflow parser test class
Repository: airavata
Updated Branches:
refs/heads/develop d8df3d00c -> b4ca1eb5a
Added Apache License headers and remove deprecated test methods from json workflow parser test class
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b4ca1eb5
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b4ca1eb5
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b4ca1eb5
Branch: refs/heads/develop
Commit: b4ca1eb5a361b3040e4115dae2235cb4816c8ff1
Parents: d8df3d0
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Mon Feb 8 11:10:37 2016 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Mon Feb 8 11:15:58 2016 -0500
----------------------------------------------------------------------
.../core/SimpleWorkflowInterpreter.java | 328 ------------------
.../airavata/workflow/core/WorkflowBuilder.java | 21 ++
.../workflow/core/WorkflowEnactmentService.java | 28 +-
.../airavata/workflow/core/WorkflowFactory.java | 1 +
.../workflow/core/WorkflowInterpreter.java | 336 +++++++++++++++++++
.../airavata/workflow/core/WorkflowParser.java | 46 ---
.../core/parser/JsonWorkflowParser.java | 37 +-
.../workflow/core/parser/WorkflowParser.java | 46 +++
.../core/parser/JsonWorkflowParserTest.java | 120 +------
9 files changed, 461 insertions(+), 502 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ca1eb5/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java
deleted file mode 100644
index cdbf2f2..0000000
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.core;
-
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
-import org.apache.airavata.model.ComponentState;
-import org.apache.airavata.model.ComponentStatus;
-import org.apache.airavata.model.application.io.OutputDataObjectType;
-import org.apache.airavata.model.experiment.ExperimentModel;
-import org.apache.airavata.model.messaging.event.ProcessIdentifier;
-import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
-import org.apache.airavata.model.status.ProcessState;
-import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.registry.cpi.*;
-import org.apache.airavata.workflow.core.dag.edge.Edge;
-import org.apache.airavata.workflow.core.dag.nodes.*;
-import org.apache.airavata.workflow.core.dag.port.OutPort;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Package-Private class
- */
-class SimpleWorkflowInterpreter{
-
- private static final Logger log = LoggerFactory.getLogger(SimpleWorkflowInterpreter.class);
- private List<InputNode> inputNodes;
-
- private ExperimentModel experiment;
-
- private String credentialToken;
-
- private String gatewayName;
-
- private String workflowString;
- private Map<String, WorkflowNode> readyList = new ConcurrentHashMap<>();
- private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<>();
- private Map<String, WorkflowNode> processingQueue = new ConcurrentHashMap<>();
- private Map<String, WorkflowNode> completeList = new HashMap<>();
- private Registry registry;
- private List<OutputNode> completeWorkflowOutputs = new ArrayList<>();
- private RabbitMQProcessLaunchPublisher publisher;
- private RabbitMQStatusConsumer statusConsumer;
- private String consumerId;
- private boolean continueWorkflow = true;
-
- public SimpleWorkflowInterpreter(String experimentId, String credentialToken, String gatewayName, RabbitMQProcessLaunchPublisher publisher) throws RegistryException {
- this.gatewayName = gatewayName;
- setExperiment(experimentId);
- this.credentialToken = credentialToken;
- this.publisher = publisher;
- }
-
- public SimpleWorkflowInterpreter(ExperimentModel experiment, String credentialStoreToken, String gatewayName, RabbitMQProcessLaunchPublisher publisher) {
- this.gatewayName = gatewayName;
- this.experiment = experiment;
- this.credentialToken = credentialStoreToken;
- this.publisher = publisher;
- }
-
- /**
- * Package-Private method.
- * @throws Exception
- */
- void launchWorkflow() throws Exception {
-// WorkflowBuilder workflowBuilder = WorkflowFactory.getWorkflowBuilder(experiment.getExperimentId(), credentialToken, null);
- workflowString = getWorkflow();
- WorkflowParser workflowParser = WorkflowFactory.getWorkflowParser(workflowString);
- log.debug("Initialized workflow parser");
- workflowParser.parse();
- setInputNodes(workflowParser.getInputNodes());
- log.debug("Parsed the workflow and got the workflow input nodes");
- // process workflow input nodes
- processWorkflowInputNodes(getInputNodes());
- if (readyList.isEmpty()) {
- StringBuilder sb = new StringBuilder();
- for (InputNode inputNode : inputNodes) {
- sb.append(", ");
- sb.append(inputNode.getInputObject().getName());
- sb.append("=");
- sb.append(inputNode.getInputObject().getValue());
- }
- throw new AiravataException("No workflow application node is in ready state to run with experiment inputs" + sb.toString());
- }
- processReadyList();
- }
-
- private String getWorkflow() throws AppCatalogException, WorkflowCatalogException {
- WorkflowCatalog workflowCatalog = RegistryFactory.getAppCatalog().getWorkflowCatalog();
- //FIXME: parse workflowTemplateId or experimentId
-// workflowCatalog.getWorkflow("");
- return "";
- }
-
- // try to remove synchronization tag
- /**
- * Package-Private method.
- * @throws RegistryException
- * @throws AiravataException
- */
- void processReadyList() throws RegistryException, AiravataException {
- if (readyList.isEmpty() && processingQueue.isEmpty() && !waitingList.isEmpty()) {
- throw new AiravataException("No workflow application node is in ready state to run");
- }
- for (WorkflowNode readyNode : readyList.values()) {
- if (readyNode instanceof OutputNode) {
- OutputNode outputNode = (OutputNode) readyNode;
- outputNode.getOutputObject().setValue(outputNode.getInPort().getInputObject().getValue());
- addToCompleteOutputNodeList(outputNode);
- } else if (readyNode instanceof InputNode) {
- // set input object of applications and add applications to ready List.
- } else if (readyNode instanceof ApplicationNode) {
- // call orchestrator to create process for the application
- } else {
- throw new RuntimeException("Unsupported workflow node type");
- }
- }
-
- if (processingQueue.isEmpty() && waitingList.isEmpty()) {
- try {
- saveWorkflowOutputs();
- } catch (AppCatalogException e) {
- throw new AiravataException("Error while updating completed workflow outputs to registry", e);
- }
- }
- }
-
- private void saveWorkflowOutputs() throws AppCatalogException {
- List<OutputDataObjectType> outputDataObjects = new ArrayList<>();
- for (OutputNode completeWorkflowOutput : completeWorkflowOutputs) {
- outputDataObjects.add(completeWorkflowOutput.getOutputObject());
- }
-// RegistryFactory.getAppCatalog().getWorkflowCatalog()
-// .updateWorkflowOutputs(experiment.getApplicationId(), outputDataObjects);
- }
-
- private void processWorkflowInputNodes(List<InputNode> inputNodes) {
- Set<WorkflowNode> tempNodeSet = new HashSet<>();
- for (InputNode inputNode : inputNodes) {
- if (inputNode.isReady()) {
- log.debug("Workflow node : " + inputNode.getId() + " is ready to execute");
- for (Edge edge : inputNode.getOutPort().getEdges()) {
- edge.getToPort().getInputObject().setValue(inputNode.getInputObject().getValue());
- if (edge.getToPort().getNode().isReady()) {
- addToReadyQueue(edge.getToPort().getNode());
- log.debug("Added workflow node : " + edge.getToPort().getNode().getId() + " to the readyQueue");
- } else {
- addToWaitingQueue(edge.getToPort().getNode());
- log.debug("Added workflow node " + edge.getToPort().getNode().getId() + " to the waitingQueue");
-
- }
- }
- }
- }
- }
-
-
- public List<InputNode> getInputNodes() throws Exception {
- return inputNodes;
- }
-
- public void setInputNodes(List<InputNode> inputNodes) {
- this.inputNodes = inputNodes;
- }
-
- private Registry getRegistry() throws RegistryException {
- if (registry==null){
- registry = RegistryFactory.getRegistry();
- }
- return registry;
- }
-
- /**
- * Package-Private method.
- * Remove the workflow node from waiting queue and add it to the ready queue.
- * @param workflowNode - Workflow Node
- */
- synchronized void addToReadyQueue(WorkflowNode workflowNode) {
- waitingList.remove(workflowNode.getId());
- readyList.put(workflowNode.getId(), workflowNode);
- }
-
- private void addToWaitingQueue(WorkflowNode workflowNode) {
- waitingList.put(workflowNode.getId(), workflowNode);
- }
-
- /**
- * First remove the node from ready list and then add the WfNodeContainer to the process queue.
- * Note that underline data structure of the process queue is a Map.
- * @param applicationNode - has both workflow and correspond workflowNodeDetails and TaskDetails
- */
- private synchronized void addToProcessingQueue(ApplicationNode applicationNode) {
- readyList.remove(applicationNode.getId());
- processingQueue.put(applicationNode.getId(), applicationNode);
- }
-
- private synchronized void addToCompleteQueue(ApplicationNode applicationNode) {
- processingQueue.remove(applicationNode.getId());
- completeList.put(applicationNode.getId(), applicationNode);
- }
-
-
- private void addToCompleteOutputNodeList(OutputNode wfOutputNode) {
- completeWorkflowOutputs.add(wfOutputNode);
- readyList.remove(wfOutputNode.getId());
- }
-
- boolean isAllDone() {
- return !continueWorkflow || (waitingList.isEmpty() && readyList.isEmpty() && processingQueue.isEmpty());
- }
-
- private void setExperiment(String experimentId) throws RegistryException {
- experiment = (ExperimentModel) getRegistry().getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
- log.debug("Retrieve Experiment for experiment id : " + experimentId);
- }
-
-/* synchronized void handleTaskOutputChangeEvent(ProcessStatusChangeEvent taskOutputChangeEvent) {
-
- String taskId = taskOutputChangeEvent.getTaskIdentity().getTaskId();
- log.debug("Task Output changed event received for workflow node : " +
- taskOutputChangeEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId);
- WorkflowNode workflowNode = processingQueue.get(taskId);
- Set<WorkflowNode> tempWfNodeSet = new HashSet<>();
- if (workflowNode != null) {
- if (workflowNode instanceof ApplicationNode) {
- ApplicationNode applicationNode = (ApplicationNode) workflowNode;
- // Workflow node can have one to many output ports and each output port can have one to many links
- for (OutPort outPort : applicationNode.getOutputPorts()) {
- for (OutputDataObjectType outputDataObjectType : taskOutputChangeEvent.getOutput()) {
- if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) {
- outPort.getOutputObject().setValue(outputDataObjectType.getValue());
- break;
- }
- }
- for (Edge edge : outPort.getEdges()) {
- edge.getToPort().getInputObject().setValue(outPort.getOutputObject().getValue());
- if (edge.getToPort().getNode().isReady()) {
- addToReadyQueue(edge.getToPort().getNode());
- }
- }
- }
- addToCompleteQueue(applicationNode);
- log.debug("removed task from processing queue : " + taskId);
- }
- try {
- processReadyList();
- } catch (Exception e) {
- log.error("Error while processing ready workflow nodes", e);
- continueWorkflow = false;
- }
- }
- }*/
-
- void handleProcessStatusChangeEvent(ProcessStatusChangeEvent processStatusChangeEvent) {
- ProcessState processState = processStatusChangeEvent.getState();
- ProcessIdentifier processIdentity = processStatusChangeEvent.getProcessIdentity();
- String processId = processIdentity.getProcessId();
- ApplicationNode applicationNode = (ApplicationNode) processingQueue.get(processId);
- if (applicationNode != null) {
- ComponentState state = applicationNode.getState();
- switch (processState) {
- case CREATED:
- case VALIDATED:
- case STARTED:
- break;
- case CONFIGURING_WORKSPACE:
- case PRE_PROCESSING:
- case INPUT_DATA_STAGING:
- case EXECUTING:
- case OUTPUT_DATA_STAGING:
- case POST_PROCESSING:
- state = ComponentState.RUNNING;
- break;
- case COMPLETED:
- state = ComponentState.COMPLETED;
- break;
- case FAILED:
- state = ComponentState.FAILED;
- break;
- case CANCELED:
- case CANCELLING:
- state = ComponentState.CANCELED;
- break;
- default:
- break;
- }
- if (state != applicationNode.getState()) {
- try {
- updateWorkflowNodeStatus(applicationNode, new ComponentStatus(state));
- } catch (RegistryException e) {
- log.error("Error! Couldn't update new application state to registry. nodeInstanceId : {} "
- + applicationNode.getId() + " status to: " + applicationNode.getState().toString() , e);
- }
- }
- }
-
- }
-
- private void updateWorkflowNodeStatus(ApplicationNode applicationNode, ComponentStatus componentStatus) throws RegistryException {
- // FIXME: save new workflow node status to registry.
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ca1eb5/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowBuilder.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowBuilder.java
index a794282..fb97161 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowBuilder.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowBuilder.java
@@ -1,3 +1,24 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
package org.apache.airavata.workflow.core;
import org.apache.airavata.workflow.core.dag.nodes.InputNode;
http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ca1eb5/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
index 34ef8a7..8339aea 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
@@ -46,11 +46,11 @@ public class WorkflowEnactmentService {
private final RabbitMQStatusConsumer statusConsumer;
private String consumerId;
private ExecutorService executor;
- private Map<String,SimpleWorkflowInterpreter> workflowMap;
+ private Map<String,WorkflowInterpreter> workflowMap;
private WorkflowEnactmentService () throws AiravataException {
executor = Executors.newFixedThreadPool(getThreadPoolSize());
- workflowMap = new ConcurrentHashMap<String, SimpleWorkflowInterpreter>();
+ workflowMap = new ConcurrentHashMap<String, WorkflowInterpreter>();
statusConsumer = new RabbitMQStatusConsumer();
consumerId = statusConsumer.listen(new TaskMessageHandler());
// register the shutdown hook to un-bind status consumer.
@@ -73,10 +73,10 @@ public class WorkflowEnactmentService {
String gatewayName,
RabbitMQProcessLaunchPublisher publisher) throws Exception {
- SimpleWorkflowInterpreter simpleWorkflowInterpreter = new SimpleWorkflowInterpreter(
+ WorkflowInterpreter workflowInterpreter = new WorkflowInterpreter(
experimentId, credentialToken,gatewayName, publisher);
- workflowMap.put(experimentId, simpleWorkflowInterpreter);
- simpleWorkflowInterpreter.launchWorkflow();
+ workflowMap.put(experimentId, workflowInterpreter);
+ workflowInterpreter.launchWorkflow();
}
@@ -125,13 +125,13 @@ public class WorkflowEnactmentService {
private void process() {
String message;
- SimpleWorkflowInterpreter simpleWorkflowInterpreter;
+ WorkflowInterpreter workflowInterpreter;
if (msgCtx.getType() == MessageType.PROCESS) {
ProcessStatusChangeEvent event = ((ProcessStatusChangeEvent) msgCtx.getEvent());
ProcessIdentifier processIdentity = event.getProcessIdentity();
- simpleWorkflowInterpreter = getInterpreter(processIdentity.getExperimentId());
- if (simpleWorkflowInterpreter != null) {
- simpleWorkflowInterpreter.handleProcessStatusChangeEvent(event);
+ workflowInterpreter = getInterpreter(processIdentity.getExperimentId());
+ if (workflowInterpreter != null) {
+ workflowInterpreter.handleProcessStatusChangeEvent(event);
} else {
// this happens when Task status messages comes after the Taskoutput messages,as we have worked on
// output changes it is ok to ignore this.
@@ -140,10 +140,10 @@ public class WorkflowEnactmentService {
}else if (msgCtx.getType() == MessageType.PROCESSOUTPUT) {
TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent();
TaskIdentifier taskIdentifier = event.getTaskIdentity();
- simpleWorkflowInterpreter = getInterpreter(taskIdentifier.getExperimentId());
- if (simpleWorkflowInterpreter != null) {
-// simpleWorkflowInterpreter.handleTaskOutputChangeEvent(event);
- if (simpleWorkflowInterpreter.isAllDone()) {
+ workflowInterpreter = getInterpreter(taskIdentifier.getExperimentId());
+ if (workflowInterpreter != null) {
+// workflowInterpreter.handleTaskOutputChangeEvent(event);
+ if (workflowInterpreter.isAllDone()) {
workflowMap.remove(taskIdentifier.getExperimentId());
}
} else {
@@ -157,7 +157,7 @@ public class WorkflowEnactmentService {
}
}
- private SimpleWorkflowInterpreter getInterpreter(String experimentId){
+ private WorkflowInterpreter getInterpreter(String experimentId){
return workflowMap.get(experimentId);
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ca1eb5/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java
index 9392461..cb76790 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java
@@ -24,6 +24,7 @@ package org.apache.airavata.workflow.core;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.workflow.core.parser.JsonWorkflowParser;
+import org.apache.airavata.workflow.core.parser.WorkflowParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ca1eb5/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java
new file mode 100644
index 0000000..b42e7ac
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java
@@ -0,0 +1,336 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher;
+import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
+import org.apache.airavata.model.ComponentState;
+import org.apache.airavata.model.ComponentStatus;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.experiment.ExperimentModel;
+import org.apache.airavata.model.messaging.event.ProcessIdentifier;
+import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.*;
+import org.apache.airavata.workflow.core.dag.edge.Edge;
+import org.apache.airavata.workflow.core.dag.nodes.*;
+import org.apache.airavata.workflow.core.parser.WorkflowParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Package-Private class
+ */
+class WorkflowInterpreter {
+
+ private static final Logger log = LoggerFactory.getLogger(WorkflowInterpreter.class);
+ private List<InputNode> inputNodes;
+
+ private ExperimentModel experiment;
+
+ private String credentialToken;
+
+ private String gatewayName;
+
+ private String workflowString;
+ private Map<String, WorkflowNode> readyList = new ConcurrentHashMap<>();
+ private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<>();
+ private Map<String, WorkflowNode> processingQueue = new ConcurrentHashMap<>();
+ private Map<String, WorkflowNode> completeList = new HashMap<>();
+ private Registry registry;
+ private List<OutputNode> completeWorkflowOutputs = new ArrayList<>();
+ private RabbitMQProcessLaunchPublisher publisher;
+ private RabbitMQStatusConsumer statusConsumer;
+ private String consumerId;
+ private boolean continueWorkflow = true;
+
+ public WorkflowInterpreter(String experimentId, String credentialToken, String gatewayName, RabbitMQProcessLaunchPublisher publisher) throws RegistryException {
+ this.gatewayName = gatewayName;
+ setExperiment(experimentId);
+ this.credentialToken = credentialToken;
+ this.publisher = publisher;
+ }
+
+ public WorkflowInterpreter(ExperimentModel experiment, String credentialStoreToken, String gatewayName, RabbitMQProcessLaunchPublisher publisher) {
+ this.gatewayName = gatewayName;
+ this.experiment = experiment;
+ this.credentialToken = credentialStoreToken;
+ this.publisher = publisher;
+ }
+
+ /**
+ * Package-Private method.
+ *
+ * @throws Exception
+ */
+ void launchWorkflow() throws Exception {
+// WorkflowBuilder workflowBuilder = WorkflowFactory.getWorkflowBuilder(experiment.getExperimentId(), credentialToken, null);
+ workflowString = getWorkflow();
+ WorkflowParser workflowParser = WorkflowFactory.getWorkflowParser(workflowString);
+ log.debug("Initialized workflow parser");
+ workflowParser.parse();
+ setInputNodes(workflowParser.getInputNodes());
+ log.debug("Parsed the workflow and got the workflow input nodes");
+ // process workflow input nodes
+ processWorkflowInputNodes(getInputNodes());
+ if (readyList.isEmpty()) {
+ StringBuilder sb = new StringBuilder();
+ for (InputNode inputNode : inputNodes) {
+ sb.append(", ");
+ sb.append(inputNode.getInputObject().getName());
+ sb.append("=");
+ sb.append(inputNode.getInputObject().getValue());
+ }
+ throw new AiravataException("No workflow application node is in ready state to run with experiment inputs" + sb.toString());
+ }
+ processReadyList();
+ }
+
+ private String getWorkflow() throws AppCatalogException, WorkflowCatalogException {
+ WorkflowCatalog workflowCatalog = RegistryFactory.getAppCatalog().getWorkflowCatalog();
+ //FIXME: parse workflowTemplateId or experimentId
+// workflowCatalog.getWorkflow("");
+ return "";
+ }
+
+ // try to remove synchronization tag
+
+ /**
+ * Package-Private method.
+ *
+ * @throws RegistryException
+ * @throws AiravataException
+ */
+ void processReadyList() throws RegistryException, AiravataException {
+ if (readyList.isEmpty() && processingQueue.isEmpty() && !waitingList.isEmpty()) {
+ throw new AiravataException("No workflow application node is in ready state to run");
+ }
+ for (WorkflowNode readyNode : readyList.values()) {
+ if (readyNode instanceof OutputNode) {
+ OutputNode outputNode = (OutputNode) readyNode;
+ outputNode.getOutputObject().setValue(outputNode.getInPort().getInputObject().getValue());
+ addToCompleteOutputNodeList(outputNode);
+ } else if (readyNode instanceof InputNode) {
+ // FIXME: set input object of applications and add applications to ready List.
+ } else if (readyNode instanceof ApplicationNode) {
+ // FIXME: call orchestrator to create process for the application
+ } else {
+ throw new RuntimeException("Unsupported workflow node type");
+ }
+ }
+
+ if (processingQueue.isEmpty() && waitingList.isEmpty()) {
+ try {
+ saveWorkflowOutputs();
+ } catch (AppCatalogException e) {
+ throw new AiravataException("Error while updating completed workflow outputs to registry", e);
+ }
+ }
+ }
+
+ private void saveWorkflowOutputs() throws AppCatalogException {
+ List<OutputDataObjectType> outputDataObjects = new ArrayList<>();
+ for (OutputNode completeWorkflowOutput : completeWorkflowOutputs) {
+ outputDataObjects.add(completeWorkflowOutput.getOutputObject());
+ }
+ // FIXME: save workflow output to registry.
+// RegistryFactory.getAppCatalog().getWorkflowCatalog()
+// .updateWorkflowOutputs(experiment.getApplicationId(), outputDataObjects);
+ }
+
+ private void processWorkflowInputNodes(List<InputNode> inputNodes) {
+ Set<WorkflowNode> tempNodeSet = new HashSet<>();
+ for (InputNode inputNode : inputNodes) {
+ if (inputNode.isReady()) {
+ log.debug("Workflow node : " + inputNode.getId() + " is ready to execute");
+ for (Edge edge : inputNode.getOutPort().getEdges()) {
+ edge.getToPort().getInputObject().setValue(inputNode.getInputObject().getValue());
+ if (edge.getToPort().getNode().isReady()) {
+ addToReadyQueue(edge.getToPort().getNode());
+ log.debug("Added workflow node : " + edge.getToPort().getNode().getId() + " to the readyQueue");
+ } else {
+ addToWaitingQueue(edge.getToPort().getNode());
+ log.debug("Added workflow node " + edge.getToPort().getNode().getId() + " to the waitingQueue");
+
+ }
+ }
+ }
+ }
+ }
+
+
+ public List<InputNode> getInputNodes() throws Exception {
+ return inputNodes;
+ }
+
+ public void setInputNodes(List<InputNode> inputNodes) {
+ this.inputNodes = inputNodes;
+ }
+
+ private Registry getRegistry() throws RegistryException {
+ if (registry == null) {
+ registry = RegistryFactory.getRegistry();
+ }
+ return registry;
+ }
+
+ /**
+ * Package-Private method.
+ * Remove the workflow node from waiting queue and add it to the ready queue.
+ *
+ * @param workflowNode - Workflow Node
+ */
+ synchronized void addToReadyQueue(WorkflowNode workflowNode) {
+ waitingList.remove(workflowNode.getId());
+ readyList.put(workflowNode.getId(), workflowNode);
+ }
+
+ private void addToWaitingQueue(WorkflowNode workflowNode) {
+ waitingList.put(workflowNode.getId(), workflowNode);
+ }
+
+ /**
+ * First remove the node from ready list and then add the WfNodeContainer to the process queue.
+ * Note that underline data structure of the process queue is a Map.
+ *
+ * @param applicationNode - has both workflow and correspond workflowNodeDetails and TaskDetails
+ */
+ private synchronized void addToProcessingQueue(ApplicationNode applicationNode) {
+ readyList.remove(applicationNode.getId());
+ processingQueue.put(applicationNode.getId(), applicationNode);
+ }
+
+ private synchronized void addToCompleteQueue(ApplicationNode applicationNode) {
+ processingQueue.remove(applicationNode.getId());
+ completeList.put(applicationNode.getId(), applicationNode);
+ }
+
+
+ private void addToCompleteOutputNodeList(OutputNode wfOutputNode) {
+ completeWorkflowOutputs.add(wfOutputNode);
+ readyList.remove(wfOutputNode.getId());
+ }
+
+ boolean isAllDone() {
+ return !continueWorkflow || (waitingList.isEmpty() && readyList.isEmpty() && processingQueue.isEmpty());
+ }
+
+ private void setExperiment(String experimentId) throws RegistryException {
+ experiment = (ExperimentModel) getRegistry().getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
+ log.debug("Retrieve Experiment for experiment id : " + experimentId);
+ }
+
+/* synchronized void handleTaskOutputChangeEvent(ProcessStatusChangeEvent taskOutputChangeEvent) {
+
+ String taskId = taskOutputChangeEvent.getTaskIdentity().getTaskId();
+ log.debug("Task Output changed event received for workflow node : " +
+ taskOutputChangeEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId);
+ WorkflowNode workflowNode = processingQueue.get(taskId);
+ Set<WorkflowNode> tempWfNodeSet = new HashSet<>();
+ if (workflowNode != null) {
+ if (workflowNode instanceof ApplicationNode) {
+ ApplicationNode applicationNode = (ApplicationNode) workflowNode;
+ // Workflow node can have one to many output ports and each output port can have one to many links
+ for (OutPort outPort : applicationNode.getOutputPorts()) {
+ for (OutputDataObjectType outputDataObjectType : taskOutputChangeEvent.getOutput()) {
+ if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) {
+ outPort.getOutputObject().setValue(outputDataObjectType.getValue());
+ break;
+ }
+ }
+ for (Edge edge : outPort.getEdges()) {
+ edge.getToPort().getInputObject().setValue(outPort.getOutputObject().getValue());
+ if (edge.getToPort().getNode().isReady()) {
+ addToReadyQueue(edge.getToPort().getNode());
+ }
+ }
+ }
+ addToCompleteQueue(applicationNode);
+ log.debug("removed task from processing queue : " + taskId);
+ }
+ try {
+ processReadyList();
+ } catch (Exception e) {
+ log.error("Error while processing ready workflow nodes", e);
+ continueWorkflow = false;
+ }
+ }
+ }*/
+
+ void handleProcessStatusChangeEvent(ProcessStatusChangeEvent processStatusChangeEvent) {
+ ProcessState processState = processStatusChangeEvent.getState();
+ ProcessIdentifier processIdentity = processStatusChangeEvent.getProcessIdentity();
+ String processId = processIdentity.getProcessId();
+ ApplicationNode applicationNode = (ApplicationNode) processingQueue.get(processId);
+ if (applicationNode != null) {
+ ComponentState state = applicationNode.getState();
+ switch (processState) {
+ case CREATED:
+ case VALIDATED:
+ case STARTED:
+ break;
+ case CONFIGURING_WORKSPACE:
+ case PRE_PROCESSING:
+ case INPUT_DATA_STAGING:
+ case EXECUTING:
+ case OUTPUT_DATA_STAGING:
+ case POST_PROCESSING:
+ state = ComponentState.RUNNING;
+ break;
+ case COMPLETED:
+ state = ComponentState.COMPLETED;
+ // FIXME: read output form registry and set it to node outputport then continue to next application.
+ break;
+ case FAILED:
+ state = ComponentState.FAILED;
+ // FIXME: fail workflow.
+ break;
+ case CANCELED:
+ case CANCELLING:
+ state = ComponentState.CANCELED;
+ // FIXME: cancel workflow.
+ break;
+ default:
+ break;
+ }
+ if (state != applicationNode.getState()) {
+ try {
+ updateWorkflowNodeStatus(applicationNode, new ComponentStatus(state));
+ } catch (RegistryException e) {
+ log.error("Error! Couldn't update new application state to registry. nodeInstanceId : {} "
+ + applicationNode.getId() + " status to: " + applicationNode.getState().toString(), e);
+ }
+ }
+ }
+
+ }
+
+ private void updateWorkflowNodeStatus(ApplicationNode applicationNode, ComponentStatus componentStatus) throws RegistryException {
+ // FIXME: save new workflow node status to registry.
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ca1eb5/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowParser.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowParser.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowParser.java
deleted file mode 100644
index 46bc1d8..0000000
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowParser.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.core;
-
-import org.apache.airavata.workflow.core.dag.edge.Edge;
-import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode;
-import org.apache.airavata.workflow.core.dag.nodes.InputNode;
-import org.apache.airavata.workflow.core.dag.nodes.OutputNode;
-import org.apache.airavata.workflow.core.dag.port.Port;
-
-import java.util.List;
-
-public interface WorkflowParser {
-
- public void parse() throws Exception;
-
- public List<InputNode> getInputNodes() throws Exception;
-
- public List<OutputNode> getOutputNodes() throws Exception;
-
- public List<ApplicationNode> getApplicationNodes() throws Exception;
-
- public List<Port> getPorts() throws Exception;
-
- public List<Edge> getEdges() throws Exception;
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ca1eb5/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java
index ede69e3..f6bb084 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java
@@ -1,19 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
package org.apache.airavata.workflow.core.parser;
import com.google.gson.JsonObject;
-import org.apache.airavata.workflow.core.WorkflowParser;
import org.apache.airavata.workflow.core.dag.edge.Edge;
import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode;
import org.apache.airavata.workflow.core.dag.nodes.InputNode;
import org.apache.airavata.workflow.core.dag.nodes.OutputNode;
import org.apache.airavata.workflow.core.dag.port.Port;
+import java.util.ArrayList;
import java.util.List;
-/**
- * Created by syodage on 1/27/16.
- */
-public class JsonWorkflowParser implements WorkflowParser{
+public class JsonWorkflowParser implements WorkflowParser {
private final String workflow;
private List<InputNode> inputs;
@@ -24,8 +42,15 @@ public class JsonWorkflowParser implements WorkflowParser{
public JsonWorkflowParser(String jsonWorkflowString) {
workflow = jsonWorkflowString;
+
+ inputs = new ArrayList<>();
+ outputs = new ArrayList<>();
+ applications = new ArrayList<>();
+ ports = new ArrayList<>();
+ edges = new ArrayList<>();
}
+
@Override
public void parse() throws Exception {
// TODO parse json string and construct components
@@ -69,7 +94,7 @@ public class JsonWorkflowParser implements WorkflowParser{
return null;
}
- private Port createPort(JsonObject jPort){
+ private Port createPort(JsonObject jPort) {
return null;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ca1eb5/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/WorkflowParser.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/WorkflowParser.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/WorkflowParser.java
new file mode 100644
index 0000000..dc18c9e
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/WorkflowParser.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.parser;
+
+import org.apache.airavata.workflow.core.dag.edge.Edge;
+import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode;
+import org.apache.airavata.workflow.core.dag.nodes.InputNode;
+import org.apache.airavata.workflow.core.dag.nodes.OutputNode;
+import org.apache.airavata.workflow.core.dag.port.Port;
+
+import java.util.List;
+
+public interface WorkflowParser {
+
+ public void parse() throws Exception;
+
+ public List<InputNode> getInputNodes() throws Exception;
+
+ public List<OutputNode> getOutputNodes() throws Exception;
+
+ public List<ApplicationNode> getApplicationNodes() throws Exception;
+
+ public List<Port> getPorts() throws Exception;
+
+ public List<Edge> getEdges() throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ca1eb5/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParserTest.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParserTest.java b/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParserTest.java
index 712944d..3fedc9c 100644
--- a/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParserTest.java
+++ b/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParserTest.java
@@ -1,51 +1,25 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
package org.apache.airavata.workflow.core.parser;
-import org.apache.airavata.model.application.io.DataType;
-import org.apache.airavata.model.application.io.InputDataObjectType;
-import org.apache.airavata.model.experiment.ExperimentModel;
-import org.apache.airavata.workflow.core.WorkflowParser;
-import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode;
-import org.apache.airavata.workflow.core.dag.nodes.InputNode;
-import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode;
-import org.apache.airavata.workflow.core.dag.nodes.OutputNode;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.io.File;
+import java.io.InputStream;
+import static org.junit.Assert.*;
+
+/**
+ * Created by syodage on 2/8/16.
+ */
public class JsonWorkflowParserTest {
+ private String workflowString;
+
+
@Before
public void setUp() throws Exception {
-
+ InputStream inputStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("TestWorkflow.json");
}
@After
@@ -54,77 +28,7 @@ public class JsonWorkflowParserTest {
}
@Test
- public void testWorkflowParse() throws Exception {
- Assert.assertNotNull("Test file (ComplexMathWorkflow.awf) is missing", getClass().getResource("/ComplexMathWorkflow.awf"));
- InputStreamReader isr = new InputStreamReader(this.getClass().getResourceAsStream("/ComplexMathWorkflow.awf"));
- BufferedReader br = new BufferedReader(isr);
- StringBuffer sb = new StringBuffer();
- String nextLine = br.readLine();
- while (nextLine != null) {
- sb.append(nextLine);
- nextLine = br.readLine();
- }
-// Workflow workflow = new Workflow(sb.toString());
- ExperimentModel experiment = new ExperimentModel();
- InputDataObjectType x = new InputDataObjectType();
- x.setValue("6");
- x.setType(DataType.STRING);
- x.setName("x");
-
- InputDataObjectType y = new InputDataObjectType();
- y.setValue("8");
- y.setType(DataType.STRING);
- y.setName("y");
-
- InputDataObjectType z = new InputDataObjectType();
- z.setValue("10");
- z.setType(DataType.STRING);
- z.setName("y_2");
-
- List<InputDataObjectType> inputs = new ArrayList<InputDataObjectType>();
- inputs.add(x);
- inputs.add(y);
- inputs.add(z);
- experiment.setExperimentInputs(inputs);
- // create parser
- WorkflowParser parser = new JsonWorkflowParser("workflow string");
- parser.parse();
- List<InputNode> inputNodes = parser.getInputNodes();
- Assert.assertNotNull(inputNodes);
- Assert.assertEquals(3, inputNodes.size());
- for (InputNode inputNode : inputNodes) {
- Assert.assertNotNull(inputNode.getOutPort());
- Assert.assertNotNull(inputNode.getInputObject());
- }
-
- Map<String, WorkflowNode> wfNodes = getWorkflowNodeMap(parser.getApplicationNodes());
- for (String wfId : wfNodes.keySet()) {
- WorkflowNode wfNode = wfNodes.get(wfId);
- if (wfNode instanceof ApplicationNode) {
- ApplicationNode node = (ApplicationNode) wfNode;
- Assert.assertEquals(2, node.getInputPorts().size());
- Assert.assertNotNull(node.getInputPorts().get(0).getInputObject());
- Assert.assertNotNull(node.getInputPorts().get(1).getInputObject());
- Assert.assertNotNull(node.getInputPorts().get(0).getEdge());
- Assert.assertNotNull(node.getInputPorts().get(1).getEdge());
-
- Assert.assertEquals(1, node.getOutputPorts().size());
- Assert.assertEquals(1, node.getOutputPorts().get(0).getEdges().size());
- Assert.assertNotNull(node.getOutputPorts().get(0).getEdges().get(0));
- } else if (wfNode instanceof OutputNode) {
- OutputNode outputNode = (OutputNode) wfNode;
- Assert.assertNotNull(outputNode.getInPort());
- }
- }
-
- }
-
- private Map<String, WorkflowNode> getWorkflowNodeMap(List<ApplicationNode> applicationNodes) {
- Map<String, WorkflowNode> map = new HashMap<>();
- for (ApplicationNode applicationNode : applicationNodes) {
- map.put(applicationNode.getApplicationId(), applicationNode);
- }
+ public void testParse() throws Exception {
- return map;
}
}
\ No newline at end of file