Posted to commits@airavata.apache.org by sh...@apache.org on 2015/07/16 21:57:15 UTC

airavata git commit: Send acknowledgement for processed processes, handle task status after executing each task, and handle failed scenarios.

Repository: airavata
Updated Branches:
  refs/heads/master e290cfe17 -> b7e914ee3


Send acknowledgement for processed processes, handle task status after executing each task, and handle failed scenarios.


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b7e914ee
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b7e914ee
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b7e914ee

Branch: refs/heads/master
Commit: b7e914ee3c4c90b77676bd2c217bec3b3183dc76
Parents: e290cfe
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Thu Jul 16 15:57:12 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Thu Jul 16 15:57:12 2015 -0400

----------------------------------------------------------------------
 .../apache/airavata/gfac/core/GFacUtils.java    |  45 ++--
 .../gfac/core/context/ProcessContext.java       |  17 +-
 .../airavata/gfac/core/context/TaskContext.java |   3 +
 .../apache/airavata/gfac/core/task/Task.java    |  11 +-
 .../org/apache/airavata/gfac/impl/Factory.java  |   9 +
 .../airavata/gfac/impl/GFacEngineImpl.java      | 169 ++++++++-------
 .../apache/airavata/gfac/impl/GFacWorker.java   |  28 +++
 .../gfac/impl/task/AbstractSCPTask.java         |   2 -
 .../gfac/impl/task/ForkJobSubmissionTask.java   |   5 +-
 .../gfac/impl/task/LocalJobSubmissionTask.java  |  15 +-
 .../gfac/impl/task/SCPDataStageTask.java        |  81 ++++---
 .../gfac/impl/task/SCPInputDataStageTask.java   |  14 +-
 .../gfac/impl/task/SCPOutputDataStatgeTask.java |  10 +-
 .../gfac/impl/task/SSHEnvironmentSetupTask.java |  25 ++-
 .../gfac/impl/task/SSHJobSubmissionTask.java    | 212 ++++++++++++-------
 .../airavata/gfac/server/GfacServerHandler.java |   6 +-
 16 files changed, 413 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index b00240b..af10218 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -266,9 +266,7 @@ public class GFacUtils {
 	        // first we save job jobModel to the registry for sa and then save the job status.
 	        ProcessContext processContext = taskContext.getParentProcessContext();
 	        ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
-	        TaskStatus status = new TaskStatus();
-	        status.setState(state);
-	        taskContext.setTaskStatus(status);
+	        TaskStatus status = taskContext.getTaskStatus();
 	        status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
 	        experimentCatalog.add(ExpCatChildDataType.TASK_STATUS, status, taskContext.getTaskId());
 	        TaskIdentifier identifier = new TaskIdentifier(taskContext.getTaskId(),
@@ -286,20 +284,17 @@ public class GFacUtils {
         }
     }
 
-    public static void saveProcessStatus(ProcessContext processContext,
-                                      ProcessState state) throws GFacException {
+    public static void saveAndPublishProcessStatus(ProcessContext processContext) throws GFacException {
         try {
             // first we save job jobModel to the registry for sa and then save the job status.
             ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
-            ProcessStatus status = new ProcessStatus();
-            status.setState(state);
-            processContext.getProcessModel().setProcessStatus(status);
+            ProcessStatus status = processContext.getProcessStatus();
             status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
             experimentCatalog.add(ExpCatChildDataType.PROCESS_STATUS, status, processContext.getProcessId());
             ProcessIdentifier identifier = new ProcessIdentifier(processContext.getProcessId(),
                                                                  processContext.getProcessModel().getExperimentId(),
                                                                  processContext.getGatewayId());
-            ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(state, identifier);
+            ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(status.getState(), identifier);
 	        MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
 			        AiravataUtils.getId(MessageType.PROCESS.name()), processContext.getGatewayId());
 	        msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
@@ -1094,27 +1089,35 @@ public class GFacUtils {
 		return GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + File.separator + experimentId;
 	}
 
-	public static void createExperimentNode(CuratorFramework curatorClient, String gfacServerName, String
+	public static void createProcessZKNode(CuratorFramework curatorClient, String gfacServerName, String
 			processId, long deliveryTag, String token) throws Exception {
-		// create /experiments/processId node and set data - serverName, add redelivery listener
-		String experimentPath = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
-		ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentPath);
-		curatorClient.setData().withVersion(-1).forPath(experimentPath, gfacServerName.getBytes());
-		curatorClient.getData().usingWatcher(new RedeliveryRequestWatcher()).forPath(experimentPath);
-
-		// create /experiments/processId/deliveryTag node and set data - deliveryTag
-		String deliveryTagPath = ZKPaths.makePath(experimentPath, GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE);
+		// TODO - To handle multiple processes per experiment, need to create a /experiment/{expId}/{processId} node
+		// create /experiments/{processId} node and set data - serverName, add redelivery listener
+		String zkProcessNodePath = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
+		ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkProcessNodePath);
+		curatorClient.setData().withVersion(-1).forPath(zkProcessNodePath, gfacServerName.getBytes());
+		curatorClient.getData().usingWatcher(new RedeliveryRequestWatcher()).forPath(zkProcessNodePath);
+
+		// create /experiments/{processId}/deliveryTag node and set data - deliveryTag
+		String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE);
 		ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), deliveryTagPath);
 		curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, GFacUtils.longToBytes(deliveryTag));
 
-		// create /experiments/processId/token node and set data - token
+		// create /experiments/{processId}/token node and set data - token
 		String tokenNodePath = ZKPaths.makePath(processId, GFacConstants.ZOOKEEPER_TOKEN_NODE);
 		ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), tokenNodePath);
 		curatorClient.setData().withVersion(-1).forPath(tokenNodePath, token.getBytes());
 
-		// create /experiments/processId/cancelListener node and set watcher for data changes
-		String cancelListenerNode = ZKPaths.makePath(experimentPath, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+		// create /experiments/{processId}/cancelListener node and set watcher for data changes
+		String cancelListenerNode = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
 		ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), cancelListenerNode);
 		curatorClient.getData().usingWatcher(new CancelRequestWatcher()).forPath(cancelListenerNode);
 	}
+
+	public static long getProcessDeliveryTag(CuratorFramework curatorClient, String processId) throws Exception {
+		String deliveryTagPath = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + processId + GFacConstants
+				.ZOOKEEPER_DELIVERYTAG_NODE;
+		byte[] bytes = curatorClient.getData().forPath(deliveryTagPath);
+		return GFacUtils.bytesToLong(bytes);
+	}
 }
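
A minimal usage sketch of the reworked helpers (variable names such as processContext, curatorClient and processId are placeholders, and exception handling is omitted): saveAndPublishProcessStatus now persists and publishes the status already set on the context, and getProcessDeliveryTag reads back the delivery tag that createProcessZKNode stored under the process node at launch time.

    // Set the status on the context first, then persist and publish it in one call.
    processContext.setProcessStatus(new ProcessStatus(ProcessState.EXECUTING));
    GFacUtils.saveAndPublishProcessStatus(processContext);

    // Once the process finishes, the stored delivery tag can be read back and used
    // to acknowledge the original RabbitMQ launch message (see GFacWorker.sendAck).
    long deliveryTag = GFacUtils.getProcessDeliveryTag(curatorClient, processId);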

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index dc8dace..9e1ac06 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -21,7 +21,7 @@
 
 package org.apache.airavata.gfac.core.context;
 
-import org.apache.airavata.common.utils.LocalEventPublisher;
+import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.cluster.RemoteCluster;
 import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
@@ -40,11 +40,15 @@ import org.apache.airavata.model.status.ProcessStatus;
 import org.apache.airavata.registry.cpi.AppCatalog;
 import org.apache.airavata.registry.cpi.ExperimentCatalog;
 import org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
 
 public class ProcessContext {
+
+	private static final Logger log = LoggerFactory.getLogger(ProcessContext.class);
 	// process model
 	private ExperimentCatalog experimentCatalog;
 	private AppCatalog appCatalog;
@@ -289,11 +293,16 @@ public class ProcessContext {
 
 	public void setProcessStatus(ProcessStatus status) {
 		if (status != null) {
+			log.info("expId: {}, processId: {} :- Status changed {} -> {}", getExperimentId(), processId,
+					getProcessState().name(), status.getState().name());
 			processModel.setProcessStatus(status);
-			// TODO publish process status change.
 		}
 	}
 
+	public ProcessStatus getProcessStatus(){
+		return processModel.getProcessStatus();
+	}
+
 	public String getComputeResourceId() {
 		return getComputeResourceDescription().getComputeResourceId();
 	}
@@ -321,4 +330,8 @@ public class ProcessContext {
 	public void setLocalWorkingDir(String localWorkingDir) {
 		this.localWorkingDir = localWorkingDir;
 	}
+
+	public String getExperimentId() {
+		return processModel.getExperimentId();
+	}
 }
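
A small, hypothetical illustration of the new accessors: setProcessStatus now logs every transition against the experiment and process ids, and getProcessStatus/getExperimentId are what the utility and logging code read back.

    processContext.setProcessStatus(new ProcessStatus(ProcessState.INPUT_DATA_STAGING));
    // logs: "expId: <expId>, processId: <processId> :- Status changed <previous state> -> INPUT_DATA_STAGING"
    ProcessStatus current = processContext.getProcessStatus();   // backed by the ProcessModel
    String expId = processContext.getExperimentId();             // convenience accessor used in log messages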

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
index 95d2fb9..597fd2e 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
@@ -54,6 +54,9 @@ public class TaskContext {
 	}
 
 	public void setTaskStatus(TaskStatus taskStatus) {
+		log.info("expId: {}, processId: {}, taskId: {}, type: {}:- Status changed {} -> {}", parentProcessContext
+				.getExperimentId(), parentProcessContext.getProcessId(), getTaskId(), getTaskType().name(),
+				getTaskState().name(), taskStatus .getState().name());
 		taskModel.setTaskStatus(taskStatus);
 	}
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java
index 62c069a..f4eec85 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java
@@ -22,6 +22,7 @@ package org.apache.airavata.gfac.core.task;
 
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.TaskTypes;
 
 import java.util.Map;
@@ -42,19 +43,17 @@ public interface Task {
 	 * This method will be called at the first time of task chain execution. This method should called before recover
 	 * method. For a given task chain execute method only call one time. recover method may be called more than once.
 	 * @param taskContext
-	 * @throws TaskException
-	 * @return
+	 * @return completed task status if success otherwise failed task status.
 	 */
-	public TaskState execute(TaskContext taskContext) throws TaskException;
+	public TaskStatus execute(TaskContext taskContext);
 
 	/**
 	 * This methond will be invoked at recover path.Before this method is invoked, execute method should be invoked.
 	 * This method may be called zero or few time in a process chain.
 	 * @param taskContext
-	 * @throws TaskException
-	 * @return
+	 * @return completed task status if success otherwise failed task status.
 	 */
-	public TaskState recover(TaskContext taskContext) throws TaskException;
+	public TaskStatus recover(TaskContext taskContext);
 
 	/**
 	 * Task type will be used to identify the task behaviour. eg : DATA_STAGING , JOB_SUBMISSION
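
With execute and recover now returning a TaskStatus instead of throwing TaskException, implementations report failures through the returned status and its reason. A minimal sketch of the new contract follows; the class name, task type and messages are illustrative only and not part of this commit.

    import org.apache.airavata.gfac.core.context.TaskContext;
    import org.apache.airavata.gfac.core.task.Task;
    import org.apache.airavata.gfac.core.task.TaskException;
    import org.apache.airavata.model.status.TaskState;
    import org.apache.airavata.model.status.TaskStatus;
    import org.apache.airavata.model.task.TaskTypes;

    import java.util.Map;

    public class NoOpDataStageTask implements Task {

        @Override
        public void init(Map<String, String> propertyMap) throws TaskException {
        }

        @Override
        public TaskStatus execute(TaskContext taskContext) {
            // Success and failure are both conveyed through the returned status.
            TaskStatus status = new TaskStatus(TaskState.COMPLETED);
            status.setReason("Nothing to stage");
            return status;
        }

        @Override
        public TaskStatus recover(TaskContext taskContext) {
            // This task is idempotent, so recovery simply re-runs execute.
            return execute(taskContext);
        }

        @Override
        public TaskTypes getType() {
            return TaskTypes.DATA_STAGING;
        }
    }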

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index 51db6f3..a5fa5ed 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -52,6 +52,7 @@ import org.apache.airavata.gfac.impl.job.UGEJobConfiguration;
 import org.apache.airavata.gfac.impl.job.UGEOutputParser;
 import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
 import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchConsumer;
 import org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher;
 import org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol;
 import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
@@ -101,6 +102,7 @@ public abstract class Factory {
 	private static Map<DataMovementProtocol, Task> dataMovementTask = new HashMap<>();
 	private static Map<ResourceJobManagerType, ResourceConfig> resources = new HashMap<>();
 	private static Map<MonitorMode, JobMonitor> jobMonitorServices = new HashMap<>();
+	private static RabbitMQProcessLaunchConsumer processLaunchConsumer;
 
 	public static GFacEngine getGFacEngine() throws GFacException {
 		if (engine == null) {
@@ -145,6 +147,13 @@ public abstract class Factory {
 		return curatorClient;
 	}
 
+	public static RabbitMQProcessLaunchConsumer getProcessLaunchConsumer() throws AiravataException {
+		if (processLaunchConsumer == null) {
+			processLaunchConsumer = new RabbitMQProcessLaunchConsumer();
+		}
+		return processLaunchConsumer;
+	}
+
 	public static JobManagerConfiguration getJobManagerConfiguration(ResourceJobManager resourceJobManager) throws GFacException {
 		ResourceConfig resourceConfig = Factory.getResourceConfig(resourceJobManager.getResourceJobManagerType());
 		OutputParser outputParser;
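
The new accessor lazily creates and caches a single RabbitMQProcessLaunchConsumer. A hypothetical call site is shown below; GFacWorker uses the same accessor later in this commit to acknowledge processed launch messages.

    RabbitMQProcessLaunchConsumer consumer = Factory.getProcessLaunchConsumer();
    consumer.sendAck(deliveryTag);   // deliveryTag as read back from ZooKeeper via GFacUtils.getProcessDeliveryTag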

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index c7eba99..a4c8381 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -124,14 +124,23 @@ public class GFacEngineImpl implements GFacEngine {
 		TaskContext taskCtx = null;
 		List<TaskContext> taskChain = new ArrayList<>();
 		processContext.setProcessStatus(new ProcessStatus(ProcessState.CONFIGURING_WORKSPACE));
+		GFacUtils.saveAndPublishProcessStatus(processContext);
 		// Run all environment setup tasks
 		taskCtx = getEnvSetupTaskContext(processContext);
 		saveTaskModel(taskCtx);
 		GFacUtils.saveAndPublishTaskStatus(taskCtx);
 		SSHEnvironmentSetupTask envSetupTask = new SSHEnvironmentSetupTask();
-		executeTask(taskCtx, envSetupTask);
+		TaskStatus taskStatus = executeTask(taskCtx, envSetupTask);
+		if (taskStatus.getState() == TaskState.FAILED) {
+			log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input statging failed, " +
+					"reason:" + " {}", taskCtx.getParentProcessContext().getExperimentId(), taskCtx
+					.getParentProcessContext().getProcessId(), taskCtx.getTaskId(), envSetupTask.getType
+					().name(), taskStatus.getReason());
+			throw new GFacException("Error while environment setup");
+		}
 		// execute process inputs
 		processContext.setProcessStatus(new ProcessStatus(ProcessState.INPUT_DATA_STAGING));
+		GFacUtils.saveAndPublishProcessStatus(processContext);
 		List<InputDataObjectType> processInputs = processContext.getProcessModel().getProcessInputs();
 		sortByInputOrder(processInputs);
 		if (processInputs != null) {
@@ -151,7 +160,14 @@ public class GFacEngineImpl implements GFacEngine {
 						saveTaskModel(taskCtx);
 						GFacUtils.saveAndPublishTaskStatus(taskCtx);
 						Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
-						executeTask(taskCtx, dMoveTask);
+						taskStatus = executeTask(taskCtx, dMoveTask);
+						if (taskStatus.getState() == TaskState.FAILED) {
+							log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input statging failed, " +
+									"reason:" + " {}", taskCtx.getParentProcessContext().getExperimentId(), taskCtx
+									.getParentProcessContext().getProcessId(), taskCtx.getTaskId(), dMoveTask.getType
+									().name(), taskStatus.getReason());
+							throw new GFacException("Error while staging input data");
+						}
 						break;
 					default:
 						// nothing to do
@@ -160,29 +176,87 @@ public class GFacEngineImpl implements GFacEngine {
 			}
 		}
 		processContext.setProcessStatus(new ProcessStatus(ProcessState.EXECUTING));
+		GFacUtils.saveAndPublishProcessStatus(processContext);
 		taskCtx = getJobSubmissionTaskContext(processContext);
+		saveTaskModel(taskCtx);
 		GFacUtils.saveAndPublishTaskStatus(taskCtx);
 		JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(processContext.getJobSubmissionProtocol());
-		executeTask(taskCtx, jobSubmissionTask);
+		taskStatus = executeTask(taskCtx, jobSubmissionTask);
+		if (taskStatus.getState() == TaskState.FAILED) {
+			throw new GFacException("Job submission task failed");
+		}
 		processContext.setTaskChain(taskChain);
 	}
 
-	private void executeTask(TaskContext taskCtx, Task task) throws GFacException {
-		try {
-			taskCtx.setTaskStatus(new TaskStatus(TaskState.EXECUTING));
-			GFacUtils.saveAndPublishTaskStatus(taskCtx);
-			task.execute(taskCtx);
-			taskCtx.setTaskStatus(new TaskStatus(TaskState.COMPLETED));
-			GFacUtils.saveAndPublishTaskStatus(taskCtx);
-		} catch (TaskException e) {
-			TaskStatus status = new TaskStatus(TaskState.FAILED);
-			status.setReason(taskCtx.getTaskType().toString() + " Task Failed to execute");
-			taskCtx.setTaskStatus(status);
-			GFacUtils.saveAndPublishTaskStatus(taskCtx);
+
+	@Override
+	public void recoverProcess(ProcessContext processContext) throws GFacException {
+
+	}
+
+	@Override
+	public void runProcessOutflow(ProcessContext processContext) throws GFacException {
+		TaskContext taskCtx = null;
+		processContext.setProcessStatus(new ProcessStatus(ProcessState.OUTPUT_DATA_STAGING));
+		GFacUtils.saveAndPublishProcessStatus(processContext);
+		List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs();
+		for (OutputDataObjectType processOutput : processOutputs) {
+			DataType type = processOutput.getType();
+			switch (type) {
+				case STDERR:
+					break;
+				case STDOUT:
+					break;
+				case URI:
+					try {
+						taskCtx = getDataStagingTaskContext(processContext, processOutput);
+					} catch (TException e) {
+						throw new GFacException("Thrift model to byte[] convertion issue", e);
+					}
+					File localWorkingdir = new File(taskCtx.getLocalWorkingDir());
+					localWorkingdir.mkdirs(); // make local dir if not exist
+					saveTaskModel(taskCtx);
+					GFacUtils.saveAndPublishTaskStatus(taskCtx);
+					Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
+					TaskStatus taskStatus = executeTask(taskCtx, dMoveTask);
+					if (taskStatus.getState() == TaskState.FAILED) {
+						log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input statging failed, " +
+								"reason:" + " {}", taskCtx.getParentProcessContext().getExperimentId(), taskCtx
+								.getParentProcessContext().getProcessId(), taskCtx.getTaskId(), dMoveTask.getType
+								().name(), taskStatus.getReason());
+						throw new GFacException("Error while staging input data");
+					}
+					break;
+				default:
+					// nothing to do
+					break;
+			}
 		}
+		processContext.setProcessStatus(new ProcessStatus(ProcessState.POST_PROCESSING));
+		GFacUtils.saveAndPublishProcessStatus(processContext);
+//		taskCtx = getEnvCleanupTaskContext(processContext);
+
+	}
+
+	@Override
+	public void recoverProcessOutflow(ProcessContext processContext) throws GFacException {
+
+	}
+
+	@Override
+	public void cancelProcess() throws GFacException {
 
 	}
 
+	private TaskStatus executeTask(TaskContext taskCtx, Task task) throws GFacException {
+		taskCtx.setTaskStatus(new TaskStatus(TaskState.EXECUTING));
+		GFacUtils.saveAndPublishTaskStatus(taskCtx);
+		TaskStatus taskStatus = task.execute(taskCtx);
+		taskCtx.setTaskStatus(taskStatus);
+		GFacUtils.saveAndPublishTaskStatus(taskCtx);
+		return taskCtx.getTaskStatus();
+	}
+
 	private TaskContext getJobSubmissionTaskContext(ProcessContext processContext) throws GFacException {
 		TaskContext taskCtx = new TaskContext();
 		taskCtx.setParentProcessContext(processContext);
@@ -197,7 +271,8 @@ public class GFacEngineImpl implements GFacEngine {
 		return taskCtx;
 	}
 
-	private TaskContext getDataStagingTaskContext(ProcessContext processContext, InputDataObjectType processInput) throws TException {
+	private TaskContext getDataStagingTaskContext(ProcessContext processContext, InputDataObjectType processInput)
+			throws TException {
 		TaskContext taskCtx = new TaskContext();
 		taskCtx.setParentProcessContext(processContext);
 		// create new task model for this task
@@ -210,13 +285,15 @@ public class GFacEngineImpl implements GFacEngine {
 		// create data staging sub task model
 		DataStagingTaskModel submodel = new DataStagingTaskModel();
 		submodel.setSource(processInput.getValue());
-		submodel.setDestination(processContext.getDataMovementProtocol().name() + ":" + processContext.getWorkingDir());
+		submodel.setDestination(processContext.getDataMovementProtocol().name() + ":" + processContext.getWorkingDir
+				());
 		taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
 		taskCtx.setTaskModel(taskModel);
 		return taskCtx;
 	}
 
-	private TaskContext getDataStagingTaskContext(ProcessContext processContext, OutputDataObjectType processOutput) throws TException {
+	private TaskContext getDataStagingTaskContext(ProcessContext processContext, OutputDataObjectType processOutput)
+			throws TException {
 		TaskContext taskCtx = new TaskContext();
 		taskCtx.setParentProcessContext(processContext);
 		// create new task model for this task
@@ -241,7 +318,6 @@ public class GFacEngineImpl implements GFacEngine {
 
 	/**
 	 * Persist task model
-	 * @param taskContext
 	 */
 	private void saveTaskModel(TaskContext taskContext) throws GFacException {
 		try {
@@ -266,56 +342,6 @@ public class GFacEngineImpl implements GFacEngine {
 		return taskCtx;
 	}
 
-	@Override
-	public void recoverProcess(ProcessContext processContext) throws GFacException {
-
-	}
-
-	@Override
-	public void runProcessOutflow(ProcessContext processContext) throws GFacException {
-		TaskContext taskCtx = null;
-		processContext.setProcessStatus(new ProcessStatus(ProcessState.OUTPUT_DATA_STAGING));
-		List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs();
-		for (OutputDataObjectType processOutput : processOutputs) {
-			DataType type = processOutput.getType();
-			switch (type) {
-				case STDERR:
-					break;
-				case STDOUT:
-					break;
-				case URI:
-					// TODO : Provide data staging data model
-					try {
-						taskCtx = getDataStagingTaskContext(processContext, processOutput);
-					} catch (TException e) {
-						throw new GFacException("Thrift model to byte[] convertion issue", e);
-					}
-					File localWorkingdir = new File(taskCtx.getLocalWorkingDir());
-					localWorkingdir.mkdirs(); // make local dir if not exist
-					saveTaskModel(taskCtx);
-					GFacUtils.saveAndPublishTaskStatus(taskCtx);
-					Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
-					executeTask(taskCtx, dMoveTask);
-					break;
-				default:
-					// nothing to do
-					break;
-			}
-		}
-		processContext.setProcessStatus(new ProcessStatus(ProcessState.POST_PROCESSING));
-//		taskCtx = getEnvCleanupTaskContext(processContext);
-
-	}
-
-	@Override
-	public void recoverProcessOutflow(ProcessContext processContext) throws GFacException {
-
-	}
-
-	@Override
-	public void cancelProcess() throws GFacException {
-
-	}
 
 	/**
 	 * Sort input data type by input order.
@@ -331,7 +357,7 @@ public class GFacEngineImpl implements GFacEngine {
 
 	public static ResourceJobManager getResourceJobManager(ProcessContext processCtx) throws AppCatalogException {
 		List<JobSubmissionInterface> jobSubmissionInterfaces = Factory.getDefaultAppCatalog().getComputeResource()
-				.getComputeResource(processCtx.getComputeResourceId()) .getJobSubmissionInterfaces();
+				.getComputeResource(processCtx.getComputeResourceId()).getJobSubmissionInterfaces();
 
 		ResourceJobManager resourceJobManager = null;
 		JobSubmissionInterface jsInterface = null;
@@ -346,7 +372,8 @@ public class GFacEngineImpl implements GFacEngine {
 		} else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
 			SSHJobSubmission sshJobSubmission = Factory.getDefaultAppCatalog().getComputeResource().getSSHJobSubmission
 					(jsInterface.getJobSubmissionInterfaceId());
-			processCtx.setMonitorMode(sshJobSubmission.getMonitorMode()); // fixme - Move this to populate process context method.
+			processCtx.setMonitorMode(sshJobSubmission.getMonitorMode()); // fixme - Move this to populate process
+			// context method.
 			resourceJobManager = sshJobSubmission.getResourceJobManager();
 		} else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.LOCAL) {
 			LOCALSubmission localSubmission = Factory.getDefaultAppCatalog().getComputeResource().getLocalJobSubmission
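
Each stage above now follows the same pattern: run the task, inspect the returned TaskStatus, and abort the process on failure. A hypothetical helper (not part of this commit) capturing that repeated shape:

    private void executeAndVerify(TaskContext taskCtx, Task task, String stage) throws GFacException {
        TaskStatus taskStatus = executeTask(taskCtx, task);   // sets EXECUTING, runs, then saves and publishes the result
        if (taskStatus.getState() == TaskState.FAILED) {
            log.error("expId: {}, processId: {}, taskId: {}, type: {} :- {} failed, reason: {}",
                    taskCtx.getParentProcessContext().getExperimentId(),
                    taskCtx.getParentProcessContext().getProcessId(),
                    taskCtx.getTaskId(), task.getType().name(), stage, taskStatus.getReason());
            throw new GFacException("Error while " + stage);
        }
    }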

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index 899f684..a759f90 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -25,6 +25,7 @@ import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.gfac.core.GFac;
 import org.apache.airavata.gfac.core.GFacEngine;
 import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.gfac.core.monitor.JobMonitor;
 import org.apache.airavata.model.status.ProcessState;
@@ -32,6 +33,8 @@ import org.apache.airavata.model.status.ProcessStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.text.MessageFormat;
+
 public class GFacWorker implements Runnable {
 
 	private static final Logger log = LoggerFactory.getLogger(GFacWorker.class);
@@ -85,11 +88,15 @@ public class GFacWorker implements Runnable {
 						// run the outflow task
 						engine.runProcessOutflow(processContext);
 						processContext.setProcessStatus(new ProcessStatus(ProcessState.COMPLETED));
+						GFacUtils.saveAndPublishProcessStatus(processContext);
+						sendAck();
 						break;
 					case RECOVER_OUTFLOW:
 						// recover  outflow task;
 						engine.recoverProcessOutflow(processContext);
 						processContext.setProcessStatus(new ProcessStatus(ProcessState.COMPLETED));
+						GFacUtils.saveAndPublishProcessStatus(processContext);
+						sendAck();
 						break;
 					default:
 						throw new GFacException("process Id : " + processId + " Couldn't identify process type");
@@ -113,6 +120,14 @@ public class GFacWorker implements Runnable {
 			}
 		} catch (GFacException e) {
 			log.error("GFac Worker throws an exception", e);
+			processContext.setProcessStatus(new ProcessStatus(ProcessState.FAILED));
+			try {
+				GFacUtils.saveAndPublishProcessStatus(processContext);
+			} catch (GFacException e1) {
+				log.error("expId: {}, processId: {} :- Couldn't save and publish process status {}", processContext
+						.getExperimentId(), processContext.getProcessId(), processContext.getProcessState());
+			}
+			sendAck();
 		}
 	}
 
@@ -142,6 +157,19 @@ public class GFacWorker implements Runnable {
 		}
 	}
 
+	private void sendAck() {
+		try {
+			long processDeliveryTag = GFacUtils.getProcessDeliveryTag(processContext.getCuratorClient(), processId);
+			Factory.getProcessLaunchConsumer().sendAck(processDeliveryTag);
+			log.info("expId: {}, procesId: {} :- Sent ack for deliveryTag {}", processContext.getExperimentId(),
+					processId, processDeliveryTag);
+		} catch (Exception e1) {
+			String format = MessageFormat.format("expId: {0}, processId: {1} :- Couldn't send ack for deliveryTag ",
+					processContext .getExperimentId(), processId);
+			log.error(format, e1);
+		}
+	}
+
 	private ProcessType getProcessType(ProcessContext processContext) {
 		// check the status and return correct type of process.
 		switch (processContext.getProcessState()) {
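
sendAck closes the loop opened at launch time: the delivery tag stored under the process's ZooKeeper node (see createProcessZKNode) is read back and acknowledged on the shared consumer, on both the COMPLETED and FAILED paths. A condensed, hypothetical view of that chain:

    long processDeliveryTag = GFacUtils.getProcessDeliveryTag(processContext.getCuratorClient(), processId);
    Factory.getProcessLaunchConsumer().sendAck(processDeliveryTag);   // launch message is not redelivered after this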

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
index a34997c..17746f4 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
@@ -20,10 +20,8 @@
  */
 package org.apache.airavata.gfac.impl.task;
 
-import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.model.status.TaskState;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
index dbc1a97..f8ef0ea 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
@@ -25,6 +25,7 @@ import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.JobSubmissionTask;
 import org.apache.airavata.gfac.core.task.TaskException;
 import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.TaskTypes;
 
 import java.util.Map;
@@ -36,12 +37,12 @@ public class ForkJobSubmissionTask implements JobSubmissionTask {
     }
 
     @Override
-    public TaskState execute(TaskContext taskContext) throws TaskException {
+    public TaskStatus execute(TaskContext taskContext) {
         return null;
     }
 
     @Override
-    public TaskState recover(TaskContext taskContext) throws TaskException {
+    public TaskStatus recover(TaskContext taskContext) {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
index ad7ab6d..5201de6 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
@@ -36,6 +36,7 @@ import org.apache.airavata.model.application.io.InputDataObjectType;
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.status.JobState;
 import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.TaskTypes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,8 +54,8 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{
     }
 
     @Override
-    public TaskState execute(TaskContext taskContext) throws TaskException {
-        try {
+    public TaskStatus execute(TaskContext taskContext) {
+     /*   try {
             ProcessContext processContext = taskContext.getParentProcessContext();
             // build command with all inputs
             List<String> cmdList = buildCommand(processContext);
@@ -97,10 +98,10 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{
             standardOutWriter.join();
             standardErrorWriter.join();
 
-            /*
+            *//*
              * check return value. usually not very helpful to draw conclusions based on return values so don't bother.
              * just provide warning in the log messages
-             */
+             *//*
             if (returnValue != 0) {
                 log.error("Process finished with non zero return value. Process may have failed");
             } else {
@@ -124,12 +125,12 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{
         } catch (IOException e) {
             log.error("Error while submitting local job", e);
             throw new TaskException("Error while submitting local job", e);
-        }
-        return TaskState.COMPLETED;
+        }*/
+	    return new TaskStatus(TaskState.COMPLETED);
     }
 
     @Override
-    public TaskState recover(TaskContext taskContext) throws TaskException {
+    public TaskStatus recover(TaskContext taskContext) {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
index 089535e..b2a83ed 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
@@ -20,63 +20,88 @@
  */
 package org.apache.airavata.gfac.impl.task;
 
-import com.jcraft.jsch.JSch;
-import com.jcraft.jsch.JSchException;
-import com.jcraft.jsch.Session;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.gfac.core.SSHApiException;
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.SSHUtils;
+import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.DataStagingTaskModel;
 import org.apache.airavata.model.task.TaskTypes;
 import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Map;
 
 public class SCPDataStageTask implements Task {
+	private static final Logger log = LoggerFactory.getLogger(SCPDataStageTask.class);
+
 	@Override
 	public void init(Map<String, String> propertyMap) throws TaskException {
 
 	}
 
 	@Override
-	public TaskState execute(TaskContext taskContext) throws TaskException {
-
+	public TaskStatus execute(TaskContext taskContext) {
+		TaskStatus status = new TaskStatus(TaskState.COMPLETED);
 		if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) {
-			throw new TaskException("Invalid task call, expected " + TaskTypes.DATA_STAGING.toString() + " but found "
+			status.setState(TaskState.FAILED);
+			status.setReason("Invalid task call, expected " + TaskTypes.DATA_STAGING.toString() + " but found "
 					+ taskContext.getTaskModel().getTaskType().toString());
-		}
-		try {
-			DataStagingTaskModel subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel(taskContext
-					.getTaskModel());
-			URI sourceURI = new URI(subTaskModel.getSource());
-			URI destinationURI = new URI(subTaskModel.getDestination());
+		} else {
+			try {
+				DataStagingTaskModel subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel(taskContext
+						.getTaskModel());
+				URI sourceURI = new URI(subTaskModel.getSource());
+				URI destinationURI = new URI(subTaskModel.getDestination());
 
-			if (sourceURI.getScheme().equalsIgnoreCase("file")) {  //  Airavata --> RemoteCluster
-				taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURI.getPath(), destinationURI
-						.getPath());
-			} else { // RemoteCluster --> Airavata
-				taskContext.getParentProcessContext().getRemoteCluster().scpFrom(sourceURI.getPath(), destinationURI
-						.getPath());
+				if (sourceURI.getScheme().equalsIgnoreCase("file")) {  //  Airavata --> RemoteCluster
+					taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURI.getPath(), destinationURI
+							.getPath());
+				} else { // RemoteCluster --> Airavata
+					taskContext.getParentProcessContext().getRemoteCluster().scpFrom(sourceURI.getPath(), destinationURI
+							.getPath());
+				}
+				status.setReason("Successfully staged data");
+			} catch (SSHApiException e) {
+				String msg = "Scp attempt failed";
+				log.error(msg, e);
+				status.setState(TaskState.FAILED);
+				status.setReason(msg);
+				ErrorModel errorModel = new ErrorModel();
+				errorModel.setActualErrorMessage(e.getMessage());
+				errorModel.setUserFriendlyMessage(msg);
+				taskContext.getTaskModel().setTaskError(errorModel);
+			} catch (TException e) {
+				String msg = "Invalid task invocation";
+				log.error(msg, e);
+				status.setState(TaskState.FAILED);
+				status.setReason(msg);
+				ErrorModel errorModel = new ErrorModel();
+				errorModel.setActualErrorMessage(e.getMessage());
+				errorModel.setUserFriendlyMessage(msg);
+				taskContext.getTaskModel().setTaskError(errorModel);
+			} catch (URISyntaxException e) {
+				String msg = "source or destination is not a valid URI";
+				log.error(msg, e);
+				status.setState(TaskState.FAILED);
+				status.setReason(msg);
+				ErrorModel errorModel = new ErrorModel();
+				errorModel.setActualErrorMessage(e.getMessage());
+				errorModel.setUserFriendlyMessage(msg);
+				taskContext.getTaskModel().setTaskError(errorModel);
 			}
-		} catch (SSHApiException e) {
-			throw new TaskException("Scp attempt failed", e);
-		} catch (TException e) {
-			throw new TaskException("Invalid task invocation");
-		} catch (URISyntaxException e) {
-			throw new TaskException("source or destination is not a valid URI");
 		}
-		return null;
+		return status;
 	}
 
 	@Override
-	public TaskState recover(TaskContext taskContext) throws TaskException {
+	public TaskStatus recover(TaskContext taskContext) {
 		return null;
 	}
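
The three catch blocks above repeat the same failure bookkeeping: log, flip the status to FAILED, and attach an ErrorModel to the task model. A hypothetical helper showing that shared shape, for readers following the error path:

    private TaskStatus fail(TaskContext taskContext, TaskStatus status, String msg, Exception e) {
        log.error(msg, e);
        status.setState(TaskState.FAILED);
        status.setReason(msg);
        ErrorModel errorModel = new ErrorModel();
        errorModel.setActualErrorMessage(e.getMessage());
        errorModel.setUserFriendlyMessage(msg);
        taskContext.getTaskModel().setTaskError(errorModel);
        return status;
    }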
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
index 332a0aa..fc4d634 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
@@ -23,23 +23,19 @@ package org.apache.airavata.gfac.impl.task;
 import com.jcraft.jsch.JSch;
 import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.Session;
-import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.gfac.core.SSHApiException;
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.Factory;
 import org.apache.airavata.gfac.impl.SSHUtils;
-import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.DataStagingTaskModel;
 import org.apache.airavata.model.task.TaskTypes;
 import org.apache.thrift.TException;
 
 import java.io.IOException;
-import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.net.URL;
 
 public class SCPInputDataStageTask extends AbstractSCPTask {
 
@@ -47,9 +43,9 @@ public class SCPInputDataStageTask extends AbstractSCPTask {
 	}
 
 	@Override
-	public TaskState execute(TaskContext taskContext) throws TaskException {
+	public TaskStatus execute(TaskContext taskContext) {
 
-		if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) {
+/*		if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) {
 			throw new TaskException("Invalid task call, expected " + TaskTypes.DATA_STAGING.toString() + " but found "
 					+ taskContext.getTaskModel().getTaskType().toString());
 		}
@@ -81,12 +77,12 @@ public class SCPInputDataStageTask extends AbstractSCPTask {
 			throw new TaskException("Invalid task invocation");
 		} catch (URISyntaxException e) {
 			e.printStackTrace();
-		}
+		}*/
 		return null;
 	}
 
 	@Override
-	public TaskState recover(TaskContext taskContext) throws TaskException {
+	public TaskStatus recover(TaskContext taskContext) {
 		return null;
 	}
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
index 72e071c..6fa87c4 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
@@ -28,7 +28,7 @@ import org.apache.airavata.gfac.core.SSHApiException;
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.TaskException;
 import org.apache.airavata.gfac.impl.SSHUtils;
-import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.DataStagingTaskModel;
 import org.apache.airavata.model.task.TaskTypes;
 import org.apache.thrift.TException;
@@ -41,8 +41,8 @@ public class SCPOutputDataStatgeTask extends AbstractSCPTask {
 
 
 	@Override
-	public TaskState execute(TaskContext taskContext) throws TaskException {
-		if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) {
+	public TaskStatus execute(TaskContext taskContext) {
+/*		if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) {
 			throw new TaskException("Invalid task call, expected " + TaskTypes.DATA_STAGING.toString() + " but found "
 					+ taskContext.getTaskModel().getTaskType().toString());
 		}
@@ -69,12 +69,12 @@ public class SCPOutputDataStatgeTask extends AbstractSCPTask {
 			throw new TaskException("Scp failed", e);
 		} catch (TException e) {
 			throw new TaskException("Invalid task invocation");
-		}
+		}*/
 		return null;
 	}
 
 	@Override
-	public TaskState recover(TaskContext taskContext) throws TaskException {
+	public TaskStatus recover(TaskContext taskContext) {
 		return null;
 	}
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
index e541644..74f5826 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
@@ -20,37 +20,50 @@
  */
 package org.apache.airavata.gfac.impl.task;
 
-import org.apache.airavata.gfac.core.task.TaskException;
 import org.apache.airavata.gfac.core.SSHApiException;
 import org.apache.airavata.gfac.core.cluster.RemoteCluster;
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.TaskTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
 public class SSHEnvironmentSetupTask implements Task {
 
+	private static final Logger log = LoggerFactory.getLogger(SSHEnvironmentSetupTask.class);
 	@Override
 	public void init(Map<String, String> propertyMap) throws TaskException {
 
 	}
 
 	@Override
-	public TaskState execute(TaskContext taskContext) throws TaskException {
-
+	public TaskStatus execute(TaskContext taskContext) {
+		TaskStatus status = new TaskStatus(TaskState.COMPLETED);
 		try {
 			RemoteCluster remoteCluster = taskContext.getParentProcessContext().getRemoteCluster();
 			remoteCluster.makeDirectory(taskContext.getParentProcessContext().getWorkingDir());
+			status.setReason("Successfully createded environment");
 		} catch (SSHApiException e) {
-			throw new TaskException("Error while environment setup", e);
+			String msg = "Error while environment setup";
+			log.error(msg, e);
+			status.setState(TaskState.FAILED);
+			status.setReason(msg);
+			ErrorModel errorModel = new ErrorModel();
+			errorModel.setActualErrorMessage(e.getMessage());
+			errorModel.setUserFriendlyMessage(msg);
+			taskContext.getTaskModel().setTaskError(errorModel);
 		}
-		return null;
+		return status;
 	}
 
 	@Override
-	public TaskState recover(TaskContext taskContext) throws TaskException {
+	public TaskStatus recover(TaskContext taskContext) {
 		return execute(taskContext);
 	}
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
index ff3a6f8..c282f17 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
@@ -31,10 +31,12 @@ import org.apache.airavata.gfac.core.task.JobSubmissionTask;
 import org.apache.airavata.gfac.core.task.TaskException;
 import org.apache.airavata.gfac.impl.Factory;
 import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.status.JobState;
 import org.apache.airavata.model.status.JobStatus;
 import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.TaskTypes;
 import org.apache.airavata.registry.cpi.AppCatalogException;
 import org.apache.commons.io.FileUtils;
@@ -53,87 +55,143 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
     }
 
     @Override
-    public TaskState execute(TaskContext taskContext) throws TaskException {
-        try {
-            ProcessContext processContext = taskContext.getParentProcessContext();
-            JobModel jobModel = processContext.getJobModel();
-            if (jobModel == null){
-                jobModel = new JobModel();
-	            jobModel.setWorkingDir(processContext.getWorkingDir());
-	            jobModel.setTaskId(taskContext.getTaskId());
-	            jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
-            }
-            RemoteCluster remoteCluster = processContext.getRemoteCluster();
-            JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext);
-            jobModel.setJobName(jobDescriptor.getJobName());
-            ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
-            JobManagerConfiguration jConfig = null;
-            if (resourceJobManager != null) {
-                jConfig = Factory.getJobManagerConfiguration(resourceJobManager);
-            }
-            File jobFile = GFacUtils.createJobFile(jobDescriptor, jConfig);
-            if (jobFile != null && jobFile.exists()){
-                jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
-                String jobId = remoteCluster.submitBatchJob(jobFile.getPath(), processContext.getWorkingDir());
-                if (jobId != null && !jobId.isEmpty()) {
-                    jobModel.setJobId(jobId);
-                    GFacUtils.saveJobStatus(taskContext, jobModel, JobState.SUBMITTED);
+    public TaskStatus execute(TaskContext taskContext){
+	    TaskStatus status = new TaskStatus(TaskState.COMPLETED); // set to completed.
+	    try {
+		    ProcessContext processContext = taskContext.getParentProcessContext();
+		    JobModel jobModel = processContext.getJobModel();
+		    if (jobModel == null) {
+			    jobModel = new JobModel();
+			    jobModel.setWorkingDir(processContext.getWorkingDir());
+			    jobModel.setTaskId(taskContext.getTaskId());
+			    jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+		    }
+		    RemoteCluster remoteCluster = processContext.getRemoteCluster();
+		    JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext);
+		    jobModel.setJobName(jobDescriptor.getJobName());
+		    ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
+		    JobManagerConfiguration jConfig = null;
+		    if (resourceJobManager != null) {
+			    jConfig = Factory.getJobManagerConfiguration(resourceJobManager);
+		    }
+		    File jobFile = GFacUtils.createJobFile(jobDescriptor, jConfig);
+		    if (jobFile != null && jobFile.exists()) {
+			    jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
+			    String jobId = remoteCluster.submitBatchJob(jobFile.getPath(), processContext.getWorkingDir());
+			    if (jobId != null && !jobId.isEmpty()) {
+				    jobModel.setJobId(jobId);
+				    GFacUtils.saveJobStatus(taskContext, jobModel, JobState.SUBMITTED);
 //                    publisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
 //                            , GfacExperimentState.JOBSUBMITTED));
-                    processContext.setJobModel(jobModel);
-                    if (verifyJobSubmissionByJobId(remoteCluster, jobId)) {
+				    processContext.setJobModel(jobModel);
+				    if (verifyJobSubmissionByJobId(remoteCluster, jobId)) {
 //                        publisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
 //                                , GfacExperimentState.JOBSUBMITTED));
-                        GFacUtils.saveJobStatus(taskContext, jobModel, JobState.QUEUED);
-                    }
-                } else {
-                    processContext.setJobModel(jobModel);
-                    int verificationTryCount = 0;
-                    while (verificationTryCount++ < 3) {
-                        String verifyJobId = verifyJobSubmission(remoteCluster, jobModel);
-                        if (verifyJobId != null && !verifyJobId.isEmpty()) {
-                            // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
-                            jobId = verifyJobId;
-                            jobModel.setJobId(jobId);
+					    GFacUtils.saveJobStatus(taskContext, jobModel, JobState.QUEUED);
+				    }
+				    status = new TaskStatus(TaskState.COMPLETED);
+				    status.setReason("Submitted job to compute resource");
+			    } else {
+				    processContext.setJobModel(jobModel);
+				    int verificationTryCount = 0;
+				    while (verificationTryCount++ < 3) {
+					    String verifyJobId = verifyJobSubmission(remoteCluster, jobModel);
+					    if (verifyJobId != null && !verifyJobId.isEmpty()) {
+						    // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
+						    jobId = verifyJobId;
+						    jobModel.setJobId(jobId);
 //                            publisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
 //                                    , GfacExperimentState.JOBSUBMITTED));
-                            GFacUtils.saveJobStatus(taskContext, jobModel, JobState.QUEUED);
-                            break;
-                        }
-                        Thread.sleep(verificationTryCount * 1000);
-                    }
-                }
+						    GFacUtils.saveJobStatus(taskContext, jobModel, JobState.QUEUED);
+						    status.setState(TaskState.COMPLETED);
+						    status.setReason("Submitted job to compute resource");
+						    break;
+					    }
+					    Thread.sleep(verificationTryCount * 1000);
+				    }
+			    }
 
-                if (jobId == null || jobId.isEmpty()) {
-                    String msg = "expId:" + processContext.getProcessModel().getExperimentId() + " Couldn't find remote jobId for JobName:"
-                            + jobModel.getJobName() + ", both submit and verify steps doesn't return a valid JobId. Hence changing experiment state to Failed";
-                    log.error(msg);
-                    GFacUtils.saveErrorDetails(processContext, msg);
-                    // FIXME : Need to handle according to status update chain
-//                    GFacUtils.publishTaskStatus(jobExecutionContext, publisher, TaskState.FAILED);
-                    return TaskState.FAILED;
-                }
-            }
-            return TaskState.COMPLETED;
-        } catch (AppCatalogException e) {
-            log.error("Error while instatiating app catalog",e);
-            throw new TaskException("Error while instatiating app catalog", e);
-        } catch (ApplicationSettingsException e) {
-            log.error("Error occurred while creating job descriptor", e);
-            throw new TaskException("Error occurred while creating job descriptor", e);
-        } catch (GFacException e) {
-            log.error("Error occurred while creating job descriptor", e);
-            throw new TaskException("Error occurred while creating job descriptor", e);
-        } catch (SSHApiException e) {
-            log.error("Error occurred while submitting the job", e);
-            throw new TaskException("Error occurred while submitting the job", e);
-        } catch (IOException e) {
-            log.error("Error while reading the content of the job file", e);
-            throw new TaskException("Error while reading the content of the job file", e);
-        } catch (InterruptedException e) {
-            log.error("Error occurred while verifying the job submission", e);
-            throw new TaskException("Error occurred while verifying the job submission", e);
-        }
+			    if (jobId == null || jobId.isEmpty()) {
+				    String msg = "expId:" + processContext.getProcessModel().getExperimentId() + " Couldn't find " +
+						    "remote jobId for JobName:" + jobModel.getJobName() + "; neither the submit nor the verify " +
+						    "step returned a valid JobId. Hence changing experiment state to Failed";
+				    log.error(msg);
+				    GFacUtils.saveErrorDetails(processContext, msg);
+				    status.setState(TaskState.FAILED);
+				    status.setReason("Couldn't find a job id in either the submit or the verify step");
+			    }
+		    } else {
+			    status.setState(TaskState.FAILED);
+			    if (jobFile == null) {
+				    status.setReason("JobFile is null");
+			    } else {
+				    status.setReason("Job file doesn't exist");
+			    }
+		    }
+
+	    } catch (AppCatalogException e) {
+		    String msg = "Error while instantiating app catalog";
+		    log.error(msg, e);
+		    status.setState(TaskState.FAILED);
+		    status.setReason(msg);
+		    ErrorModel errorModel = new ErrorModel();
+		    errorModel.setActualErrorMessage(e.getMessage());
+		    errorModel.setUserFriendlyMessage(msg);
+		    taskContext.getTaskModel().setTaskError(errorModel);
+	    } catch (ApplicationSettingsException e) {
+		    String msg = "Error occurred while creating job descriptor";
+		    log.error(msg, e);
+		    status.setState(TaskState.FAILED);
+		    status.setReason(msg);
+		    ErrorModel errorModel = new ErrorModel();
+		    errorModel.setActualErrorMessage(e.getMessage());
+		    errorModel.setUserFriendlyMessage(msg);
+		    taskContext.getTaskModel().setTaskError(errorModel);
+	    } catch (GFacException e) {
+		    String msg = "Error occurred while creating job descriptor";
+		    log.error(msg, e);
+		    status.setState(TaskState.FAILED);
+		    status.setReason(msg);
+		    ErrorModel errorModel = new ErrorModel();
+		    errorModel.setActualErrorMessage(e.getMessage());
+		    errorModel.setUserFriendlyMessage(msg);
+		    taskContext.getTaskModel().setTaskError(errorModel);
+	    } catch (SSHApiException e) {
+		    String msg = "Error occurred while submitting the job";
+		    log.error(msg, e);
+		    status.setState(TaskState.FAILED);
+		    status.setReason(msg);
+		    ErrorModel errorModel = new ErrorModel();
+		    errorModel.setActualErrorMessage(e.getMessage());
+		    errorModel.setUserFriendlyMessage(msg);
+		    taskContext.getTaskModel().setTaskError(errorModel);
+	    } catch (IOException e) {
+		    String msg = "Error while reading the content of the job file";
+		    log.error(msg, e);
+		    status.setState(TaskState.FAILED);
+		    status.setReason(msg);
+		    ErrorModel errorModel = new ErrorModel();
+		    errorModel.setActualErrorMessage(e.getMessage());
+		    errorModel.setUserFriendlyMessage(msg);
+		    taskContext.getTaskModel().setTaskError(errorModel);
+	    } catch (InterruptedException e) {
+		    String msg = "Error occurred while verifying the job submission";
+		    log.error(msg, e);
+		    status.setState(TaskState.FAILED);
+		    status.setReason(msg);
+		    ErrorModel errorModel = new ErrorModel();
+		    errorModel.setActualErrorMessage(e.getMessage());
+		    errorModel.setUserFriendlyMessage(msg);
+		    taskContext.getTaskModel().setTaskError(errorModel);
+	    }
+
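+	    // whatever happened above, persist the final task status and publish it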
+	    taskContext.setTaskStatus(status);
+	    try {
+		    GFacUtils.saveAndPublishTaskStatus(taskContext);
+	    } catch (GFacException e) {
+		    log.error("Error while saving task status", e);
+	    }
+	    return status;
     }
 
     private boolean verifyJobSubmissionByJobId(RemoteCluster remoteCluster, String jobID) throws SSHApiException {
@@ -154,15 +212,15 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
 
 
     @Override
-    public TaskState recover(TaskContext taskContext) throws TaskException {
+    public TaskStatus recover(TaskContext taskContext) {
             ProcessContext processContext = taskContext.getParentProcessContext();
             JobModel jobModel = processContext.getJobModel();
             // original job failed before submitting
             if (jobModel == null || jobModel.getJobId() == null ){
                 return execute(taskContext);
             }else {
-                // job is already submitted and monitor should handle the recovery
-                return TaskState.COMPLETED;
+	            // job is already submitted and monitor should handle the recovery
+	            return new TaskStatus(TaskState.COMPLETED);
             }
     }
 

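A note on the revised Task contract shown above: execute() and recover() now return a TaskStatus instead of throwing TaskException, so the engine can inspect the state after each task and stop on a failed scenario. The sketch below only illustrates how such a caller could consume that contract; the class and method names are made up (the real logic lives in GFacEngineImpl), and it assumes the Task interface now declares TaskStatus execute(TaskContext) with no checked exception and that TaskStatus/TaskState live in org.apache.airavata.model.status, as the other status imports in this patch suggest.

    import java.util.List;

    import org.apache.airavata.gfac.core.context.TaskContext;
    import org.apache.airavata.gfac.core.task.Task;
    import org.apache.airavata.model.status.TaskState;   // assumed package, matching the other status models in this patch
    import org.apache.airavata.model.status.TaskStatus;  // assumed package

    // Hypothetical caller, not the actual GFacEngineImpl: run tasks in order, stop at the first FAILED status.
    public class TaskChainSketch {

        public static TaskStatus runTasks(List<Task> tasks, TaskContext taskContext) {
            TaskStatus lastStatus = new TaskStatus(TaskState.COMPLETED);
            for (Task task : tasks) {
                // tasks now report problems through the returned TaskStatus instead of throwing TaskException
                lastStatus = task.execute(taskContext);
                if (lastStatus.getState() == TaskState.FAILED) {
                    // failed scenario: skip the remaining tasks and let the caller act on the reason/ErrorModel
                    break;
                }
            }
            return lastStatus;
        }
    }

Keeping the reason and ErrorModel on the task itself, as SSHJobSubmissionTask does above, means a caller only has to branch on TaskState while the human-readable details are already saved and published with the task.
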
http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 6fa1288..56d9d9c 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -38,7 +38,6 @@ import org.apache.airavata.messaging.core.MessageHandler;
 import org.apache.airavata.messaging.core.MessagingConstants;
 import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchConsumer;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher;
 import org.apache.airavata.model.messaging.event.*;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.ProcessStatus;
@@ -92,7 +91,7 @@ public class GfacServerHandler implements GfacService.Iface {
     }
 
     private void initAMQPClient() throws AiravataException {
-        rabbitMQProcessLaunchConsumer = new RabbitMQProcessLaunchConsumer();
+        rabbitMQProcessLaunchConsumer = Factory.getProcessLaunchConsumer();
         rabbitMQProcessLaunchConsumer.listen(new ProcessLaunchMessageHandler());
     }
 
@@ -220,7 +219,8 @@ public class GfacServerHandler implements GfacService.Iface {
                     status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
                     Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS, status, event.getProcessId());
                     try {
-	                    GFacUtils.createExperimentNode(curatorClient, gfacServerName, event.getProcessId(), message.getDeliveryTag(),
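+	                    // store the AMQP delivery tag with the process ZK node so the acknowledgement can be sent once the process has been handled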
+	                    GFacUtils.createProcessZKNode(curatorClient, gfacServerName, event.getProcessId(), message
+					                    .getDeliveryTag(),
 			                    event.getTokenId());
                         submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId());
                     } catch (Exception e) {