You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/03/26 18:08:25 UTC

[26/50] [abbrv] airavata git commit: Fixed AIRAVATA-1618, and optimized the imports.

Fixed AIRAVATA-1618, and optimized the imports.


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/917adad5
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/917adad5
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/917adad5

Branch: refs/heads/master
Commit: 917adad5bbec9500a0524eeb7a3ce3763c89d961
Parents: 71db390
Author: shamrath <sh...@gmail.com>
Authored: Wed Mar 4 14:21:08 2015 -0500
Committer: shamrath <sh...@gmail.com>
Committed: Wed Mar 4 14:21:08 2015 -0500

----------------------------------------------------------------------
 .../engine/SimpleWorkflowInterpreter.java       | 112 ++++++++++---------
 .../workflow/engine/WorkflowFactoryImpl.java    |   2 -
 .../workflow/engine/dag/edge/DirectedEdge.java  |   2 +-
 .../engine/dag/nodes/ApplicationNodeImpl.java   |   7 +-
 .../workflow/engine/dag/nodes/NodeState.java    |  28 +++--
 .../engine/dag/nodes/WorkflowInputNodeImpl.java |   7 +-
 .../dag/nodes/WorkflowOutputNodeImpl.java       |   7 +-
 .../engine/parser/AiravataWorkflowParser.java   |  24 ++--
 .../workflow/engine/parser/PortContainer.java   |   2 +-
 .../parser/AiravataWorkflowParserTest.java      |   2 +-
 10 files changed, 111 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
index 8eb5d5e..a052e5c 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
@@ -55,7 +55,6 @@ import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
 import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
 import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
 import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
