You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2016/02/08 17:49:20 UTC

[38/50] [abbrv] airavata git commit: Fixed all compilation issues of workflow-core module

Fixed all compilation issues of workflow-core module


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/ce795581
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/ce795581
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/ce795581

Branch: refs/heads/master
Commit: ce7955813f79812a58bc6395914f3a978b0ae1e9
Parents: e21fae7
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Fri Feb 5 16:29:41 2016 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Fri Feb 5 16:29:41 2016 -0500

----------------------------------------------------------------------
 .../cpi/impl/SimpleOrchestratorImpl.java        |   3 +-
 .../server/OrchestratorServerHandler.java       |  75 +++----
 modules/workflow/workflow-core/pom.xml          |   6 +
 .../core/SimpleWorkflowInterpreter.java         | 197 ++++++-------------
 .../airavata/workflow/core/WorkflowContext.java |  60 ------
 .../workflow/core/WorkflowEnactmentService.java |   2 +-
 .../airavata/workflow/core/WorkflowFactory.java |   1 -
 pom.xml                                         |   1 +
 8 files changed, 105 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index ff515ef..c875180 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -269,11 +269,10 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
         return processModels;
     }
 
-    public String createAndSaveTasks(String gatewayId, ExperimentModel experimentModel, ProcessModel processModel) throws OrchestratorException {
+    public String createAndSaveTasks(String gatewayId, ProcessModel processModel, boolean autoSchedule) throws OrchestratorException {
         try {
             ExperimentCatalog experimentCatalog = orchestratorContext.getRegistry().getExperimentCatalog();
             AppCatalog appCatalog = orchestratorContext.getRegistry().getAppCatalog();
-            boolean autoSchedule = experimentModel.getUserConfigurationData().isAiravataAutoSchedule();
             ComputationalResourceSchedulingModel resourceSchedule = processModel.getResourceSchedule();
             String userGivenQueueName = resourceSchedule.getQueueName();
             int userGivenWallTime = resourceSchedule.getWallTimeLimit();

http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index a461ba4..977191e 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -136,45 +136,46 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
         ExperimentModel experiment = null;
         try {
 
-            List<ProcessModel> processes = orchestrator.createProcesses(experimentId, gatewayId);
-            experiment = (ExperimentModel) experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
-            for (ProcessModel processModel : processes){
-                String taskDag = orchestrator.createAndSaveTasks(gatewayId, experiment, processModel);
-                processModel.setTaskDag(taskDag);
-                experimentCatalog.update(ExperimentCatalogModelType.PROCESS,processModel, processModel.getProcessId());
-            }
-            if (experiment == null) {
-                log.error(experimentId, "Error retrieving the Experiment by the given experimentID: {} ", experimentId);
-                return false;
-            }
+            String experimentNodePath = GFacUtils.getExperimentNodePath (experimentId);
+			ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentNodePath);
+			String experimentCancelNode = ZKPaths.makePath(experimentNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+			ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentCancelNode);
 
-            if (!validateProcess(experimentId, processes)) {
-                log.error("Validating process fails for given experiment Id : {}", experimentId);
-                return false;
-            }
-            ComputeResourcePreference computeResourcePreference = appCatalog.getGatewayProfile().
+			ComputeResourcePreference computeResourcePreference = appCatalog.getGatewayProfile().
 					getComputeResourcePreference(gatewayId,
 							experiment.getUserConfigurationData().getComputationalResourceScheduling().getResourceHostId());
-            String token = computeResourcePreference.getResourceSpecificCredentialStoreToken();
-            if (token == null || token.isEmpty()){
-                // try with gateway profile level token
-                GatewayResourceProfile gatewayProfile = appCatalog.getGatewayProfile().getGatewayProfile(gatewayId);
-                token = gatewayProfile.getCredentialStoreToken();
-            }
-            // still the token is empty, then we fail the experiment
-            if (token == null || token.isEmpty()){
-                log.error("You have not configured credential store token at gateway profile or compute resource preference. Please provide the correct token at gateway profile or compute resource preference.");
-                return false;
-            }
-            String experimentNodePath = GFacUtils.getExperimentNodePath (experimentId);
-	        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentNodePath);
-	        String experimentCancelNode = ZKPaths.makePath(experimentNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
-	        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentCancelNode);
+			String token = computeResourcePreference.getResourceSpecificCredentialStoreToken();
+			if (token == null || token.isEmpty()){
+				// try with gateway profile level token
+				GatewayResourceProfile gatewayProfile = appCatalog.getGatewayProfile().getGatewayProfile(gatewayId);
+				token = gatewayProfile.getCredentialStoreToken();
+			}
+			// still the token is empty, then we fail the experiment
+			if (token == null || token.isEmpty()){
+				log.error("You have not configured credential store token at gateway profile or compute resource preference. Please provide the correct token at gateway profile or compute resource preference.");
+				return false;
+			}
+			ExperimentType executionType = experiment.getExperimentType();
+			if (executionType == ExperimentType.SINGLE_APPLICATION) {
+				// it's a single-application execution experiment
+				List<ProcessModel> processes = orchestrator.createProcesses(experimentId, gatewayId);
+				experiment = (ExperimentModel) experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
+				if (experiment == null) {
+					log.error(experimentId, "Error retrieving the Experiment by the given experimentID: {} ", experimentId);
+					return false;
+				}
+				for (ProcessModel processModel : processes){
+					String taskDag = orchestrator.createAndSaveTasks(gatewayId, processModel, experiment.getUserConfigurationData().isAiravataAutoSchedule());
+					processModel.setTaskDag(taskDag);
+					experimentCatalog.update(ExperimentCatalogModelType.PROCESS,processModel, processModel.getProcessId());
+				}
+
+				if (!validateProcess(experimentId, processes)) {
+					log.error("Validating process fails for given experiment Id : {}", experimentId);
+					return false;
+				}
 
-	        ExperimentType executionType = experiment.getExperimentType();
-            if (executionType == ExperimentType.SINGLE_APPLICATION) {
-                //its an single application execution experiment
-                log.debug(experimentId, "Launching single application experiment {}.", experimentId);
+				log.debug(experimentId, "Launching single application experiment {}.", experimentId);
                 ExperimentStatus status = new ExperimentStatus(ExperimentState.LAUNCHED);
                 status.setReason("submitted all processes");
                 status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
@@ -184,7 +185,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
             } else if (executionType == ExperimentType.WORKFLOW) {
                 //its a workflow execution experiment
                 log.debug(experimentId, "Launching workflow experiment {}.", experimentId);
-                launchWorkflowExperiment(experimentId, token);
+                launchWorkflowExperiment(experimentId, token, gatewayId);
             } else {
                 log.error(experimentId, "Couldn't identify experiment type, experiment {} is neither single application nor workflow.", experimentId);
                 throw new TException("Experiment '" + experimentId + "' launch failed. Unable to figureout execution type for application " + experiment.getExecutionId());
@@ -367,7 +368,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 		}
     }
 
-    private void launchWorkflowExperiment(String experimentId, String airavataCredStoreToken) throws TException {
+    private void launchWorkflowExperiment(String experimentId, String airavataCredStoreToken, String gatewayId) throws TException {
         // FIXME
 //        try {
 //            WorkflowEnactmentService.getInstance().

http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/modules/workflow/workflow-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/pom.xml b/modules/workflow/workflow-core/pom.xml
index bb1ea79..72990c8 100644
--- a/modules/workflow/workflow-core/pom.xml
+++ b/modules/workflow/workflow-core/pom.xml
@@ -31,6 +31,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <!--Workflow Interpreter dependency-->
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-gfac-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <!-- Airavata default parser dependency -->
         <dependency>
             <groupId>org.apache.airavata</groupId>

http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java
index 7f8a8a5..cdbf2f2 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java
@@ -22,35 +22,25 @@
 package org.apache.airavata.workflow.core;
 
 import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher;
 import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
+import org.apache.airavata.model.ComponentState;
+import org.apache.airavata.model.ComponentStatus;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.experiment.ExperimentModel;
-import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.messaging.event.ProcessIdentifier;
+import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
 import org.apache.airavata.model.status.ProcessState;
-import org.apache.airavata.model.util.ExperimentModelUtil;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.registry.core.experiment.catalog.model.Experiment;
 import org.apache.airavata.registry.cpi.*;
 import org.apache.airavata.workflow.core.dag.edge.Edge;
-import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode;
-import org.apache.airavata.workflow.core.dag.nodes.NodeState;
-import org.apache.airavata.workflow.core.dag.nodes.InputNode;
-import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode;
-import org.apache.airavata.workflow.core.dag.nodes.OutputNode;
-import org.apache.airavata.workflow.core.dag.port.InPort;
+import org.apache.airavata.workflow.core.dag.nodes.*;
 import org.apache.airavata.workflow.core.dag.port.OutPort;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -68,12 +58,12 @@ class SimpleWorkflowInterpreter{
     private String gatewayName;
 
     private String workflowString;
-    private Map<String, WorkflowNode> readyList = new ConcurrentHashMap<String, WorkflowNode>();
-    private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<String, WorkflowNode>();
-    private Map<String, WorkflowContext> processingQueue = new ConcurrentHashMap<String, WorkflowContext>();
-    private Map<String, WorkflowContext> completeList = new HashMap<String, WorkflowContext>();
+    private Map<String, WorkflowNode> readyList = new ConcurrentHashMap<>();
+    private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<>();
+    private Map<String, WorkflowNode> processingQueue = new ConcurrentHashMap<>();
+    private Map<String, WorkflowNode> completeList = new HashMap<>();
     private Registry registry;
-    private List<OutputNode> completeWorkflowOutputs = new ArrayList<OutputNode>();
+    private List<OutputNode> completeWorkflowOutputs = new ArrayList<>();
     private RabbitMQProcessLaunchPublisher publisher;
     private RabbitMQStatusConsumer statusConsumer;
     private String consumerId;
@@ -120,10 +110,11 @@ class SimpleWorkflowInterpreter{
         processReadyList();
     }
 
-    private String getWorkflow() throws AppCatalogException {
+    private String getWorkflow() throws AppCatalogException, WorkflowCatalogException {
         WorkflowCatalog workflowCatalog = RegistryFactory.getAppCatalog().getWorkflowCatalog();
         //FIXME: parse workflowTemplateId or experimentId
-        workflowCatalog.getWorkflow("");
+//        workflowCatalog.getWorkflow("");
+        return "";
     }
 
     // try to remove synchronization tag
@@ -141,15 +132,16 @@ class SimpleWorkflowInterpreter{
                 OutputNode outputNode = (OutputNode) readyNode;
                 outputNode.getOutputObject().setValue(outputNode.getInPort().getInputObject().getValue());
                 addToCompleteOutputNodeList(outputNode);
-                continue;
+            } else if (readyNode instanceof InputNode) {
+                // set input object of applications and add applications to ready List.
+            } else if (readyNode instanceof ApplicationNode) {
+                //  call orchestrator to create process for the application
+            } else {
+                throw new RuntimeException("Unsupported workflow node type");
             }
-            WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode);
-            TaskDetails process = getProcess(workflowNodeDetails);
-            WorkflowContext workflowContext = new WorkflowContext(readyNode, workflowNodeDetails, process);
-            addToProcessingQueue(workflowContext);
-            publishToProcessQueue(process);
         }
-        if (processingQueue.isEmpty()) {
+
+        if (processingQueue.isEmpty() && waitingList.isEmpty()) {
             try {
                 saveWorkflowOutputs();
             } catch (AppCatalogException e) {
@@ -159,64 +151,16 @@ class SimpleWorkflowInterpreter{
     }
 
     private void saveWorkflowOutputs() throws AppCatalogException {
-        List<OutputDataObjectType> outputDataObjects = new ArrayList<OutputDataObjectType>();
+        List<OutputDataObjectType> outputDataObjects = new ArrayList<>();
         for (OutputNode completeWorkflowOutput : completeWorkflowOutputs) {
             outputDataObjects.add(completeWorkflowOutput.getOutputObject());
         }
-        RegistryFactory.getAppCatalog().getWorkflowCatalog()
-                .updateWorkflowOutputs(experiment.getApplicationId(), outputDataObjects);
-    }
-
-
-    private void publishToProcessQueue(TaskDetails process) throws AiravataException {
-        ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
-        processSubmitEvent.setCredentialToken(credentialToken);
-        processSubmitEvent.setTaskId(process.getTaskID());
-        MessageContext messageContext = new MessageContext(processSubmitEvent, MessageType.TASK, process.getTaskID(), null);
-        messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
-        publisher.publish(messageContext);
-    }
-
-    private TaskDetails getProcess(WorkflowNodeDetails wfNodeDetails) throws RegistryException {
-        // create workflow taskDetails from workflowNodeDetails
-        TaskDetails taskDetails = ExperimentModelUtil.cloneTaskFromWorkflowNodeDetails(getExperiment(), wfNodeDetails);
-        taskDetails.setTaskID(getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK_DETAIL, taskDetails, wfNodeDetails.getNodeInstanceId()).toString());
-        return taskDetails;
-    }
-
-    private WorkflowNodeDetails createWorkflowNodeDetails(WorkflowNode readyNode) throws RegistryException {
-        WorkflowNodeDetails wfNodeDetails = ExperimentModelUtil.createWorkflowNode(readyNode.getId(), null);
-        ExecutionUnit executionUnit = ExecutionUnit.APPLICATION;
-        String executionData = null;
-        if (readyNode instanceof ApplicationNode) {
-            executionUnit = ExecutionUnit.APPLICATION;
-            executionData = ((ApplicationNode) readyNode).getApplicationId();
-            setupNodeDetailsInput(((ApplicationNode) readyNode), wfNodeDetails);
-        } else if (readyNode instanceof InputNode) {
-            executionUnit = ExecutionUnit.INPUT;
-        } else if (readyNode instanceof OutputNode) {
-            executionUnit = ExecutionUnit.OUTPUT;
-        }
-        wfNodeDetails.setExecutionUnit(executionUnit);
-        wfNodeDetails.setExecutionUnitData(executionData);
-        wfNodeDetails.setNodeInstanceId((String) getRegistry().getExperimentCatalog().add(ExpCatChildDataType.WORKFLOW_NODE_DETAIL, wfNodeDetails, getExperiment().getExperimentID()));
-        return wfNodeDetails;
+//        RegistryFactory.getAppCatalog().getWorkflowCatalog()
+//                .updateWorkflowOutputs(experiment.getApplicationId(), outputDataObjects);
     }
 
-    private void setupNodeDetailsInput(ApplicationNode readyAppNode, WorkflowNodeDetails wfNodeDetails) {
-        if (readyAppNode.isReady()) {
-            for (InPort inPort : readyAppNode.getInputPorts()) {
-                wfNodeDetails.addToNodeInputs(inPort.getInputObject());
-            }
-        } else {
-            throw new IllegalArgumentException("Application node should be in ready state to set inputs to the " +
-                    "workflow node details, nodeId = " + readyAppNode.getId());
-        }
-    }
-
-
     private void processWorkflowInputNodes(List<InputNode> inputNodes) {
-        Set<WorkflowNode> tempNodeSet = new HashSet<WorkflowNode>();
+        Set<WorkflowNode> tempNodeSet = new HashSet<>();
         for (InputNode inputNode : inputNodes) {
             if (inputNode.isReady()) {
                 log.debug("Workflow node : " + inputNode.getId() + " is ready to execute");
@@ -251,16 +195,6 @@ class SimpleWorkflowInterpreter{
         return registry;
     }
 
-    public Experiment getExperiment() {
-        return experiment;
-    }
-
-    private void updateWorkflowNodeStatus(WorkflowNodeDetails wfNodeDetails, WorkflowNodeState state) throws RegistryException{
-        WorkflowNodeStatus status = ExperimentModelUtil.createWorkflowNodeStatus(state);
-        wfNodeDetails.setWorkflowNodeStatus(status);
-        getRegistry().getExperimentCatalog().update(ExperimentCatalogModelType.WORKFLOW_NODE_STATUS, status, wfNodeDetails.getNodeInstanceId());
-    }
-
     /**
      * Package-Private method.
      * Remove the workflow node from waiting queue and add it to the ready queue.
@@ -278,16 +212,16 @@ class SimpleWorkflowInterpreter{
     /**
      * First remove the node from ready list and then add the WfNodeContainer to the process queue.
      * Note that underline data structure of the process queue is a Map.
-     * @param workflowContext - has both workflow and correspond workflowNodeDetails and TaskDetails
+     * @param applicationNode - the application node to remove from the ready list and add to the processing queue, keyed by its id
      */
-    private synchronized void addToProcessingQueue(WorkflowContext workflowContext) {
-        readyList.remove(workflowContext.getWorkflowNode().getId());
-        processingQueue.put(workflowContext.getTaskDetails().getTaskID(), workflowContext);
+    private synchronized void addToProcessingQueue(ApplicationNode applicationNode) {
+        readyList.remove(applicationNode.getId());
+        processingQueue.put(applicationNode.getId(), applicationNode);
     }
 
-    private synchronized void addToCompleteQueue(WorkflowContext workflowContext) {
-        processingQueue.remove(workflowContext.getTaskDetails().getTaskID());
-        completeList.put(workflowContext.getTaskDetails().getTaskID(), workflowContext);
+    private synchronized void addToCompleteQueue(ApplicationNode applicationNode) {
+        processingQueue.remove(applicationNode.getId());
+        completeList.put(applicationNode.getId(), applicationNode);
     }
 
 
@@ -301,19 +235,18 @@ class SimpleWorkflowInterpreter{
     }
 
     private void setExperiment(String experimentId) throws RegistryException {
-        experiment = (Experiment) getRegistry().getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
+        experiment = (ExperimentModel) getRegistry().getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
         log.debug("Retrieve Experiment for experiment id : " + experimentId);
     }
 
-    synchronized void handleTaskOutputChangeEvent(TaskOutputChangeEvent taskOutputChangeEvent) {
+/*    synchronized void handleTaskOutputChangeEvent(ProcessStatusChangeEvent taskOutputChangeEvent) {
 
         String taskId = taskOutputChangeEvent.getTaskIdentity().getTaskId();
         log.debug("Task Output changed event received for workflow node : " +
                 taskOutputChangeEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId);
-        WorkflowContext workflowContext = processingQueue.get(taskId);
-        Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>();
-        if (workflowContext != null) {
-            WorkflowNode workflowNode = workflowContext.getWorkflowNode();
+        WorkflowNode workflowNode = processingQueue.get(taskId);
+        Set<WorkflowNode> tempWfNodeSet = new HashSet<>();
+        if (workflowNode != null) {
             if (workflowNode instanceof ApplicationNode) {
                 ApplicationNode applicationNode = (ApplicationNode) workflowNode;
                 // Workflow node can have one to many output ports and each output port can have one to many links
@@ -331,9 +264,9 @@ class SimpleWorkflowInterpreter{
                         }
                     }
                 }
+                addToCompleteQueue(applicationNode);
+                log.debug("removed task from processing queue : " + taskId);
             }
-            addToCompleteQueue(workflowContext);
-            log.debug("removed task from processing queue : " + taskId);
             try {
                 processReadyList();
             } catch (Exception e) {
@@ -341,69 +274,55 @@ class SimpleWorkflowInterpreter{
                 continueWorkflow = false;
             }
         }
-    }
+    }*/
 
     void handleProcessStatusChangeEvent(ProcessStatusChangeEvent processStatusChangeEvent) {
         ProcessState processState = processStatusChangeEvent.getState();
         ProcessIdentifier processIdentity = processStatusChangeEvent.getProcessIdentity();
         String processId = processIdentity.getProcessId();
-        WorkflowContext workflowContext = processingQueue.get(processId);
-        if (workflowContext != null) {
-            WorkflowNodeState wfNodeState = WorkflowNodeState.INVOKED;
+        ApplicationNode applicationNode = (ApplicationNode) processingQueue.get(processId);
+        if (applicationNode != null) {
+            ComponentState state = applicationNode.getState();
             switch (processState) {
                 case CREATED:
                 case VALIDATED:
                 case STARTED:
                     break;
                 case CONFIGURING_WORKSPACE:
-                    wfNodeState = WorkflowNodeState.COMPLETED;
-                    break;
                 case PRE_PROCESSING:
-                    wfNodeState = WorkflowNodeState.INVOKED;
-                    workflowContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
-                    break;
                 case INPUT_DATA_STAGING:
-                    wfNodeState = WorkflowNodeState.INVOKED;
-                    workflowContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
-                    break;
                 case EXECUTING:
-                    wfNodeState = WorkflowNodeState.EXECUTING;
-                    workflowContext.getWorkflowNode().setState(NodeState.EXECUTING);
-                    break;
                 case OUTPUT_DATA_STAGING:
-                    wfNodeState = WorkflowNodeState.COMPLETED;
-                    workflowContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
-                    break;
                 case POST_PROCESSING:
-                    wfNodeState = WorkflowNodeState.COMPLETED;
-                    workflowContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
+                    state = ComponentState.RUNNING;
                     break;
                 case COMPLETED:
-                    wfNodeState = WorkflowNodeState.COMPLETED;
-                    workflowContext.getWorkflowNode().setState(NodeState.EXECUTED);
+                    state = ComponentState.COMPLETED;
                     break;
                 case FAILED:
-                    wfNodeState = WorkflowNodeState.FAILED;
-                    workflowContext.getWorkflowNode().setState(NodeState.FAILED);
+                    state = ComponentState.FAILED;
                     break;
                 case CANCELED:
                 case CANCELLING:
-                    wfNodeState = WorkflowNodeState.CANCELED;
-                    workflowContext.getWorkflowNode().setState(NodeState.FAILED);
+                    state = ComponentState.CANCELED;
                     break;
                 default:
                     break;
             }
-            if (wfNodeState != WorkflowNodeState.UNKNOWN) {
+            if (state != applicationNode.getState()) {
                 try {
-                    updateWorkflowNodeStatus(workflowContext.getWfNodeDetails(), wfNodeState);
+                    updateWorkflowNodeStatus(applicationNode, new ComponentStatus(state));
                 } catch (RegistryException e) {
-                    log.error("Error while updating workflow node status update to the registry. nodeInstanceId :"
-                            + workflowContext.getWfNodeDetails().getNodeInstanceId() + " status to: "
-                            + workflowContext.getWfNodeDetails().getWorkflowNodeStatus().toString() , e);
+                    log.error("Error! Couldn't update new application state to registry. nodeInstanceId : {} "
+                            + applicationNode.getId() + " status to: " + applicationNode.getState().toString() , e);
                 }
             }
         }
 
     }
+
+    private void updateWorkflowNodeStatus(ApplicationNode applicationNode, ComponentStatus componentStatus) throws RegistryException {
+        // FIXME: save new workflow node status to registry.
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowContext.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowContext.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowContext.java
deleted file mode 100644
index 47bd9ca..0000000
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowContext.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.core;
-
-import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode;
-
-public class WorkflowContext {
-    private WorkflowNode workflowNode;
-    private WorkflowNodeDetails wfNodeDetails;
-    private TaskDetails taskDetails;
-
-    public WorkflowContext(WorkflowNode workflowNode, WorkflowNodeDetails wfNodeDetails, TaskDetails taskDetails) {
-        this.workflowNode = workflowNode;
-        this.wfNodeDetails = wfNodeDetails;
-        this.taskDetails = taskDetails;
-    }
-
-    public WorkflowNode getWorkflowNode() {
-        return workflowNode;
-    }
-
-    public void setWorkflowNode(WorkflowNode workflowNode) {
-        this.workflowNode = workflowNode;
-    }
-
-    public WorkflowNodeDetails getWfNodeDetails() {
-        return wfNodeDetails;
-    }
-
-    public void setWfNodeDetails(WorkflowNodeDetails wfNodeDetails) {
-        this.wfNodeDetails = wfNodeDetails;
-    }
-
-    public TaskDetails getTaskDetails() {
-        return taskDetails;
-    }
-
-    public void setTaskDetails(TaskDetails taskDetails) {
-        this.taskDetails = taskDetails;
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
index aaa3073..34ef8a7 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
@@ -142,7 +142,7 @@ public class WorkflowEnactmentService {
                 TaskIdentifier taskIdentifier = event.getTaskIdentity();
                 simpleWorkflowInterpreter = getInterpreter(taskIdentifier.getExperimentId());
                 if (simpleWorkflowInterpreter != null) {
-                    simpleWorkflowInterpreter.handleTaskOutputChangeEvent(event);
+//                    simpleWorkflowInterpreter.handleTaskOutputChangeEvent(event);
                     if (simpleWorkflowInterpreter.isAllDone()) {
                         workflowMap.remove(taskIdentifier.getExperimentId());
                     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java
index e06fab5..9392461 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java
@@ -23,7 +23,6 @@ package org.apache.airavata.workflow.core;
 
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.workflow.core.parser.AiravataWorkflowBuilder;
 import org.apache.airavata.workflow.core.parser.JsonWorkflowParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/airavata/blob/ce795581/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c82cc8d..e41e3f1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -557,6 +557,7 @@
 				<module>modules/credential-store</module>
 				<module>modules/orchestrator</module>
 				<module>modules/server</module>
+				<module>modules/workflow</module>
 				<module>modules/test-suite</module>
 				<!-- Deprecated Modules-->
 				<!--<module>modules/integration-tests</module>-->