Posted to commits@airavata.apache.org by sh...@apache.org on 2015/10/15 23:51:50 UTC

[1/2] airavata git commit: AutoScheduling gfac side changes

Repository: airavata
Updated Branches:
  refs/heads/orchestratorTaskBreakdown 24b83d8f9 -> 10d593c95


http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index 946055e..52cd395 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -21,22 +21,24 @@
 
 package org.apache.airavata.gfac.impl;
 
-import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.gfac.core.GFacEngine;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.context.ProcessContext;
-import org.apache.airavata.gfac.core.monitor.JobMonitor;
 import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.task.TaskModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.text.MessageFormat;
+import java.util.List;
+import java.util.Map;
 
 public class GFacWorker implements Runnable {
 
@@ -46,7 +48,7 @@ public class GFacWorker implements Runnable {
 	private String processId;
 	private String gatewayId;
 	private String tokenId;
-	private boolean runOutflow = false;
+	private boolean continueTaskFlow = false;
 
 
 	/**
@@ -61,7 +63,7 @@ public class GFacWorker implements Runnable {
 		this.tokenId = processContext.getTokenId();
 		engine = Factory.getGFacEngine();
 		this.processContext = processContext;
-		runOutflow = true;
+		continueTaskFlow = true;
 	}
 
 	/**
@@ -98,19 +100,15 @@ public class GFacWorker implements Runnable {
 				case CONFIGURING_WORKSPACE:
 				case INPUT_DATA_STAGING:
 				case EXECUTING:
-					recoverProcess();
-					break;
 				case MONITORING:
-					if (runOutflow) {
-						runProcessOutflow();
-					} else {
-						monitorProcess();
-					}
-					break;
-				case OUTPUT_DATA_STAGING:
-				case POST_PROCESSING:
-					recoverProcessOutflow();
-					break;
+                case OUTPUT_DATA_STAGING:
+                case POST_PROCESSING:
+                    if (continueTaskFlow) {
+                        continueTaskExecution();
+                    } else {
+                        recoverProcess();
+                    }
+                    break;
 				case COMPLETED:
 					completeProcess();
 					break;
@@ -160,56 +158,94 @@ public class GFacWorker implements Runnable {
 		Factory.getGfacContext().removeProcess(processContext.getProcessId());
 	}
 
-	private void recoverProcessOutflow() throws GFacException {
-		engine.recoverProcessOutflow(processContext);
-		if (processContext.isInterrupted()) {
-			return;
-		}
-		completeProcess();
-	}
+	private void continueTaskExecution() throws GFacException {
+        // checkpoint
+        if (processContext.isInterrupted()) {
+            return;
+        }
+        processContext.setPauseTaskExecution(false);
+        List<String> taskExecutionOrder = processContext.getTaskExecutionOrder();
+        String currentExecutingTaskId = processContext.getCurrentExecutingTaskId();
+        boolean found = false;
+        String nextTaskId = null;
+        for (String taskId : taskExecutionOrder) {
+            if (!found) {
+                if (taskId.equalsIgnoreCase(currentExecutingTaskId)) {
+                    found = true;
+                }
+                continue;
+            } else {
+                nextTaskId = taskId;
+                break;
+            }
+        }
+        if (nextTaskId != null) {
+            engine.continueProcess(processContext, nextTaskId);
+        }
+        // checkpoint
+        if (processContext.isInterrupted()) {
+            return;
+        }
 
-	private void runProcessOutflow() throws GFacException {
-		engine.runProcessOutflow(processContext);
-		if (processContext.isInterrupted()) {
-			return;
-		}
-		completeProcess();
-	}
+        if (processContext.isComplete()) {
+            completeProcess();
+        }
+    }
 
 	private void recoverProcess() throws GFacException {
-		engine.recoverProcess(processContext);
-		if (processContext.isInterrupted()) {
-			return;
-		}
-		monitorProcess();
-	}
+
+        String taskDag = processContext.getProcessModel().getTaskDag();
+        List<String> taskExecutionOrder = GFacUtils.parseTaskDag(taskDag);
+        Map<String, TaskModel> taskMap = processContext.getTaskMap();
+        String recoverTaskId = null;
+        for (String taskId : taskExecutionOrder) {
+            TaskModel taskModel = taskMap.get(taskId);
+            TaskState state = taskModel.getTaskStatus().getState();
+            if (state == TaskState.CREATED || state == TaskState.EXECUTING) {
+                recoverTaskId = taskId;
+                break;
+            }
+        }
+
+        engine.recoverProcess(processContext, recoverTaskId);
+        if (processContext.isInterrupted()) {
+            return;
+        }
+
+        if (processContext.isComplete()) {
+            completeProcess();
+        }
+    }
 
 	private void executeProcess() throws GFacException {
 		engine.executeProcess(processContext);
 		if (processContext.isInterrupted()) {
 			return;
 		}
-		monitorProcess();
-	}
 
-	private void monitorProcess() throws GFacException {
-		try {
-			JobMonitor monitorService = Factory.getMonitorService(processContext.getMonitorMode());
-			if (monitorService != null) {
-				monitorService.monitor(processContext.getJobModel().getJobId(), processContext);
-                ProcessStatus status = new ProcessStatus(ProcessState.MONITORING);
-                status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-                processContext.setProcessStatus(status);
-				GFacUtils.saveAndPublishProcessStatus(processContext);
-			} else {
-				// we directly invoke outflow
-				runProcessOutflow();
-			}
-		} catch (AiravataException e) {
-			throw new GFacException("Error while retrieving moniot service", e);
-		}
+        if (processContext.isComplete()) {
+            completeProcess();
+        }
 	}
 
+//	private void monitorProcess() throws GFacException {
+//		try {
+//			JobMonitor monitorService = Factory.getMonitorService(processContext.getMonitorMode());
+//			if (monitorService != null) {
+//				monitorService.monitor(processContext.getJobModel().getJobId(), processContext);
+//                ProcessStatus status = new ProcessStatus(ProcessState.MONITORING);
+//                status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+//                processContext.setProcessStatus(status);
+//				GFacUtils.saveAndPublishProcessStatus(processContext);
+//			} else {
+//				// we directly invoke outflow
+//				continueTaskExecution();
+//			}
+//		} catch (AiravataException e) {
+//			throw new GFacException("Error while retrieving monitor service", e);
+//		}
+//	}
+
 	private void sendAck() {
 		try {
 			long processDeliveryTag = GFacUtils.getProcessDeliveryTag(processContext.getCuratorClient(),

http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index 359d39c..c4d4676 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -32,10 +32,6 @@ import org.apache.airavata.gfac.core.monitor.EmailParser;
 import org.apache.airavata.gfac.core.monitor.JobMonitor;
 import org.apache.airavata.gfac.core.monitor.JobStatusResult;
 import org.apache.airavata.gfac.impl.GFacWorker;
-import org.apache.airavata.gfac.monitor.email.parser.LSFEmailParser;
-import org.apache.airavata.gfac.monitor.email.parser.PBSEmailParser;
-import org.apache.airavata.gfac.monitor.email.parser.SLURMEmailParser;
-import org.apache.airavata.gfac.monitor.email.parser.UGEEmailParser;
 import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.status.JobState;
@@ -52,7 +48,6 @@ import javax.mail.Session;
 import javax.mail.Store;
 import javax.mail.search.FlagTerm;
 import javax.mail.search.SearchTerm;
-import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -124,6 +119,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
 	public void monitor(String jobId, ProcessContext processContext) {
 		log.info("[EJM]: Added monitor Id : " + jobId + " to email based monitor map");
 		jobMonitorMap.put(jobId, processContext);
+        processContext.setPauseTaskExecution(true);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index b892a00..4468a1c 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -371,7 +371,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
         taskModel.setTaskStatus(taskStatus);
         taskModel.setTaskType(TaskTypes.JOB_SUBMISSION);
         JobSubmissionTaskModel submissionSubTask = new JobSubmissionTaskModel();
-        submissionSubTask.setMonitorMode(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR);
+//        submissionSubTask.setMonitorMode(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR);
         byte[] bytes = ThriftUtils.serializeThriftObject(submissionSubTask);
         taskModel.setSubTaskModel(bytes);
         orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, taskModel,


[2/2] airavata git commit: AutoScheduling gfac side changes

Posted by sh...@apache.org.
AutoScheduling gfac side changes


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/10d593c9
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/10d593c9
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/10d593c9

Branch: refs/heads/orchestratorTaskBreakdown
Commit: 10d593c95260b1d3c2b4a578652be4e0dd4a7f95
Parents: 24b83d8
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Thu Oct 15 17:51:38 2015 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Thu Oct 15 17:51:38 2015 -0400

----------------------------------------------------------------------
 .../apache/airavata/gfac/core/GFacEngine.java   |    6 +-
 .../apache/airavata/gfac/core/GFacUtils.java    |  260 +----
 .../gfac/core/context/ProcessContext.java       |   88 +-
 .../airavata/gfac/core/context/TaskContext.java |   10 +
 .../airavata/gfac/impl/GFacEngineImpl.java      | 1085 ++++++++----------
 .../apache/airavata/gfac/impl/GFacWorker.java   |  144 ++-
 .../gfac/monitor/email/EmailBasedMonitor.java   |    6 +-
 .../cpi/impl/SimpleOrchestratorImpl.java        |    2 +-
 8 files changed, 684 insertions(+), 917 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
index 176f74c..1d3ad30 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
@@ -29,11 +29,9 @@ public interface GFacEngine {
 
 	public void executeProcess(ProcessContext processContext) throws GFacException ;
 
-	public void recoverProcess(ProcessContext processContext) throws GFacException ;
+    public void recoverProcess(ProcessContext processContext, String taskId) throws GFacException;
 
-	public void runProcessOutflow(ProcessContext processContext) throws GFacException ;
-
-	public void recoverProcessOutflow(ProcessContext processContext) throws GFacException ;
+	public void continueProcess(ProcessContext processContext, String taskId) throws GFacException ;
 
 	public void cancelProcess(ProcessContext processContext) throws GFacException ;
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index e51e32b..871a9ab 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -21,11 +21,7 @@
 package org.apache.airavata.gfac.core;
 
 import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.DBUtil;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ZkConstants;
+import org.apache.airavata.common.utils.*;
 import org.apache.airavata.credential.store.store.CredentialReader;
 import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
 import org.apache.airavata.gfac.core.context.ProcessContext;
@@ -46,14 +42,12 @@ import org.apache.airavata.model.process.ProcessModel;
 import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
 import org.apache.airavata.model.status.*;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.registry.core.experiment.catalog.model.ProcessOutput;
 import org.apache.airavata.registry.cpi.*;
 import org.apache.airavata.registry.cpi.utils.Constants;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
@@ -86,7 +80,6 @@ public class GFacUtils {
     private GFacUtils() {
     }
 
-
     /**
      * Read data from inputStream and convert it to String.
      *
@@ -117,21 +110,6 @@ public class GFacUtils {
         }
     }
 
-//	/**
-//	 * This returns true if the give job is finished
-//	 * otherwise false
-//	 *
-//	 * @param job
-//	 * @return
-//	 */
-//	public static boolean isJobFinished(JobDescriptor job) {
-//		if (org.apache.airavata.gfac.core.cluster.JobStatus.C.toString().equals(job.getStatus())) {
-//			return true;
-//		} else {
-//			return false;
-//		}
-//	}
-
     /**
      * This will read
      *
@@ -158,17 +136,6 @@ public class GFacUtils {
         }
     }
 
-//	public static boolean isSynchronousMode(
-//			JobExecutionContext jobExecutionContext) {
-//		GFacConfiguration gFacConfiguration = jobExecutionContext
-//				.getGFacConfiguration();
-//		if (ExecutionMode.ASYNCHRONOUS.equals(gFacConfiguration
-//				.getExecutionMode())) {
-//			return false;
-//		}
-//		return true;
-//	}
-
     public static String readFileToString(String file)
             throws FileNotFoundException, IOException {
         BufferedReader instream = null;
@@ -315,225 +282,6 @@ public class GFacUtils {
         }
     }
 
-/*    public static void saveExperimentStatus(ProcessContext processContext,
-                                         ExperimentState state) throws GFacException {
-        try {
-            // first we save job jobModel to the registry for sa and then save the job status.
-            ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
-            ExperimentStatus status = new ExperimentStatus();
-            status.setState(state);
-            status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-            experimentCatalog.add(ExpCatChildDataType.EXPERIMENT_STATUS, status, processContext.getProcessModel().getExperimentId());
-            ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent(state, processContext.getProcessModel().getExperimentId(), processContext.getGatewayId());
-
-            processContext.getLocalEventPublisher().publish(experimentStatusChangeEvent);
-        } catch (Exception e) {
-            throw new GFacException("Error persisting experiment status"
-                    + e.getLocalizedMessage(), e);
-        }
-    }*/
-
-//	public static void updateJobStatus(JobExecutionContext jobExecutionContext,
-//			JobDetails details, JobState state) throws GFacException {
-//		try {
-//			ExperimentCatalog experimentCatalog = jobExecutionContext.getExperimentCatalog();
-//			JobStatus status = new JobStatus();
-//			status.setJobState(state);
-//			status.setTimeOfStateChange(Calendar.getInstance()
-//					.getTimeInMillis());
-//			details.setJobStatus(status);
-//			experimentCatalog.update(
-//					ExperimentCatalogModelType.JOB_DETAIL,
-//					details, details.getJobID());
-//		} catch (Exception e) {
-//			throw new GFacException("Error persisting job status"
-//					+ e.getLocalizedMessage(), e);
-//		}
-//	}
-
-//	public static void saveErrorDetails(
-//			ProcessContext processContext, String errorMessage)
-//			throws GFacException {
-//		try {
-//			ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
-//			ErrorModel details = new ErrorModel();
-//			details.setActualErrorMessage(errorMessage);
-//			details.setCreationTime(Calendar.getInstance().getTimeInMillis());
-//			// FIXME : Save error model according to new data model
-////            experimentCatalog.add(ExpCatChildDataType.ERROR_DETAIL, details,
-////					jobExecutionContext.getTaskData().getTaskID());
-//		} catch (Exception e) {
-//			throw new GFacException("Error persisting job status"
-//					+ e.getLocalizedMessage(), e);
-//		}
-//	}
-
-    public static Map<String, Object> getInputParamMap(List<InputDataObjectType> experimentData) throws GFacException {
-        Map<String, Object> map = new HashMap<String, Object>();
-        for (InputDataObjectType objectType : experimentData) {
-            map.put(objectType.getName(), objectType);
-        }
-        return map;
-    }
-
-    public static Map<String, Object> getOuputParamMap(List<OutputDataObjectType> experimentData) throws GFacException {
-        Map<String, Object> map = new HashMap<String, Object>();
-        for (OutputDataObjectType objectType : experimentData) {
-            map.put(objectType.getName(), objectType);
-        }
-        return map;
-    }
-
-//	public static GfacExperimentState getZKExperimentState(CuratorFramework curatorClient,
-//			JobExecutionContext jobExecutionContext)
-//			throws Exception {
-//		String expState = AiravataZKUtils.getExpState(curatorClient, jobExecutionContext
-//				.getExperimentID());
-//        if (expState == null || expState.isEmpty()) {
-//            return GfacExperimentState.UNKNOWN;
-//        }
-//        return GfacExperimentState.findByValue(Integer.valueOf(expState));
-//    }
-//
-//	public static boolean createHandlerZnode(CuratorFramework curatorClient,
-//                                             JobExecutionContext jobExecutionContext, String className)
-//			throws Exception {
-//		String expState = AiravataZKUtils.getExpZnodeHandlerPath(
-//				jobExecutionContext.getExperimentID(), className);
-//		Stat exists = curatorClient.checkExists().forPath(expState);
-//		if (exists == null) {
-//			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(expState, new byte[0]);
-//			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-//					.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
-//		} else {
-//			exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
-//			if (exists == null) {
-//				curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-//						.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
-//			}
-//		}
-//
-//		exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
-//		if (exists != null) {
-//			curatorClient.setData().withVersion(exists.getVersion())
-//					.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
-//							String.valueOf(GfacHandlerState.INVOKING.getValue()).getBytes());
-//		}
-//		return true;
-//	}
-
-//	public static boolean createHandlerZnode(CuratorFramework curatorClient,
-//                                             JobExecutionContext jobExecutionContext, String className,
-//                                             GfacHandlerState state) throws Exception {
-//		String expState = AiravataZKUtils.getExpZnodeHandlerPath(
-//				jobExecutionContext.getExperimentID(), className);
-//		Stat exists = curatorClient.checkExists().forPath(expState);
-//		if (exists == null) {
-//			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-//					.forPath(expState, new byte[0]);
-//			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-//					.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
-//		} else {
-//			exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
-//			if (exists == null) {
-//				curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-//						.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
-//			}
-//		}
-//
-//		exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
-//		if (exists != null) {
-//			curatorClient.setData().withVersion(exists.getVersion())
-//					.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
-//							String.valueOf(state.getValue()).getBytes());
-//		}
-//		return true;
-//	}
-
-//	public static boolean updateHandlerState(CuratorFramework curatorClient,
-//                                             JobExecutionContext jobExecutionContext, String className,
-//                                             GfacHandlerState state) throws Exception {
-//		String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath(
-//				jobExecutionContext.getExperimentID(), className);
-//		Stat exists = curatorClient.checkExists().forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
-//		if (exists != null) {
-//			curatorClient.setData().withVersion(exists.getVersion())
-//					.forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, String.valueOf(state.getValue()).getBytes());
-//		} else {
-//			createHandlerZnode(curatorClient, jobExecutionContext, className, state);
-//		}
-//		return false;
-//	}
-
-//	public static GfacHandlerState getHandlerState(CuratorFramework curatorClient,
-//                                                  JobExecutionContext jobExecutionContext, String className) {
-//		try {
-//			String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath( jobExecutionContext.getExperimentID(), className);
-//			Stat exists = curatorClient.checkExists().forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
-//			if (exists != null) {
-//				String stateVal = new String(curatorClient.getData().storingStatIn(exists)
-//						.forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE));
-//				return GfacHandlerState.findByValue(Integer.valueOf(stateVal));
-//			}
-//			return GfacHandlerState.UNKNOWN; // if the node doesn't exist or any other error we
-//							// return false
-//		} catch (Exception e) {
-//			log.error("Error occured while getting zk node status", e);
-//			return null;
-//		}
-//	}
-
-//	// This method is dangerous because of moving the experiment data
-//	public static boolean createExperimentEntryForPassive(String experimentID,
-//														  String taskID, CuratorFramework curatorClient, String experimentNode,
-//														  String pickedChild, String tokenId, long deliveryTag) throws Exception {
-//		String experimentPath = experimentNode + File.separator + pickedChild;
-//		String newExperimentPath = experimentPath + File.separator + experimentID;
-//		Stat exists1 = curatorClient.checkExists().forPath(newExperimentPath);
-//		String oldExperimentPath = GFacUtils.findExperimentEntry(experimentID, curatorClient);
-//		if (oldExperimentPath == null) {  // this means this is a very new experiment
-//			// are going to create a new node
-//			log.info("This is a new Job, so creating all the experiment docs from the scratch");
-//			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath, new byte[0]);
-//            String stateNodePath = curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-//					.forPath(newExperimentPath + File.separator + "state",
-//							String .valueOf(GfacExperimentState.LAUNCHED.getValue()) .getBytes());
-//
-//			if(curatorClient.checkExists().forPath(stateNodePath)!=null) {
-//				log.info("Created the node: " + stateNodePath + " successfully !");
-//			}else {
-//				log.error("Error creating node: " + stateNodePath + " successfully !");
-//			}
-//			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-//					.forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag));
-//		} else {
-//			log.error("ExperimentID: " + experimentID + " taskID: " + taskID + " was running by some Gfac instance,but it failed");
-//            removeCancelDeliveryTagNode(oldExperimentPath, curatorClient); // remove previous cancel deliveryTagNode
-//            if(newExperimentPath.equals(oldExperimentPath)){
-//                log.info("Re-launch experiment came to the same GFac instance");
-//            }else {
-//				log.info("Re-launch experiment came to a new GFac instance so we are moving data to new gfac node");
-//				curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath,
-//						curatorClient.getData().storingStatIn(exists1).forPath(oldExperimentPath)); // recursively copy children
-//                copyChildren(curatorClient, oldExperimentPath, newExperimentPath, 2); // we need to copy children up to depth 2
-//				String oldDeliveryTag = oldExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX;
-//				Stat exists = curatorClient.checkExists().forPath(oldDeliveryTag);
-//				if(exists!=null) {
-//					curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
-//							.forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX,
-//									curatorClient.getData().storingStatIn(exists).forPath(oldDeliveryTag));
-//					ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldDeliveryTag, true);
-//				}
-//				// After all the files are successfully transfered we delete the // old experiment,otherwise we do
-//				// not delete a single file
-//				log.info("After a successful copying of experiment data for an old experiment we delete the old data");
-//				log.info("Deleting experiment data: " + oldExperimentPath);
-//				ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldExperimentPath, true);
-//			}
-//		}
-//		return true;
-//	}
-
     private static void removeCancelDeliveryTagNode(String experimentPath, CuratorFramework curatorClient) throws Exception {
         Stat exists = curatorClient.checkExists().forPath(experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
         if (exists != null) {
@@ -1202,4 +950,10 @@ public class GFacUtils {
         }
         return jobModel;
     }
+
+    public static List<String> parseTaskDag(String taskDag) {
+        // TODO - parse taskDag and create taskId list
+        List<String> taskIds = new ArrayList<>();
+        return taskIds;
+    }
 }
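
As committed, GFacUtils.parseTaskDag is a stub that returns an empty list, so GFacEngineImpl.executeProcess (which calls taskIds.get(0) on the result) would throw an IndexOutOfBoundsException until the TODO is filled in. A minimal sketch, assuming the task DAG string is serialized as a comma-separated list of task ids already in execution order (the real serialization format is not defined in this commit):

    public static List<String> parseTaskDag(String taskDag) {
        // Assumption: taskDag looks like "taskId1,taskId2,taskId3" in execution order.
        List<String> taskIds = new ArrayList<>();
        if (taskDag == null || taskDag.trim().isEmpty()) {
            return taskIds;
        }
        for (String taskId : taskDag.split(",")) {
            String trimmed = taskId.trim();
            if (!trimmed.isEmpty()) {
                taskIds.add(trimmed);
            }
        }
        return taskIds;
    }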

http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index c2f4d83..47a7430 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -38,12 +38,14 @@ import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.process.ProcessModel;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.model.task.TaskModel;
 import org.apache.airavata.registry.cpi.AppCatalog;
 import org.apache.airavata.registry.cpi.ExperimentCatalog;
 import org.apache.curator.framework.CuratorFramework;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -63,7 +65,6 @@ public class ProcessContext {
 	private String inputDir;
 	private String outputDir;
 	private String localWorkingDir;
-	private List<TaskContext> taskChain;
 	private GatewayResourceProfile gatewayResourceProfile;
 	private ComputeResourceDescription computeResourceDescription;
 	private ApplicationDeploymentDescription applicationDeploymentDescription;
@@ -81,6 +82,13 @@ public class ProcessContext {
 	private boolean handOver;
 	private boolean cancel;
     private ServerInfo serverInfo;
+    private List<String> taskExecutionOrder;
+    private List<TaskModel> taskList;
+    private Map<String, TaskModel> taskMap;
+    private String currentExecutingTaskId; // TaskId of current executing task.
+    private boolean pauseTaskExecution = false;  // Task can pause task execution by setting this value
+    private boolean complete = false; // all tasks executed?
+    private boolean recovery = false; // is process in recovery mode?
 
     /**
 	 * Note: process context property use lazy loading approach. In runtime you will see some properties as null
@@ -162,14 +170,6 @@ public class ProcessContext {
 		this.workingDir = workingDir;
 	}
 
-	public List<TaskContext> getTaskChain() {
-		return taskChain;
-	}
-
-	public void setTaskChain(List<TaskContext> taskChain) {
-		this.taskChain = taskChain;
-	}
-
 	public GatewayResourceProfile getGatewayResourceProfile() {
 		return gatewayResourceProfile;
 	}
@@ -279,6 +279,44 @@ public class ProcessContext {
 		this.dataMovementProtocol = dataMovementProtocol;
 	}
 
+    public String getTaskDag() {
+        return getProcessModel().getTaskDag();
+    }
+
+    public List<TaskModel> getTaskList() {
+        if (taskList == null) {
+            synchronized (TaskModel.class){
+                if (taskList == null) {
+                    taskList = getProcessModel().getTasks();
+                }
+            }
+        }
+        return taskList;
+    }
+
+
+    public List<String> getTaskExecutionOrder() {
+        return taskExecutionOrder;
+    }
+
+    public void setTaskExecutionOrder(List<String> taskExecutionOrder) {
+        this.taskExecutionOrder = taskExecutionOrder;
+    }
+
+    public Map<String, TaskModel> getTaskMap() {
+        if (taskMap == null) {
+            synchronized (TaskModel.class) {
+                if (taskMap == null) {
+                    taskMap = new HashMap<>();
+                    for (TaskModel taskModel : getTaskList()) {
+                        taskMap.put(taskModel.getTaskId(), taskModel);
+                    }
+                }
+            }
+        }
+        return taskMap;
+    }
+
 	public JobModel getJobModel() {
 		if (jobModel == null) {
 			jobModel = new JobModel();
@@ -376,4 +414,36 @@ public class ProcessContext {
     public ServerInfo getServerInfo() {
         return serverInfo;
     }
+
+    public String getCurrentExecutingTaskId() {
+        return currentExecutingTaskId;
+    }
+
+    public void setCurrentExecutingTaskId(String currentExecutingTaskId) {
+        this.currentExecutingTaskId = currentExecutingTaskId;
+    }
+
+    public boolean isPauseTaskExecution() {
+        return pauseTaskExecution;
+    }
+
+    public void setPauseTaskExecution(boolean pauseTaskExecution) {
+        this.pauseTaskExecution = pauseTaskExecution;
+    }
+
+    public boolean isComplete() {
+        return complete;
+    }
+
+    public void setComplete(boolean complete) {
+        this.complete = complete;
+    }
+
+    public boolean isRecovery() {
+        return recovery;
+    }
+
+    public void setRecovery(boolean recovery) {
+        this.recovery = recovery;
+    }
 }
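
The lazy getters above use double-checked locking on TaskModel.class, a JVM-wide lock shared by every ProcessContext instance, and the taskList/taskMap fields are not declared volatile, so the pattern is not a safe publication under the Java memory model. A minimal alternative sketch (not part of this commit) that keeps the lazy initialization but avoids the shared class lock:

    // Sketch only: synchronize the getter itself instead of double-checked
    // locking on TaskModel.class; taskMap is built at most once per ProcessContext.
    public synchronized Map<String, TaskModel> getTaskMap() {
        if (taskMap == null) {
            taskMap = new HashMap<>();
            for (TaskModel taskModel : getTaskList()) {
                taskMap.put(taskModel.getTaskId(), taskModel);
            }
        }
        return taskMap;
    }

The same approach applies to getTaskList().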

http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
index f1400e3..ae92ba1 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
@@ -20,12 +20,14 @@
  */
 package org.apache.airavata.gfac.core.context;
 
+import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.model.application.io.InputDataObjectType;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.status.TaskState;
 import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.TaskModel;
 import org.apache.airavata.model.task.TaskTypes;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,6 +38,7 @@ public class TaskContext {
 	private ProcessContext parentProcessContext;
     private InputDataObjectType processInput;
     private OutputDataObjectType processOutput;
+    private Object subTaskModel = null;
 
 	public TaskModel getTaskModel() {
 		return taskModel;
@@ -107,4 +110,11 @@ public class TaskContext {
 	public String getExperimentId() {
 		return parentProcessContext.getExperimentId();
 	}
+
+    public Object getSubTaskModel() throws TException {
+        if (subTaskModel == null) {
+            subTaskModel = ThriftUtils.getSubTaskModel(getTaskModel());
+        }
+        return subTaskModel;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/10d593c9/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 834399e..07a03c1 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -25,7 +25,6 @@ import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.gfac.core.GFacConstants;
 import org.apache.airavata.gfac.core.GFacEngine;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
@@ -38,11 +37,7 @@ import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.task.TaskException;
 import org.apache.airavata.gfac.impl.task.SSHEnvironmentSetupTask;
 import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
-import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission;
-import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
-import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.*;
 import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
 import org.apache.airavata.model.application.io.DataType;
 import org.apache.airavata.model.application.io.InputDataObjectType;
@@ -56,9 +51,7 @@ import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.ProcessStatus;
 import org.apache.airavata.model.status.TaskState;
 import org.apache.airavata.model.status.TaskStatus;
-import org.apache.airavata.model.task.DataStagingTaskModel;
-import org.apache.airavata.model.task.TaskModel;
-import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.model.task.*;
 import org.apache.airavata.registry.cpi.AppCatalog;
 import org.apache.airavata.registry.cpi.AppCatalogException;
 import org.apache.airavata.registry.cpi.ExpCatChildDataType;
@@ -69,44 +62,40 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.List;
+import java.util.*;
 
 public class GFacEngineImpl implements GFacEngine {
 
-	private static final Logger log = LoggerFactory.getLogger(GFacEngineImpl.class);
-
-	public GFacEngineImpl() throws GFacException {
-
-	}
-
-	@Override
-	public ProcessContext populateProcessContext(String processId, String gatewayId, String
-			tokenId) throws GFacException {
-		try {
-			ProcessContext processContext = new ProcessContext(processId, gatewayId, tokenId);
-			AppCatalog appCatalog = Factory.getDefaultAppCatalog();
-			processContext.setAppCatalog(appCatalog);
-			ExperimentCatalog expCatalog = Factory.getDefaultExpCatalog();
-			processContext.setExperimentCatalog(expCatalog);
-			processContext.setCuratorClient(Factory.getCuratorClient());
-			processContext.setStatusPublisher(Factory.getStatusPublisher());
-
-			ProcessModel processModel = (ProcessModel) expCatalog.get(ExperimentCatalogModelType.PROCESS, processId);
-			processContext.setProcessModel(processModel);
-			GatewayResourceProfile gatewayProfile = appCatalog.getGatewayProfile().getGatewayProfile(gatewayId);
-			processContext.setGatewayResourceProfile(gatewayProfile);
-			processContext.setComputeResourcePreference(appCatalog.getGatewayProfile().getComputeResourcePreference
-					(gatewayId, processModel.getComputeResourceId()));
-			processContext.setComputeResourceDescription(appCatalog.getComputeResource().getComputeResource
-					(processContext.getComputeResourcePreference().getComputeResourceId()));
-			processContext.setApplicationDeploymentDescription(appCatalog.getApplicationDeployment()
-					.getApplicationDeployement(processModel.getApplicationDeploymentId()));
+    private static final Logger log = LoggerFactory.getLogger(GFacEngineImpl.class);
+
+    public GFacEngineImpl() throws GFacException {
+
+    }
+
+    @Override
+    public ProcessContext populateProcessContext(String processId, String gatewayId, String
+            tokenId) throws GFacException {
+        try {
+            ProcessContext processContext = new ProcessContext(processId, gatewayId, tokenId);
+            AppCatalog appCatalog = Factory.getDefaultAppCatalog();
+            processContext.setAppCatalog(appCatalog);
+            ExperimentCatalog expCatalog = Factory.getDefaultExpCatalog();
+            processContext.setExperimentCatalog(expCatalog);
+            processContext.setCuratorClient(Factory.getCuratorClient());
+            processContext.setStatusPublisher(Factory.getStatusPublisher());
+
+            ProcessModel processModel = (ProcessModel) expCatalog.get(ExperimentCatalogModelType.PROCESS, processId);
+            processContext.setProcessModel(processModel);
+            GatewayResourceProfile gatewayProfile = appCatalog.getGatewayProfile().getGatewayProfile(gatewayId);
+            processContext.setGatewayResourceProfile(gatewayProfile);
+            processContext.setComputeResourcePreference(appCatalog.getGatewayProfile().getComputeResourcePreference
+                    (gatewayId, processModel.getComputeResourceId()));
+            processContext.setComputeResourceDescription(appCatalog.getComputeResource().getComputeResource
+                    (processContext.getComputeResourcePreference().getComputeResourceId()));
+            processContext.setApplicationDeploymentDescription(appCatalog.getApplicationDeployment()
+                    .getApplicationDeployement(processModel.getApplicationDeploymentId()));
             ApplicationInterfaceDescription applicationInterface = appCatalog.getApplicationInterface()
                     .getApplicationInterface(processModel.getApplicationInterfaceId());
             processContext.setApplicationInterfaceDescription(applicationInterface);
@@ -115,22 +104,22 @@ public class GFacEngineImpl implements GFacEngine {
             ServerInfo serverInfo = new ServerInfo(processContext.getComputeResourcePreference().getLoginUserName(), hostName);
             processContext.setServerInfo(serverInfo);
             List<OutputDataObjectType> applicationOutputs = applicationInterface.getApplicationOutputs();
-            if (applicationOutputs != null && !applicationOutputs.isEmpty()){
-                for (OutputDataObjectType outputDataObjectType : applicationOutputs){
-                    if (outputDataObjectType.getType().equals(DataType.STDOUT)){
-                        if (outputDataObjectType.getValue() == null || outputDataObjectType.getValue().equals("")){
-                            outputDataObjectType.setValue(applicationInterface.getApplicationName()+ ".stdout");
-                            processContext.setStdoutLocation(applicationInterface.getApplicationName()+ ".stdout");
-                        }else {
+            if (applicationOutputs != null && !applicationOutputs.isEmpty()) {
+                for (OutputDataObjectType outputDataObjectType : applicationOutputs) {
+                    if (outputDataObjectType.getType().equals(DataType.STDOUT)) {
+                        if (outputDataObjectType.getValue() == null || outputDataObjectType.getValue().equals("")) {
+                            outputDataObjectType.setValue(applicationInterface.getApplicationName() + ".stdout");
+                            processContext.setStdoutLocation(applicationInterface.getApplicationName() + ".stdout");
+                        } else {
                             processContext.setStdoutLocation(outputDataObjectType.getValue());
                         }
                     }
-                    if (outputDataObjectType.getType().equals(DataType.STDERR)){
-                        if (outputDataObjectType.getValue() == null || outputDataObjectType.getValue().equals("")){
+                    if (outputDataObjectType.getType().equals(DataType.STDERR)) {
+                        if (outputDataObjectType.getValue() == null || outputDataObjectType.getValue().equals("")) {
                             String stderrLocation = applicationInterface.getApplicationName() + ".stderr";
                             outputDataObjectType.setValue(stderrLocation);
                             processContext.setStderrLocation(stderrLocation);
-                        }else {
+                        } else {
                             processContext.setStderrLocation(outputDataObjectType.getValue());
                         }
                     }
@@ -139,479 +128,400 @@ public class GFacEngineImpl implements GFacEngine {
             expCatalog.update(ExperimentCatalogModelType.PROCESS, processModel, processId);
             processModel.setProcessOutputs(applicationOutputs);
             processContext.setResourceJobManager(getResourceJobManager(processContext));
-			processContext.setRemoteCluster(Factory.getRemoteCluster(processContext));
-
-			String inputPath = ServerSettings.getLocalDataLocation();
-			if (inputPath != null) {
-				processContext.setLocalWorkingDir((inputPath.endsWith("/") ? inputPath : inputPath + "/") +
-						processContext.getProcessId());
-			}
-
-			List<Object> jobModels = expCatalog.get(ExperimentCatalogModelType.JOB, "processId", processId);
-			if (jobModels != null && !jobModels.isEmpty()) {
-				if (jobModels.size() > 1) {
-					log.warn("Process has more than one job model, take first one");
-				}
-				processContext.setJobModel(((JobModel) jobModels.get(0)));
-			}
-			return processContext;
-		} catch (AppCatalogException e) {
-			throw new GFacException("App catalog access exception ", e);
-		} catch (RegistryException e) {
-			throw new GFacException("Registry access exception", e);
-		} catch (AiravataException e) {
-			throw new GFacException("Remote cluster initialization error", e);
-		}
-	}
-
-	@Override
-	public void executeProcess(ProcessContext processContext) throws GFacException {
-		if (processContext.isInterrupted()) {
-			GFacUtils.handleProcessInterrupt(processContext);
-			return;
-		}
-//		List<TaskContext> taskChain = new ArrayList<>();
-		if (configureWorkspace(processContext, false)) return;
-
-		// exit if process is handed over to another instance while input staging
-		if (inputDataStaging(processContext, false)) return;
-
-		// exit if process is handed orver to another instance while job submission.
-		if (executeJobSubmission(processContext)) return;
-//		processContext.setTaskChain(taskChain);
-		if (processContext.isInterrupted()) {
-			GFacUtils.handleProcessInterrupt(processContext);
-			return;
-		}
-	}
-
-	private boolean executeJobSubmission(ProcessContext processContext) throws GFacException {
-		if (processContext.isInterrupted()) {
-			GFacUtils.handleProcessInterrupt(processContext);
-			return true;
-		}
-		TaskContext taskCtx;
-		TaskStatus taskStatus;
-        ProcessStatus status = new ProcessStatus(ProcessState.EXECUTING);
-        status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-        processContext.setProcessStatus(status);
-		JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(processContext.getJobSubmissionProtocol());
-		if (processContext.isInterrupted()) {
-			GFacUtils.handleProcessInterrupt(processContext);
-			return true;
-		}
-		GFacUtils.saveAndPublishProcessStatus(processContext);
-		taskCtx = getJobSubmissionTaskContext(processContext);
-		saveTaskModel(taskCtx);
-		GFacUtils.saveAndPublishTaskStatus(taskCtx);
-		taskStatus = executeTask(taskCtx, jobSubmissionTask, false);
-		if (taskStatus.getState() == TaskState.FAILED) {
-            log.error("expId: {}, processId: {}, taskId: {} type: {},:- Job submission task failed, " +
-                    "reason:" + " {}", taskCtx.getParentProcessContext().getExperimentId(), taskCtx
-                    .getParentProcessContext().getProcessId(), taskCtx.getTaskId(), jobSubmissionTask.getType
-                    ().name(), taskStatus.getReason());
-            String errorMsg = "expId: {}, processId: {}, taskId: {} type: {},:- Job submission task failed, " +
-                    "reason:" + " {}" + processContext.getExperimentId() + processContext.getProcessId() + taskCtx.getTaskId() + jobSubmissionTask.getType().name() + taskStatus.getReason();
-            ErrorModel errorModel = new ErrorModel();
-            errorModel.setUserFriendlyMessage("Job submission task failed");
-            errorModel.setActualErrorMessage(errorMsg);
-            GFacUtils.saveTaskError(taskCtx, errorModel);
-            ProcessStatus processStatus = processContext.getProcessStatus();
-            processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-            processStatus.setReason(errorMsg);
-            processStatus.setState(ProcessState.FAILED);
-            processContext.setProcessStatus(processStatus);
-            GFacUtils.saveAndPublishProcessStatus(processContext);
-			throw new GFacException("Job submission task failed");
-		}
-		if (processContext.isInterrupted()) {
-			GFacUtils.handleProcessInterrupt(processContext);
-			return true;
-		}
-		return false;
-	}
-
-	private boolean inputDataStaging(ProcessContext processContext, boolean recover) throws GFacException {
-		if (processContext.isInterrupted()) {
-			GFacUtils.handleProcessInterrupt(processContext);
-			return true;
-		}
-		TaskContext taskCtx;
-		TaskStatus taskStatus;// execute process inputs
-        ProcessStatus status = new ProcessStatus(ProcessState.INPUT_DATA_STAGING);
-        status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-        processContext.setProcessStatus(status);
-		GFacUtils.saveAndPublishProcessStatus(processContext);
-		List<InputDataObjectType> processInputs = processContext.getProcessModel().getProcessInputs();
-		sortByInputOrder(processInputs);
-		if (processInputs != null) {
-			for (InputDataObjectType processInput : processInputs) {
-				if (processContext.isInterrupted()) {
-					GFacUtils.handleProcessInterrupt(processContext);
-					return true;
-				}
-				DataType type = processInput.getType();
-				switch (type) {
-					case STDERR:
-						break;
-					case STDOUT:
-						break;
-					case URI:
-						try {
-							taskCtx = getDataStagingTaskContext(processContext, processInput);
-						} catch (TException | TaskException e) {
-							throw new GFacException("Error while serializing data staging sub task model");
-						}
-						saveTaskModel(taskCtx);
-						GFacUtils.saveAndPublishTaskStatus(taskCtx);
-						Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
-						taskStatus = executeTask(taskCtx, dMoveTask, false);
-						if (taskStatus.getState() == TaskState.FAILED) {
-							log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input statging failed, " +
-                                    "reason:" + " {}", taskCtx.getParentProcessContext().getExperimentId(), taskCtx
-                                    .getParentProcessContext().getProcessId(), taskCtx.getTaskId(), dMoveTask.getType
-                                    ().name(), taskStatus.getReason());
-                            String errorMsg = "expId: {}, processId: {}, taskId: {} type: {},:- Input staging failed, " +
-                                    "reason:" + " {}" + processContext.getExperimentId() + processContext.getProcessId() + taskCtx.getTaskId() + dMoveTask.getType().name() + taskStatus.getReason();
-                            ErrorModel errorModel = new ErrorModel();
-                            errorModel.setUserFriendlyMessage("Error while staging input data");
-                            errorModel.setActualErrorMessage(errorMsg);
-                            GFacUtils.saveTaskError(taskCtx, errorModel);
-                            ProcessStatus processStatus = processContext.getProcessStatus();
-                            processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-                            processStatus.setReason(errorMsg);
-                            processStatus.setState(ProcessState.FAILED);
-                            processContext.setProcessStatus(processStatus);
+            processContext.setRemoteCluster(Factory.getRemoteCluster(processContext));
+
+            String inputPath = ServerSettings.getLocalDataLocation();
+            if (inputPath != null) {
+                processContext.setLocalWorkingDir((inputPath.endsWith("/") ? inputPath : inputPath + "/") +
+                        processContext.getProcessId());
+            }
+
+            List<Object> jobModels = expCatalog.get(ExperimentCatalogModelType.JOB, "processId", processId);
+            if (jobModels != null && !jobModels.isEmpty()) {
+                if (jobModels.size() > 1) {
+                    log.warn("Process has more than one job model, take first one");
+                }
+                processContext.setJobModel(((JobModel) jobModels.get(0)));
+            }
+            return processContext;
+        } catch (AppCatalogException e) {
+            throw new GFacException("App catalog access exception ", e);
+        } catch (RegistryException e) {
+            throw new GFacException("Registry access exception", e);
+        } catch (AiravataException e) {
+            throw new GFacException("Remote cluster initialization error", e);
+        }
+    }
+
+    @Override
+    public void executeProcess(ProcessContext processContext) throws GFacException {
+        if (processContext.isInterrupted()) {
+            GFacUtils.handleProcessInterrupt(processContext);
+            return;
+        }
+        String taskDag = processContext.getTaskDag();
+        List<String> taskIds = GFacUtils.parseTaskDag(taskDag);
+        processContext.setTaskExecutionOrder(taskIds);
+        executeTaskListFrom(processContext, taskIds.get(0));
+    }
+
+    private void executeTaskListFrom(ProcessContext processContext, String startingTaskId) throws GFacException {
+        // checkpoint
+        if (processContext.isInterrupted()) {
+            GFacUtils.handleProcessInterrupt(processContext);
+            return;
+        }
+        List<TaskModel> taskList = processContext.getTaskList();
+        Map<String, TaskModel> taskMap = processContext.getTaskMap();
+        boolean fastForward = true;
+        for (String taskId : processContext.getTaskExecutionOrder()) {
+            if (fastForward) {
+                if (taskId.equalsIgnoreCase(startingTaskId)) {
+                    fastForward = false;
+                } else {
+                    continue;
+                }
+            }
+
+            TaskModel taskModel = taskMap.get(taskId);
+            TaskTypes taskType = taskModel.getTaskType();
+            TaskContext taskContext = getTaskContext(processContext);
+            taskContext.setTaskModel(taskModel);
+            ProcessStatus status = null;
+            processContext.setCurrentExecutingTaskId(taskId);
+            switch (taskType) {
+                case ENV_SETUP:
+                    status = new ProcessStatus(ProcessState.CONFIGURING_WORKSPACE);
+                    status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                    processContext.setProcessStatus(status);
+                    GFacUtils.saveAndPublishProcessStatus(processContext);
+                    // checkpoint
+                    if (processContext.isInterrupted()) {
+                        GFacUtils.handleProcessInterrupt(processContext);
+                        return;
+                    }
+                    configureWorkspace(taskContext, processContext.isRecovery());
+                    // checkpoint
+                    if (processContext.isInterrupted()) {
+                        GFacUtils.handleProcessInterrupt(processContext);
+                        return;
+                    }
+                    break;
+                case DATA_STAGING:
+                    try {
+                        // checkpoint
+                        if (processContext.isInterrupted()) {
+                            GFacUtils.handleProcessInterrupt(processContext);
+                            return;
+                        }
+                        DataStagingTaskModel subTaskModel = (DataStagingTaskModel) taskContext.getSubTaskModel();
+                        if (subTaskModel.getType() == DataStageType.INPUT) {
+                            status = new ProcessStatus(ProcessState.INPUT_DATA_STAGING);
+                            status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                            processContext.setProcessStatus(status);
                             GFacUtils.saveAndPublishProcessStatus(processContext);
-							throw new GFacException("Error while staging input data");
-						}
-						break;
-					default:
-						// nothing to do
-						break;
-				}
-			}
-		}
-		if (processContext.isInterrupted()) {
-			GFacUtils.handleProcessInterrupt(processContext);
-			return true;
-		}
-		return false;
-	}
-
-	private boolean configureWorkspace(ProcessContext processContext, boolean recover) throws GFacException {
-		if (processContext.isInterrupted()) {
-			GFacUtils.handleProcessInterrupt(processContext);
-			return true;
-		}
-		TaskContext taskCtx;
-        ProcessStatus status = new ProcessStatus(ProcessState.CONFIGURING_WORKSPACE);
-        status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-        processContext.setProcessStatus(status);
-		GFacUtils.saveAndPublishProcessStatus(processContext);
-		// Run all environment setup tasks
-		taskCtx = getEnvSetupTaskContext(processContext);
-		saveTaskModel(taskCtx);
-		SSHEnvironmentSetupTask envSetupTask = new SSHEnvironmentSetupTask();
-        TaskStatus taskStatus = executeTask(taskCtx, envSetupTask, recover);
-        GFacUtils.saveAndPublishTaskStatus(taskCtx);
+                            inputDataStaging(taskContext, processContext.isRecovery());
+                        } else {
+                            status = new ProcessStatus(ProcessState.OUTPUT_DATA_STAGING);
+                            status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                            processContext.setProcessStatus(status);
+                            GFacUtils.saveAndPublishProcessStatus(processContext);
+                            outputDataStaging(taskContext, processContext.isRecovery());
+                        }
+                        // checkpoint
+                        if (processContext.isInterrupted()) {
+                            GFacUtils.handleProcessInterrupt(processContext);
+                            return;
+                        }
+                    } catch (TException e) {
+                        throw new GFacException(e);
+                    }
+                    break;
+
+                case JOB_SUBMISSION:
+                    // checkpoint
+                    if (processContext.isInterrupted()) {
+                        GFacUtils.handleProcessInterrupt(processContext);
+                        return;
+                    }
+                    status = new ProcessStatus(ProcessState.EXECUTING);
+                    status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                    processContext.setProcessStatus(status);
+                    GFacUtils.saveAndPublishProcessStatus(processContext);
+                    executeJobSubmission(taskContext, processContext.isRecovery());
+                    // checkpoint
+                    if (processContext.isInterrupted()) {
+                        GFacUtils.handleProcessInterrupt(processContext);
+                        return;
+                    }
+                    break;
+
+                case MONITORING:
+                    JobMonitor monitorService = null;
+                    try {
+                        MonitorTaskModel monitorTaskModel = ((MonitorTaskModel) taskContext.getSubTaskModel());
+                        status = new ProcessStatus(ProcessState.MONITORING);
+                        status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                        processContext.setProcessStatus(status);
+                        GFacUtils.saveAndPublishProcessStatus(processContext);
+                        monitorService = Factory.getMonitorService(monitorTaskModel.getMonitorMode());
+                        monitorService.monitor(processContext.getJobModel().getJobId(), processContext);
+                    } catch (AiravataException | TException e) {
+                        throw new GFacException(e);
+                    }
+                    break;
+
+                case ENV_CLEANUP:
+                    // TODO implement environment clean up task logic
+                    break;
+
+                default:
+                    throw new GFacException("Unsupported task type: " + taskType.name());
+
+            }
+
+            if (processContext.isPauseTaskExecution()) {
+                return;   // If a task puts the processContext into a waiting state, that same task is responsible for resuming its execution.
+            }
+
+        }
+        processContext.setComplete(true);
+    }
+
+    private boolean executeJobSubmission(TaskContext taskContext, boolean recovery) throws GFacException {
+        TaskStatus taskStatus;
+        try {
+            JobSubmissionTaskModel jobSubmissionTaskModel = ((JobSubmissionTaskModel) taskContext.getSubTaskModel());
+            JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(jobSubmissionTaskModel.getJobSubmissionProtocol());
+
+            ProcessContext processContext = taskContext.getParentProcessContext();
+            taskStatus = executeTask(taskContext, jobSubmissionTask, recovery);
+            if (taskStatus.getState() == TaskState.FAILED) {
+                log.error("expId: {}, processId: {}, taskId: {}, type: {} :- Job submission task failed, reason: {}",
+                        taskContext.getParentProcessContext().getExperimentId(),
+                        taskContext.getParentProcessContext().getProcessId(), taskContext.getTaskId(),
+                        jobSubmissionTask.getType().name(), taskStatus.getReason());
+                String errorMsg = "expId: " + processContext.getExperimentId() + ", processId: " +
+                        processContext.getProcessId() + ", taskId: " + taskContext.getTaskId() + ", type: " +
+                        jobSubmissionTask.getType().name() + " :- Job submission task failed, reason: " +
+                        taskStatus.getReason();
+                ErrorModel errorModel = new ErrorModel();
+                errorModel.setUserFriendlyMessage("Job submission task failed");
+                errorModel.setActualErrorMessage(errorMsg);
+                GFacUtils.saveTaskError(taskContext, errorModel);
+                throw new GFacException("Job submission task failed");
+            }
+            return false;
+        } catch (TException e) {
+            throw new GFacException(e);
+        }
+    }
+
+    private boolean configureWorkspace(TaskContext taskContext, boolean recover) throws GFacException {
+
+        try {
+            EnvironmentSetupTaskModel subTaskModel = (EnvironmentSetupTaskModel) taskContext.getSubTaskModel();
+            Task envSetupTask = null;
+            if (subTaskModel.getProtocol() == SecurityProtocol.SSH_KEYS) {
+                envSetupTask = new SSHEnvironmentSetupTask();
+            } else {
+                throw new GFacException("Unsupported security protocol, Airavata doesn't support " +
+                        subTaskModel.getProtocol().name() + " protocol yet.");
+            }
+            TaskStatus status = new TaskStatus(TaskState.EXECUTING);
+            status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            taskContext.setTaskStatus(status);
+            GFacUtils.saveAndPublishTaskStatus(taskContext);
+            TaskStatus taskStatus = executeTask(taskContext, envSetupTask, recover);
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            taskContext.setTaskStatus(taskStatus);
+            GFacUtils.saveAndPublishTaskStatus(taskContext);
+
+            if (taskStatus.getState() == TaskState.FAILED) {
+                log.error("expId: {}, processId: {}, taskId: {}, type: {} :- Environment setup failed, reason: {}",
+                        taskContext.getParentProcessContext().getExperimentId(),
+                        taskContext.getParentProcessContext().getProcessId(), taskContext.getTaskId(),
+                        envSetupTask.getType().name(), taskStatus.getReason());
+                String errorMsg = "expId: " + taskContext.getExperimentId() + ", processId: " +
+                        taskContext.getProcessId() + ", taskId: " + taskContext.getTaskId() + ", type: " +
+                        envSetupTask.getType().name() + " :- Environment setup failed, reason: " +
+                        taskStatus.getReason();
+                ErrorModel errorModel = new ErrorModel();
+                errorModel.setUserFriendlyMessage("Error while environment setup");
+                errorModel.setActualErrorMessage(errorMsg);
+                GFacUtils.saveTaskError(taskContext, errorModel);
+                throw new GFacException("Error while environment setup");
+            }
+        } catch (TException e) {
+            throw new GFacException("Couldn't get environment setup task model", e);
+        }
+        return false;
+    }
+
+    private boolean inputDataStaging(TaskContext taskContext, boolean recover) throws GFacException {
+        TaskStatus taskStatus; // stage this task's input data
+        ProcessContext processContext = taskContext.getParentProcessContext();
+        Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
+        taskStatus = executeTask(taskContext, dMoveTask, recover);
         if (taskStatus.getState() == TaskState.FAILED) {
-			log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input staging failed, " +
-					"reason:" + " {}", taskCtx.getParentProcessContext().getExperimentId(), taskCtx
-					.getParentProcessContext().getProcessId(), taskCtx.getTaskId(), envSetupTask.getType
-					().name(), taskStatus.getReason());
+            log.error("expId: {}, processId: {}, taskId: {}, type: {} :- Input staging failed, reason: {}",
+                    taskContext.getParentProcessContext().getExperimentId(),
+                    taskContext.getParentProcessContext().getProcessId(), taskContext.getTaskId(),
+                    dMoveTask.getType().name(), taskStatus.getReason());
             String errorMsg = "expId: {}, processId: {}, taskId: {} type: {},:- Input staging failed, " +
-                    "reason:" + " {}" + processContext.getExperimentId() + processContext.getProcessId() + taskCtx.getTaskId() + envSetupTask.getType().name() + taskStatus.getReason();
+                    "reason: " + taskStatus.getReason() + " [expId=" + processContext.getExperimentId() +
+                    ", processId=" + processContext.getProcessId() + ", taskId=" + taskContext.getTaskId() +
+                    ", type=" + dMoveTask.getType().name() + "]";
             ErrorModel errorModel = new ErrorModel();
-            errorModel.setUserFriendlyMessage("Error while environment setup");
+            errorModel.setUserFriendlyMessage("Error while staging input data");
             errorModel.setActualErrorMessage(errorMsg);
-            GFacUtils.saveTaskError(taskCtx, errorModel);
-            ProcessStatus processStatus = processContext.getProcessStatus();
-            processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-            processStatus.setReason(errorMsg);
-            processContext.setProcessStatus(processStatus);
-            GFacUtils.saveAndPublishProcessStatus(processContext);
-			throw new GFacException("Error while environment setup");
-		}
-		if (processContext.isInterrupted()) {
-			GFacUtils.handleProcessInterrupt(processContext);
-			return true;
-		}
-		return false;
-	}
-
-
-	@Override
-	public void recoverProcess(ProcessContext processContext) throws GFacException {
-		ProcessState state = processContext.getProcessStatus().getState();
-		switch (state) {
-			case CREATED:
-			case VALIDATED:
-			case STARTED:
-				executeProcess(processContext);
-				break;
-			case PRE_PROCESSING:
-			case CONFIGURING_WORKSPACE:
-				if (configureWorkspace(processContext, true)) return;
-				if (inputDataStaging(processContext, false))  return;
-				if (executeJobSubmission(processContext))  return;
-				break;
-			case INPUT_DATA_STAGING:
-				if (inputDataStaging(processContext, true))  return;
-				if (executeJobSubmission(processContext))  return;
-				break;
-			case EXECUTING:
-				JobModel jobModel = getJobModel(processContext);
-				if (jobModel == null || jobModel.getJobId() == null) {
-					if (executeJobSubmission(processContext)) return;
-				} else {
-					processContext.setJobModel(jobModel);
-					return;
-				}
-				break;
-			default:
-				throw new GFacException("Invalid process recovery invocation");
-		}
-	}
-
-	private JobModel getJobModel(ProcessContext processContext) {
-		try {
-			return GFacUtils.getJobModel(processContext);
-		} catch (RegistryException e) {
-			log.error("Error while retrieving jobId,", e);
-			return null;
-		}
-	}
-
-	@Override
-	public void runProcessOutflow(ProcessContext processContext) throws GFacException {
-		if (processContext.isInterrupted()) {
-			GFacUtils.handleProcessInterrupt(processContext);
-			return;
-		}
-		// exit if process is handed over to another instance while output staging.
-		if (outputDataStaging(processContext, false)) return;
-
-		if (processContext.isInterrupted()) {
-			GFacUtils.handleProcessInterrupt(processContext);
-			return;
-		}
-
-		postProcessing(processContext,false);
-
-		if (processContext.isInterrupted()) {
-			GFacUtils.handleProcessInterrupt(processContext);
-		}
-	}
-
-	/**
-	 *
-	 * @param processContext
-	 * @param recovery
-	 * @return <code>true</code> if you need to interrupt processing <code>false</code> otherwise.
-	 * @throws GFacException
-	 */
-	private boolean postProcessing(ProcessContext processContext, boolean recovery) throws GFacException {
+            GFacUtils.saveTaskError(taskContext, errorModel);
+            throw new GFacException("Error while staging input data");
+        }
+        return false;
+    }
+
+
+    @Override
+    public void recoverProcess(ProcessContext processContext, String recoverTaskId) throws GFacException {
+        processContext.setRecovery(true);
+        continueProcess(processContext, recoverTaskId);
+    }
+
+    private JobModel getJobModel(ProcessContext processContext) {
+        try {
+            return GFacUtils.getJobModel(processContext);
+        } catch (RegistryException e) {
+            log.error("Error while retrieving jobId", e);
+            return null;
+        }
+    }
+
+    @Override
+    public void continueProcess(ProcessContext processContext, String taskId) throws GFacException {
+        if (processContext.isInterrupted()) {
+            GFacUtils.handleProcessInterrupt(processContext);
+            return;
+        }
+        executeTaskListFrom(processContext, taskId);
+    }
+
+    /**
+     * @param processContext
+     * @param recovery
+     * @return <code>true</code> if processing was interrupted and the caller should stop, <code>false</code> otherwise.
+     * @throws GFacException
+     */
+    private boolean postProcessing(ProcessContext processContext, boolean recovery) throws GFacException {
         ProcessStatus status = new ProcessStatus(ProcessState.POST_PROCESSING);
         status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
         processContext.setProcessStatus(status);
-		GFacUtils.saveAndPublishProcessStatus(processContext);
+        GFacUtils.saveAndPublishProcessStatus(processContext);
 //		taskCtx = getEnvCleanupTaskContext(processContext);
-		if (processContext.isInterrupted()) {
-			GFacUtils.handleProcessInterrupt(processContext);
-			return true;
-		}
-		return false;
-	}
-
-	/**
-	 *
-	 * @param processContext
-	 * @param recovery
-	 * @return <code>true</code> if process execution interrupted , <code>false</code> otherwise.
-	 * @throws GFacException
-	 */
-	private boolean outputDataStaging(ProcessContext processContext, boolean recovery) throws GFacException {
-		TaskContext taskCtx;
-        ProcessStatus status = new ProcessStatus(ProcessState.OUTPUT_DATA_STAGING);
-        status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-        processContext.setProcessStatus(status);
-		GFacUtils.saveAndPublishProcessStatus(processContext);
-        File localWorkingdir = new File(processContext.getLocalWorkingDir());
-        localWorkingdir.mkdirs(); // make local dir if not exist
-        List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs();
-		for (OutputDataObjectType processOutput : processOutputs) {
-			if (processContext.isInterrupted()) {
-				GFacUtils.handleProcessInterrupt(processContext);
-				return true;
-			}
-			DataType type = processOutput.getType();
-			switch (type) {
-				case URI: case STDERR: case STDOUT:
-					try {
-						taskCtx = getDataStagingTaskContext(processContext, processOutput);
-					} catch (TException | TaskException e) {
-						throw new GFacException("Thrift model to byte[] conversion issue", e);
-					}
-					saveTaskModel(taskCtx);
-					GFacUtils.saveAndPublishTaskStatus(taskCtx);
-					Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
-					TaskStatus taskStatus = executeTask(taskCtx, dMoveTask, recovery);
-					if (taskStatus.getState() == TaskState.FAILED) {
-						log.error("expId: {}, processId: {}, taskId: {} type: {},:- output staging failed, " +
-								"reason:" + " {}", taskCtx.getParentProcessContext().getExperimentId(), taskCtx
-								.getParentProcessContext().getProcessId(), taskCtx.getTaskId(), dMoveTask.getType
-								().name(), taskStatus.getReason());
-
-                        String errorMsg = "expId: {}, processId: {}, taskId: {} type: {},:- output staging failed, " +
-                                "reason:" + " {}" + processContext.getExperimentId() + processContext.getProcessId() + taskCtx.getTaskId() + dMoveTask.getType().name() + taskStatus.getReason();
-                        ErrorModel errorModel = new ErrorModel();
-                        errorModel.setUserFriendlyMessage("Error while staging output data");
-                        errorModel.setActualErrorMessage(errorMsg);
-                        GFacUtils.saveTaskError(taskCtx, errorModel);
-                        ProcessStatus processStatus = processContext.getProcessStatus();
-                        processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-                        processStatus.setReason(errorMsg);
-                        processStatus.setState(ProcessState.FAILED);
-                        processContext.setProcessStatus(processStatus);
-                        GFacUtils.saveAndPublishProcessStatus(processContext);
-						throw new GFacException("Error while staging output data");
-					}
-					break;
-				default:
-					// nothing to do
-					break;
-			}
-		}
-		return false;
-	}
-
-	@Override
-	public void recoverProcessOutflow(ProcessContext processContext) throws GFacException {
-		ProcessState processState = processContext.getProcessStatus().getState();
-		switch (processState) {
-			case OUTPUT_DATA_STAGING:
-				if (outputDataStaging(processContext, true)) return;
-				if (postProcessing(processContext, false)) return;
-			case POST_PROCESSING:
-				postProcessing(processContext, true);
-				break;
-		}
-		runProcessOutflow(processContext); // TODO implement recover steps
-	}
-
-	@Override
-	public void cancelProcess(ProcessContext processContext) throws GFacException {
-		if (processContext.getProcessState() == ProcessState.MONITORING) {
-			// get job submission task and invoke cancel
-			JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(processContext.getJobSubmissionProtocol());
-			TaskContext taskCtx = getJobSubmissionTaskContext(processContext);
-			executeCancel(taskCtx, jobSubmissionTask);
-		}
-	}
-
-	private TaskStatus executeTask(TaskContext taskCtx, Task task, boolean recover) throws GFacException {
-        TaskStatus status = new TaskStatus(TaskState.EXECUTING);
-        status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-        taskCtx.setTaskStatus(status);
-		GFacUtils.saveAndPublishTaskStatus(taskCtx);
-		TaskStatus taskStatus = null;
-		if (recover) {
-			taskStatus = task.recover(taskCtx);
-		} else {
-			taskStatus = task.execute(taskCtx);
-		}
-        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-		taskCtx.setTaskStatus(taskStatus);
-		GFacUtils.saveAndPublishTaskStatus(taskCtx);
-		return taskCtx.getTaskStatus();
-	}
-
-	private void executeCancel(TaskContext taskContext, JobSubmissionTask jSTask) throws GFacException {
-		try {
-			JobStatus oldJobStatus = jSTask.cancel(taskContext);
-
-			if (oldJobStatus != null && oldJobStatus.getJobState() == JobState.QUEUED) {
-				JobMonitor monitorService = Factory.getMonitorService(taskContext.getParentProcessContext().getMonitorMode());
-				monitorService.stopMonitor(taskContext.getParentProcessContext().getJobModel().getJobId(), true);
-				JobStatus newJobStatus = new JobStatus(JobState.CANCELED);
-				newJobStatus.setReason("Job cancelled");
+        if (processContext.isInterrupted()) {
+            GFacUtils.handleProcessInterrupt(processContext);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * @param taskContext
+     * @param recovery
+     * @return <code>true</code> if process execution was interrupted, <code>false</code> otherwise.
+     * @throws GFacException
+     */
+    private boolean outputDataStaging(TaskContext taskContext, boolean recovery) throws GFacException {
+        ProcessContext processContext = taskContext.getParentProcessContext();
+        Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
+        TaskStatus taskStatus = executeTask(taskContext, dMoveTask, recovery);
+        if (taskStatus.getState() == TaskState.FAILED) {
+            log.error("expId: {}, processId: {}, taskId: {}, type: {} :- Output staging failed, reason: {}",
+                    taskContext.getParentProcessContext().getExperimentId(),
+                    taskContext.getParentProcessContext().getProcessId(), taskContext.getTaskId(),
+                    dMoveTask.getType().name(), taskStatus.getReason());
+
+            String errorMsg = "expId: " + processContext.getExperimentId() + ", processId: " +
+                    processContext.getProcessId() + ", taskId: " + taskContext.getTaskId() + ", type: " +
+                    dMoveTask.getType().name() + " :- Output staging failed, reason: " + taskStatus.getReason();
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setUserFriendlyMessage("Error while staging output data");
+            errorModel.setActualErrorMessage(errorMsg);
+            GFacUtils.saveTaskError(taskContext, errorModel);
+        }
+        return false;
+    }
+
+    @Override
+    public void cancelProcess(ProcessContext processContext) throws GFacException {
+        if (processContext.getProcessState() == ProcessState.MONITORING) {
+            // get job submission task and invoke cancel
+            JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(processContext.getJobSubmissionProtocol());
+            TaskContext taskCtx = getJobSubmissionTaskContext(processContext);
+            executeCancel(taskCtx, jobSubmissionTask);
+        }
+    }
+
+    private TaskStatus executeTask(TaskContext taskCtx, Task task, boolean recover) throws GFacException {
+        TaskStatus taskStatus = null;
+        if (recover) {
+            taskStatus = task.recover(taskCtx);
+        } else {
+            taskStatus = task.execute(taskCtx);
+        }
+        return taskStatus;
+    }
+
+    private void executeCancel(TaskContext taskContext, JobSubmissionTask jSTask) throws GFacException {
+        try {
+            JobStatus oldJobStatus = jSTask.cancel(taskContext);
+
+            if (oldJobStatus != null && oldJobStatus.getJobState() == JobState.QUEUED) {
+                JobMonitor monitorService = Factory.getMonitorService(taskContext.getParentProcessContext().getMonitorMode());
+                monitorService.stopMonitor(taskContext.getParentProcessContext().getJobModel().getJobId(), true);
+                JobStatus newJobStatus = new JobStatus(JobState.CANCELED);
+                newJobStatus.setReason("Job cancelled");
                 newJobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-				taskContext.getParentProcessContext().getJobModel().setJobStatus(newJobStatus);
-				GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), taskContext.getParentProcessContext()
-						.getJobModel());
-			}
-		} catch (TaskException e) {
-			throw new GFacException("Error while cancelling job");
-		} catch (AiravataException e) {
-			throw new GFacException("Error wile getting monitoring service");
-		}
-	}
-
-	private TaskContext getJobSubmissionTaskContext(ProcessContext processContext) throws GFacException {
-		TaskContext taskCtx = new TaskContext();
-		taskCtx.setParentProcessContext(processContext);
-
-		TaskModel taskModel = new TaskModel();
-		taskModel.setParentProcessId(processContext.getProcessId());
-		taskModel.setCreationTime(new Date().getTime());
-		taskModel.setLastUpdateTime(taskModel.getCreationTime());
-        TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
-        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-        taskModel.setTaskStatus(taskStatus);
-		taskModel.setTaskType(TaskTypes.JOB_SUBMISSION);
-		taskCtx.setTaskModel(taskModel);
-		return taskCtx;
-	}
+                taskContext.getParentProcessContext().getJobModel().setJobStatus(newJobStatus);
+                GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), taskContext.getParentProcessContext()
+                        .getJobModel());
+            }
+        } catch (TaskException e) {
+            throw new GFacException("Error while cancelling job", e);
+        } catch (AiravataException e) {
+            throw new GFacException("Error while getting monitoring service", e);
+        }
+    }
 
-	private TaskContext getDataStagingTaskContext(ProcessContext processContext, InputDataObjectType processInput)
-            throws TException, TaskException {
-		TaskContext taskCtx = new TaskContext();
-		taskCtx.setParentProcessContext(processContext);
-		// create new task model for this task
-		TaskModel taskModel = new TaskModel();
-		taskModel.setParentProcessId(processContext.getProcessId());
-		taskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
-		taskModel.setLastUpdateTime(taskModel.getCreationTime());
+    private TaskContext getJobSubmissionTaskContext(ProcessContext processContext) throws GFacException {
+        TaskContext taskCtx = new TaskContext();
+        taskCtx.setParentProcessContext(processContext);
+
+        TaskModel taskModel = new TaskModel();
+        taskModel.setParentProcessId(processContext.getProcessId());
+        taskModel.setCreationTime(new Date().getTime());
+        taskModel.setLastUpdateTime(taskModel.getCreationTime());
         TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
         taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
         taskModel.setTaskStatus(taskStatus);
-		taskModel.setTaskType(TaskTypes.DATA_STAGING);
-		// create data staging sub task model
-		DataStagingTaskModel submodel = new DataStagingTaskModel();
-		submodel.setSource(processInput.getValue());
-        ServerInfo serverInfo = processContext.getServerInfo();
-        URI destination = null;
-        try {
-            destination = new URI(processContext.getDataMovementProtocol().name(), serverInfo.getUserName(),
-                    serverInfo.getHost(), serverInfo.getPort(), processContext.getWorkingDir(), null, null);
-        } catch (URISyntaxException e) {
-            throw new TaskException("Error while constructing destination file URI");
-        }
-        submodel.setDestination(destination.toString());
-		taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
-		taskCtx.setTaskModel(taskModel);
-        taskCtx.setProcessInput(processInput);
-		return taskCtx;
-	}
-
-	private TaskContext getDataStagingTaskContext(ProcessContext processContext, OutputDataObjectType processOutput)
+        taskModel.setTaskType(TaskTypes.JOB_SUBMISSION);
+        taskCtx.setTaskModel(taskModel);
+        return taskCtx;
+    }
+
+    private TaskContext getDataStagingTaskContext(ProcessContext processContext, OutputDataObjectType processOutput)
             throws TException, TaskException {
-		TaskContext taskCtx = new TaskContext();
-		taskCtx.setParentProcessContext(processContext);
-		// create new task model for this task
-		TaskModel taskModel = new TaskModel();
-		taskModel.setParentProcessId(processContext.getProcessId());
-		taskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
-		taskModel.setLastUpdateTime(taskModel.getCreationTime());
+        TaskContext taskCtx = new TaskContext();
+        taskCtx.setParentProcessContext(processContext);
+        // create new task model for this task
+        TaskModel taskModel = new TaskModel();
+        taskModel.setParentProcessId(processContext.getProcessId());
+        taskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+        taskModel.setLastUpdateTime(taskModel.getCreationTime());
         TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
         taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
         taskModel.setTaskStatus(taskStatus);
-		taskModel.setTaskType(TaskTypes.DATA_STAGING);
-		// create data staging sub task model
-		String remoteOutputDir = processContext.getOutputDir();
-		remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/";
-		DataStagingTaskModel submodel = new DataStagingTaskModel();
+        taskModel.setTaskType(TaskTypes.DATA_STAGING);
+        // create data staging sub task model
+        String remoteOutputDir = processContext.getOutputDir();
+        remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/";
+        DataStagingTaskModel submodel = new DataStagingTaskModel();
         ServerInfo serverInfo = processContext.getServerInfo();
         URI source = null;
         try {
@@ -623,89 +533,82 @@ public class GFacEngineImpl implements GFacEngine {
         submodel.setSource(source.toString());
         // We don't know destination location at this time, data staging task will set this.
         // because destination is required field we set dummy destination
-		submodel.setDestination("dummy://temp/file/location");
-		taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
-		taskCtx.setTaskModel(taskModel);
+        submodel.setDestination("dummy://temp/file/location");
+        taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
+        taskCtx.setTaskModel(taskModel);
         taskCtx.setProcessOutput(processOutput);
-		return taskCtx;
-	}
-
-	/**
-	 * Persist task model
-	 */
-	private void saveTaskModel(TaskContext taskContext) throws GFacException {
-		try {
-			TaskModel taskModel = taskContext.getTaskModel();
-			taskContext.getParentProcessContext().getExperimentCatalog().add(ExpCatChildDataType.TASK, taskModel,
-					taskModel.getParentProcessId());
-		} catch (RegistryException e) {
-			throw new GFacException("Error while saving task model", e);
-		}
-	}
-
-	private TaskContext getEnvSetupTaskContext(ProcessContext processContext) {
-		TaskContext taskCtx = new TaskContext();
-		taskCtx.setParentProcessContext(processContext);
-		TaskModel taskModel = new TaskModel();
-		taskModel.setParentProcessId(processContext.getProcessId());
-		taskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
-		taskModel.setLastUpdateTime(taskModel.getCreationTime());
-		taskModel.setTaskStatus(new TaskStatus(TaskState.CREATED));
-		taskModel.setTaskType(TaskTypes.ENV_SETUP);
-		taskCtx.setTaskModel(taskModel);
-		return taskCtx;
-	}
-
-
-	/**
-	 * Sort input data type by input order.
-	 */
-	private void sortByInputOrder(List<InputDataObjectType> processInputs) {
-		Collections.sort(processInputs, new Comparator<InputDataObjectType>() {
-			@Override
-			public int compare(InputDataObjectType inputDT_1, InputDataObjectType inputDT_2) {
-				return inputDT_1.getInputOrder() - inputDT_2.getInputOrder();
-			}
-		});
-	}
-
-	public static ResourceJobManager getResourceJobManager(ProcessContext processCtx) throws AppCatalogException, GFacException {
-		List<JobSubmissionInterface> jobSubmissionInterfaces = Factory.getDefaultAppCatalog().getComputeResource()
-				.getComputeResource(processCtx.getComputeResourceId()).getJobSubmissionInterfaces();
-
-		ResourceJobManager resourceJobManager = null;
-		JobSubmissionInterface jsInterface = null;
-		for (JobSubmissionInterface jobSubmissionInterface : jobSubmissionInterfaces) {
-			if (jobSubmissionInterface.getJobSubmissionProtocol() == processCtx.getJobSubmissionProtocol()) {
-				jsInterface = jobSubmissionInterface;
-				break;
-			}
-		}
-		if (jsInterface == null) {
-	        throw new GFacException("Job Submission interface cannot be empty at this point");
-		} else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
-			SSHJobSubmission sshJobSubmission = Factory.getDefaultAppCatalog().getComputeResource().getSSHJobSubmission
-					(jsInterface.getJobSubmissionInterfaceId());
-			processCtx.setMonitorMode(sshJobSubmission.getMonitorMode()); // fixme - Move this to populate process
-			// context method.
-			resourceJobManager = sshJobSubmission.getResourceJobManager();
-		} else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.LOCAL) {
-			LOCALSubmission localSubmission = Factory.getDefaultAppCatalog().getComputeResource().getLocalJobSubmission
-					(jsInterface.getJobSubmissionInterfaceId());
-			resourceJobManager = localSubmission.getResourceJobManager();
-		} else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH_FORK) {
-			SSHJobSubmission sshJobSubmission = Factory.getDefaultAppCatalog().getComputeResource().getSSHJobSubmission
-					(jsInterface.getJobSubmissionInterfaceId());
-			processCtx.setMonitorMode(sshJobSubmission.getMonitorMode()); // fixme - Move this to populate process
-			resourceJobManager = sshJobSubmission.getResourceJobManager();
-		} else {
-			throw new GFacException("Unsupported JobSubmissionProtocol - " + jsInterface.getJobSubmissionProtocol()
-					.name());
-		}
-
-		if (resourceJobManager == null) {
-			throw new GFacException("Resource Job Manager is empty.");
-		}
-		return resourceJobManager;
-	}
+        return taskCtx;
+    }
+
+    /**
+     * Persist task model
+     */
+    private void saveTaskModel(TaskContext taskContext) throws GFacException {
+        try {
+            TaskModel taskModel = taskContext.getTaskModel();
+            taskContext.getParentProcessContext().getExperimentCatalog().add(ExpCatChildDataType.TASK, taskModel,
+                    taskModel.getParentProcessId());
+        } catch (RegistryException e) {
+            throw new GFacException("Error while saving task model", e);
+        }
+    }
+
+    private TaskContext getTaskContext(ProcessContext processContext) {
+        TaskContext taskCtx = new TaskContext();
+        taskCtx.setParentProcessContext(processContext);
+        return taskCtx;
+    }
+
+
+    /**
+     * Sort input data type by input order.
+     */
+    private void sortByInputOrder(List<InputDataObjectType> processInputs) {
+        Collections.sort(processInputs, new Comparator<InputDataObjectType>() {
+            @Override
+            public int compare(InputDataObjectType inputDT_1, InputDataObjectType inputDT_2) {
+                return inputDT_1.getInputOrder() - inputDT_2.getInputOrder();
+            }
+        });
+    }
+
+    public static ResourceJobManager getResourceJobManager(ProcessContext processCtx) throws AppCatalogException, GFacException {
+        List<JobSubmissionInterface> jobSubmissionInterfaces = Factory.getDefaultAppCatalog().getComputeResource()
+                .getComputeResource(processCtx.getComputeResourceId()).getJobSubmissionInterfaces();
+
+        ResourceJobManager resourceJobManager = null;
+        JobSubmissionInterface jsInterface = null;
+        for (JobSubmissionInterface jobSubmissionInterface : jobSubmissionInterfaces) {
+            if (jobSubmissionInterface.getJobSubmissionProtocol() == processCtx.getJobSubmissionProtocol()) {
+                jsInterface = jobSubmissionInterface;
+                break;
+            }
+        }
+        if (jsInterface == null) {
+            throw new GFacException("Job Submission interface cannot be empty at this point");
+        } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
+            SSHJobSubmission sshJobSubmission = Factory.getDefaultAppCatalog().getComputeResource().getSSHJobSubmission
+                    (jsInterface.getJobSubmissionInterfaceId());
+            processCtx.setMonitorMode(sshJobSubmission.getMonitorMode()); // fixme - Move this to populate process
+            // context method.
+            resourceJobManager = sshJobSubmission.getResourceJobManager();
+        } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.LOCAL) {
+            LOCALSubmission localSubmission = Factory.getDefaultAppCatalog().getComputeResource().getLocalJobSubmission
+                    (jsInterface.getJobSubmissionInterfaceId());
+            resourceJobManager = localSubmission.getResourceJobManager();
+        } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH_FORK) {
+            SSHJobSubmission sshJobSubmission = Factory.getDefaultAppCatalog().getComputeResource().getSSHJobSubmission
+                    (jsInterface.getJobSubmissionInterfaceId());
+            processCtx.setMonitorMode(sshJobSubmission.getMonitorMode()); // fixme - Move this to populate process
+            resourceJobManager = sshJobSubmission.getResourceJobManager();
+        } else {
+            throw new GFacException("Unsupported JobSubmissionProtocol - " + jsInterface.getJobSubmissionProtocol()
+                    .name());
+        }
+
+        if (resourceJobManager == null) {
+            throw new GFacException("Resource Job Manager is empty.");
+        }
+        return resourceJobManager;
+    }
 }
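
Note for reviewers of the diff above: the heart of this change is executeTaskListFrom(), which walks the persisted task execution order, fast-forwards past tasks that already ran, and resumes at the task id handed in by continueProcess()/recoverProcess(). The standalone sketch below (hypothetical class, method, and task names; plain Java, not Airavata code) illustrates that fast-forward-and-resume pattern under the assumption that task ids are unique within a process:

import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class TaskListResumeSketch {

    // Skip every task before startingTaskId, run the rest in order, and stop
    // early when the runner reports a pause/interrupt (returns false).
    static void executeFrom(List<String> executionOrder, String startingTaskId,
                            Predicate<String> runTask) {
        boolean fastForward = true;
        for (String taskId : executionOrder) {
            if (fastForward) {
                if (taskId.equalsIgnoreCase(startingTaskId)) {
                    fastForward = false;   // found the resume point; start executing here
                } else {
                    continue;              // completed in an earlier run; skip it
                }
            }
            if (!runTask.test(taskId)) {   // false means "pause here and return"
                return;
            }
        }
    }

    public static void main(String[] args) {
        List<String> order = Arrays.asList("ENV_SETUP", "INPUT_STAGING",
                "JOB_SUBMISSION", "MONITORING", "OUTPUT_STAGING");
        // Recovering after input staging already ran: resume from JOB_SUBMISSION.
        executeFrom(order, "JOB_SUBMISSION", taskId -> {
            System.out.println("executing " + taskId);
            return true;
        });
    }
}

In the actual change the resume point is supplied by continueProcess()/recoverProcess(), a task that parks the process (isPauseTaskExecution()) is expected to re-enter the loop itself, and the process is only marked complete once the whole ordered task list has run.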