-import org.apache.airavata.simple.workflow.engine.parser.AiravataWorkflowParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,7 +77,7 @@ public class SimpleWorkflowInterpreter implements Runnable{
 
     private String gatewayName;
 
-    private Map<String, WorkflowNode> readList = new ConcurrentHashMap<String, WorkflowNode>();
+    private Map<String, WorkflowNode> readyList = new ConcurrentHashMap<String, WorkflowNode>();
     private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<String, WorkflowNode>();
     private Map<String, ProcessContext> processingQueue = new ConcurrentHashMap<String, ProcessContext>();
     private Map<String, ProcessContext> completeList = new HashMap<String, ProcessContext>();
@@ -87,6 +86,7 @@ public class SimpleWorkflowInterpreter implements Runnable{
     private RabbitMQProcessPublisher publisher;
     private RabbitMQStatusConsumer statusConsumer;
     private String consumerId;
+    private boolean continueWorkflow = true;
 
     public SimpleWorkflowInterpreter(String experimentId, String credentialToken, String gatewayName, RabbitMQProcessPublisher publisher) throws RegistryException {
         this.gatewayName = gatewayName;
@@ -111,33 +111,27 @@ public class SimpleWorkflowInterpreter implements Runnable{
         log.debug("Parsed the workflow and got the workflow input nodes");
         // process workflow input nodes
         processWorkflowInputNodes(getWorkflowInputNodes());
-
-
+        // initialize the rabbitmq status consumer
         statusConsumer = new RabbitMQStatusConsumer();
         consumerId = statusConsumer.listen(new TaskMessageHandler());
+
+        processReadyList();
     }
 
     // try to remove synchronization tag
-    private synchronized void processReadyList() {
-        for (WorkflowNode readyNode : readList.values()) {
-            try {
-                if (readyNode instanceof WorkflowOutputNode) {
-                    WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) readyNode;
-                    wfOutputNode.getOutputObject().setValue(wfOutputNode.getInPort().getInputObject().getValue());
-                    addToCompleteOutputNodeList(wfOutputNode);
-                    continue;
-                }
-                WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode);
-                TaskDetails process = getProcess(workflowNodeDetails);
-                ProcessContext processContext = new ProcessContext(readyNode, workflowNodeDetails, process);
-                addToProcessingQueue(processContext);
-                publishToProcessQueue(process);
-//                publishToProcessQueue(processPack);
-            } catch (RegistryException e) {
-                // FIXME : handle this exception
-            } catch (AiravataException e) {
-                log.error("Error while publishing process to the process queue");
+    private synchronized void processReadyList() throws RegistryException, AiravataException {
+        for (WorkflowNode readyNode : readyList.values()) {
+            if (readyNode instanceof WorkflowOutputNode) {
+                WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) readyNode;
+                wfOutputNode.getOutputObject().setValue(wfOutputNode.getInPort().getInputObject().getValue());
+                addToCompleteOutputNodeList(wfOutputNode);
+                continue;
             }
+            WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode);
+            TaskDetails process = getProcess(workflowNodeDetails);
+            ProcessContext processContext = new ProcessContext(readyNode, workflowNodeDetails, process);
+            addToProcessingQueue(processContext);
+            publishToProcessQueue(process);
         }
     }
 
@@ -149,11 +143,6 @@ public class SimpleWorkflowInterpreter implements Runnable{
         MessageContext messageContext = new MessageContext(processSubmitEvent, MessageType.TASK, process.getTaskID(), null);
         messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
         publisher.publish(messageContext);
-
-
-//        Thread thread = new Thread(new TempPublisher(process, eventBus));
-//        thread.start();
-        //TODO: publish to process queue.
     }
 
     private TaskDetails getProcess(WorkflowNodeDetails wfNodeDetails) throws RegistryException {
@@ -171,6 +160,7 @@ public class SimpleWorkflowInterpreter implements Runnable{
         if (readyNode instanceof ApplicationNode) {
             executionUnit = ExecutionUnit.APPLICATION;
             executionData = ((ApplicationNode) readyNode).getApplicationId();
+            setupNodeDetailsInput(((ApplicationNode) readyNode), wfNodeDetails);
         } else if (readyNode instanceof WorkflowInputNode) {
             executionUnit = ExecutionUnit.INPUT;
         } else if (readyNode instanceof WorkflowOutputNode) {
@@ -178,25 +168,19 @@ public class SimpleWorkflowInterpreter implements Runnable{
         }
         wfNodeDetails.setExecutionUnit(executionUnit);
         wfNodeDetails.setExecutionUnitData(executionData);
-        setupNodeDetailsInput(readyNode, wfNodeDetails);
         wfNodeDetails.setNodeInstanceId((String) getRegistry()
                 .add(ChildDataType.WORKFLOW_NODE_DETAIL, wfNodeDetails, getExperiment().getExperimentID()));
-//        nodeInstanceList.put(node, wfNodeDetails);
         return wfNodeDetails;
     }
 
-    private void setupNodeDetailsInput(WorkflowNode readyNode, WorkflowNodeDetails wfNodeDetails) {
-        if (readyNode instanceof ApplicationNode) {
-            ApplicationNode applicationNode = (ApplicationNode) readyNode;
-            if (applicationNode.isReady()) {
-                for (InPort inPort : applicationNode.getInputPorts()) {
-                    wfNodeDetails.addToNodeInputs(inPort.getInputObject());
-                }
-            } else {
-                // TODO: handle this scenario properly.
+    private void setupNodeDetailsInput(ApplicationNode readyAppNode, WorkflowNodeDetails wfNodeDetails) {
+        if (readyAppNode.isReady()) {
+            for (InPort inPort : readyAppNode.getInputPorts()) {
+                wfNodeDetails.addToNodeInputs(inPort.getInputObject());
             }
         } else {
-            // TODO: do we support for other type of workflow nodes ?
+            throw new IllegalArgumentException("Application node should be in ready state to set inputs to the " +
+                    "workflow node details, nodeId = " + readyAppNode.getId());
         }
     }
 
@@ -253,7 +237,7 @@ public class SimpleWorkflowInterpreter implements Runnable{
      */
     private synchronized void addToReadyQueue(WorkflowNode workflowNode) {
         waitingList.remove(workflowNode.getId());
-        readList.put(workflowNode.getId(), workflowNode);
+        readyList.put(workflowNode.getId(), workflowNode);
     }
 
     private void addToWaitingQueue(WorkflowNode workflowNode) {
@@ -266,7 +250,7 @@ public class SimpleWorkflowInterpreter implements Runnable{
      * @param processContext - has both workflow and correspond workflowNodeDetails and TaskDetails
      */
     private synchronized void addToProcessingQueue(ProcessContext processContext) {
-        readList.remove(processContext.getWorkflowNode().getId());
+        readyList.remove(processContext.getWorkflowNode().getId());
         processingQueue.put(processContext.getTaskDetails().getTaskID(), processContext);
     }
 
@@ -278,7 +262,7 @@ public class SimpleWorkflowInterpreter implements Runnable{
 
     private void addToCompleteOutputNodeList(WorkflowOutputNode wfOutputNode) {
         completeWorkflowOutputs.add(wfOutputNode);
-        readList.remove(wfOutputNode.getId());
+        readyList.remove(wfOutputNode.getId());
     }
 
     @Override
@@ -286,15 +270,25 @@ public class SimpleWorkflowInterpreter implements Runnable{
         try {
             log.debug("Launching workflow");
             launchWorkflow();
-            while (!(waitingList.isEmpty() && readList.isEmpty())) {
-                processReadyList();
+            while (continueWorkflow && !(waitingList.isEmpty() && readyList.isEmpty())) {
+//                processReadyList();
                 Thread.sleep(1000);
             }
-            log.info("Successfully launched workflow for experiment : " + getExperiment().getExperimentID());
-            statusConsumer.stopListen(consumerId);
-            log.info("Successfully un-bind status consumer for experiment " + getExperiment().getExperimentID());
+            if (continueWorkflow) {
+                log.info("Successfully launched workflow for experiment : " + getExperiment().getExperimentID());
+            } else if (!(waitingList.isEmpty() || readyList.isEmpty())) {
+                log.error("Workflow couldn't execute all workflow nodes due to an error");
+            }
         } catch (Exception e) {
             log.error("Error launching workflow", e);
+        } finally {
+            try {
+                statusConsumer.stopListen(consumerId);
+                log.info("Successfully un-bind status consumer for experiment " + getExperiment().getExperimentID());
+            } catch (AiravataException e) {
+                log.error("Error while un-binding status consumer: " + consumerId + " for experiment "
+                        + getExperiment().getExperimentID());
+            }
         }
     }
 
@@ -369,6 +363,12 @@ public class SimpleWorkflowInterpreter implements Runnable{
                 }
                 addToCompleteQueue(processContext);
                 log.debug("removed task from processing queue : " + taskId);
+                try {
+                    processReadyList();
+                } catch (Exception e) {
+                    log.error("Error while processing ready workflow nodes", e);
+                    continueWorkflow = false;
+                }
             }
         }
 
@@ -378,39 +378,49 @@ public class SimpleWorkflowInterpreter implements Runnable{
             String taskId = taskIdentity.getTaskId();
             ProcessContext processContext = processingQueue.get(taskId);
             if (processContext != null) {
-                WorkflowNodeState wfNodeState = WorkflowNodeState.UNKNOWN;
+                WorkflowNodeState wfNodeState = WorkflowNodeState.INVOKED;
                 switch (taskState) {
                     case WAITING:
                         break;
                     case STARTED:
                         break;
                     case PRE_PROCESSING:
+                        wfNodeState = WorkflowNodeState.INVOKED;
                         processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
                         break;
                     case INPUT_DATA_STAGING:
+                        wfNodeState = WorkflowNodeState.INVOKED;
                         processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
                         break;
                     case EXECUTING:
+                        wfNodeState = WorkflowNodeState.EXECUTING;
                         processContext.getWorkflowNode().setState(NodeState.EXECUTING);
                         break;
                     case OUTPUT_DATA_STAGING:
+                        wfNodeState = WorkflowNodeState.COMPLETED;
                         processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
                         break;
                     case POST_PROCESSING:
+                        wfNodeState = WorkflowNodeState.COMPLETED;
                         processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
                         break;
                     case COMPLETED:
+                        wfNodeState = WorkflowNodeState.COMPLETED;
                         processContext.getWorkflowNode().setState(NodeState.EXECUTED);
                         break;
                     case FAILED:
+                        wfNodeState = WorkflowNodeState.FAILED;
                         processContext.getWorkflowNode().setState(NodeState.FAILED);
                         break;
                     case UNKNOWN:
+                        wfNodeState = WorkflowNodeState.UNKNOWN;
                         break;
                     case CONFIGURING_WORKSPACE:
+                        wfNodeState = WorkflowNodeState.COMPLETED;
                         break;
                     case CANCELED:
                     case CANCELING:
+                        wfNodeState = WorkflowNodeState.CANCELED;
                         processContext.getWorkflowNode().setState(NodeState.FAILED);
                         break;
                     default:
@@ -420,7 +430,9 @@ public class SimpleWorkflowInterpreter implements Runnable{
                     try {
                         updateWorkflowNodeStatus(processContext.getWfNodeDetails(), wfNodeState);
                     } catch (RegistryException e) {
-                        // TODO: handle this.
+                        log.error("Error while updating workflow node status update to the registry. nodeInstanceId :"
+                                + processContext.getWfNodeDetails().getNodeInstanceId() + " status to: "
+                                + processContext.getWfNodeDetails().getWorkflowNodeStatus().toString() , e);
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java
index 23fc4c2..e70f062 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java
@@ -23,13 +23,11 @@ package org.apache.airavata.simple.workflow.engine;
 
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.registry.cpi.RegistryException;
 import org.apache.airavata.simple.workflow.engine.parser.AiravataWorkflowParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 
 /**
  * Singleton class, only one instance can exist in runtime.

http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java
index 3bc380d..ae7498a 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java
@@ -21,8 +21,8 @@
 
 package org.apache.airavata.simple.workflow.engine.dag.edge;
 
-import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
 import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
+import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
 
 
 public class DirectedEdge implements Edge {

http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java
index 52b0595..1233a9d 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java
@@ -72,8 +72,11 @@ public class ApplicationNodeImpl implements ApplicationNode {
 
     @Override
     public void setState(NodeState newState) {
-        // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE
-        myState = newState;
+        if (newState.getLevel() > myState.getLevel()) {
+            myState = newState;
+        } else {
+            throw new IllegalStateException("Node state can't be reversed. currentState : " + myState.toString() + " , newState " + newState.toString());
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java
index 333fcb2..edbeec5 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java
@@ -22,13 +22,23 @@
 package org.apache.airavata.simple.workflow.engine.dag.nodes;
 
 public enum NodeState {
-    WAITING, // waiting on inputs
-    READY, // all inputs are available and ready to execute
-    QUEUED, //
-    PRE_PROCESSING, //
-    EXECUTING, // task has been submitted , not yet finish
-    EXECUTED, // task executed
-    POST_PROCESSING, //
-    FAILED,
-    COMPLETE // all works done
+    WAITING(0), // waiting on inputs
+    READY(1), // all inputs are available and ready to execute
+    QUEUED(2), //
+    PRE_PROCESSING(3), //
+    EXECUTING(4), // task has been submitted , not yet finish
+    EXECUTED(5), // task executed
+    POST_PROCESSING(6), //
+    FAILED(7),
+    COMPLETE(8); // all works done
+
+    private int level;
+
+    NodeState(int level) {
+        this.level = level;
+    }
+
+    public int getLevel() {
+        return level;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
index b3dfa62..7ba8908 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
@@ -62,8 +62,11 @@ public class WorkflowInputNodeImpl implements WorkflowInputNode {
 
     @Override
     public void setState(NodeState newState) {
-        // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE
-        myState = newState;
+        if (newState.getLevel() > myState.getLevel()) {
+            myState = newState;
+        } else {
+            throw new IllegalStateException("Node state can't be reversed. currentState : " + myState.toString() + " , newState " + newState.toString());
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java
index 5924212..6c80517 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java
@@ -63,8 +63,11 @@ public class WorkflowOutputNodeImpl implements WorkflowOutputNode {
 
     @Override
     public void setState(NodeState newState) {
-        // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE
-        myState = newState;
+        if (newState.getLevel() > myState.getLevel()) {
+            myState = newState;
+        } else {
+            throw new IllegalStateException("Node state can't be reversed. currentState : " + myState.toString() + " , newState " + newState.toString());
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java
index a430879..f7d53be 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java
@@ -31,9 +31,19 @@ import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.airavata.registry.cpi.RegistryException;
 import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.airavata.simple.workflow.engine.WorkflowParser;
+import org.apache.airavata.simple.workflow.engine.dag.edge.DirectedEdge;
+import org.apache.airavata.simple.workflow.engine.dag.edge.Edge;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode;
 import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNodeImpl;
 import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode;
 import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNodeImpl;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNodeImpl;
+import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
+import org.apache.airavata.simple.workflow.engine.dag.port.InputPortIml;
+import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
 import org.apache.airavata.simple.workflow.engine.dag.port.OutPortImpl;
 import org.apache.airavata.workflow.model.component.ComponentException;
 import org.apache.airavata.workflow.model.component.system.ConstantComponent;
@@ -49,16 +59,6 @@ import org.apache.airavata.workflow.model.graph.system.SystemDataPort;
 import org.apache.airavata.workflow.model.graph.ws.WSNode;
 import org.apache.airavata.workflow.model.graph.ws.WSPort;
 import org.apache.airavata.workflow.model.wf.Workflow;
-import org.apache.airavata.simple.workflow.engine.WorkflowParser;
-import org.apache.airavata.simple.workflow.engine.dag.edge.DirectedEdge;
-import org.apache.airavata.simple.workflow.engine.dag.edge.Edge;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNodeImpl;
-import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
-import org.apache.airavata.simple.workflow.engine.dag.port.InputPortIml;
-import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -103,7 +103,7 @@ public class AiravataWorkflowParser implements WorkflowParser {
             wfInputNode = new WorkflowInputNodeImpl(gNode.getID(), gNode.getName());
             wfInputNode.setInputObject(inputDataMap.get(wfInputNode.getId()));
             if (wfInputNode.getInputObject() == null) {
-                // TODO: throw an error and exit.
+                throw new RuntimeException("Workflow Input object is not set, workflow node id: " + wfInputNode.getId());
             }
             portContainers.addAll(processOutPorts(gNode, wfInputNode));
             wfInputNodes.add(wfInputNode);
@@ -139,7 +139,7 @@ public class AiravataWorkflowParser implements WorkflowParser {
                 } else if (wfNode instanceof ApplicationNode) {
                     wfApplicationNode = (ApplicationNode) wfNode;
                 } else {
-                    // TODO : handle this scenario
+                    throw new IllegalArgumentException("Only support for ApplicationNode implementation, but found other type for node implementation");
                 }
                 inPort.setNode(wfApplicationNode);
                 wfApplicationNode.addInPort(inPort);

http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/PortContainer.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/PortContainer.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/PortContainer.java
index db3dda5..4ddb8b9 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/PortContainer.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/PortContainer.java
@@ -21,8 +21,8 @@
 
 package org.apache.airavata.simple.workflow.engine.parser;
 
-import org.apache.airavata.workflow.model.graph.DataPort;
 import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
+import org.apache.airavata.workflow.model.graph.DataPort;
 
 
 public class PortContainer {

http://git-wip-us.apache.org/repos/asf/airavata/blob/917adad5/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java b/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java
index 6443806..d843abe 100644
--- a/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java
+++ b/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java
@@ -26,9 +26,9 @@ import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
 import org.apache.airavata.model.workspace.experiment.Experiment;
 import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode;
 import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
 import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
 import org.apache.airavata.workflow.model.wf.Workflow;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;