You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/07/09 21:39:29 UTC

[2/2] airavata git commit: Implemented basic task execution steps

Implemented basic task execution steps


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/8225ab55
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/8225ab55
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/8225ab55

Branch: refs/heads/master
Commit: 8225ab5563c5203eb6028074c64bc4e358d0d7b1
Parents: c857f82
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Thu Jul 9 15:39:20 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Thu Jul 9 15:39:20 2015 -0400

----------------------------------------------------------------------
 .../apache/airavata/gfac/core/GFacEngine.java   |   2 -
 .../gfac/core/context/ProcessContext.java       |  13 +-
 .../airavata/gfac/core/context/TaskContext.java |  26 +++
 .../apache/airavata/gfac/core/task/Task.java    |   8 +
 .../org/apache/airavata/gfac/impl/Factory.java  |   2 +-
 .../airavata/gfac/impl/GFacEngineImpl.java      | 198 ++++++++++++++++---
 .../apache/airavata/gfac/impl/GFacWorker.java   |   5 +-
 .../gfac/impl/task/AbstractSCPTask.java         |  14 +-
 .../gfac/impl/task/ForkJobSubmissionTask.java   |   6 +
 .../gfac/impl/task/LocalJobSubmissionTask.java  |   6 +
 .../gfac/impl/task/SCPInputDataStageTask.java   |   5 +
 .../gfac/impl/task/SCPOutputDataStatgeTask.java |   5 +
 .../gfac/impl/task/SSHEnvironmentSetupTask.java |   6 +
 .../gfac/impl/task/SSHJobSubmissionTask.java    |   6 +
 14 files changed, 252 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
index 8708625..09158fb 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
@@ -27,8 +27,6 @@ public interface GFacEngine {
 
 	public ProcessContext populateProcessContext(String processId, String gatewayId, String tokenId) throws GFacException;
 
-	public void createTaskChain(ProcessContext processContext) throws GFacException;
-
 	public void executeProcess(ProcessContext processContext) throws GFacException ;
 
 	public void recoverProcess(ProcessContext processContext) throws GFacException ;

http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index a3796d4..fe76bf5 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -34,6 +34,7 @@ import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfil
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.process.ProcessModel;
 import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
 import org.apache.airavata.registry.cpi.AppCatalog;
 import org.apache.airavata.registry.cpi.ExperimentCatalog;
 import org.apache.curator.framework.CuratorFramework;
@@ -54,7 +55,7 @@ public class ProcessContext {
 	private String workingDir;
 	private String inputDir;
     private String outputDir;
-	private List<Task> taskChain;
+	private List<TaskContext> taskChain;
 	private GatewayResourceProfile gatewayResourceProfile;
     private ComputeResourceDescription computeResourceDescription;
     private ApplicationDeploymentDescription applicationDeploymentDescription;
@@ -136,11 +137,11 @@ public class ProcessContext {
 		this.workingDir = workingDir;
 	}
 
-	public List<Task> getTaskChain() {
+	public List<TaskContext> getTaskChain() {
 		return taskChain;
 	}
 
-	public void setTaskChain(List<Task> taskChain) {
+	public void setTaskChain(List<TaskContext> taskChain) {
 		this.taskChain = taskChain;
 	}
 
@@ -266,4 +267,10 @@ public class ProcessContext {
 		return processModel.getProcessStatus().getState();
 	}
 
+	public void setProcessStatus(ProcessStatus status) {
+		if (status != null) {
+			processModel.setProcessStatus(status);
+			// TODO publish process status change.
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
index 477c1ae..1be5142 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
@@ -20,7 +20,10 @@
  */
 package org.apache.airavata.gfac.core.context;
 
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.TaskModel;
+import org.apache.airavata.model.task.TaskTypes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,4 +49,27 @@ public class TaskContext {
 		this.parentProcessContext = parentProcessContext;
 	}
 
+	public String getWorkingDir() {
+		return getParentProcessContext().getWorkingDir();
+	}
+
+	public void setTaskStatus(TaskStatus taskStatus) {
+		taskModel.setTaskStatus(taskStatus);
+	}
+
+	public TaskStatus getTaskStatus() {
+		return taskModel.getTaskStatus();
+	}
+
+	public TaskState getTaskState() {
+		return taskModel.getTaskStatus().getState();
+	}
+
+	public TaskTypes getTaskType() {
+		return taskModel.getTaskType();
+	}
+
+	public String getTaskId() {
+		return taskModel.getTaskId();
+	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java
index 0cf1ae8..62c069a 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java
@@ -22,6 +22,7 @@ package org.apache.airavata.gfac.core.task;
 
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.task.TaskTypes;
 
 import java.util.Map;
 
@@ -54,4 +55,11 @@ public interface Task {
 	 * @return
 	 */
 	public TaskState recover(TaskContext taskContext) throws TaskException;
+
+	/**
+	 * The task type is used to identify the task's behaviour, e.g. DATA_STAGING or JOB_SUBMISSION.
+	 * @return type of this task object
+	 */
+	public TaskTypes getType();
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index fbb8f2f..fc68bb1 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -36,6 +36,7 @@ import org.apache.airavata.gfac.core.config.DataTransferTaskConfig;
 import org.apache.airavata.gfac.core.config.GFacYamlConfigruation;
 import org.apache.airavata.gfac.core.config.JobSubmitterTaskConfig;
 import org.apache.airavata.gfac.core.config.ResourceConfig;
+import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.monitor.JobMonitor;
 import org.apache.airavata.gfac.core.scheduler.HostScheduler;
 import org.apache.airavata.gfac.core.task.JobSubmissionTask;
@@ -290,5 +291,4 @@ public abstract class Factory {
 		}
 	}
 
-
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 9dab5ca..5732ca5 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -22,28 +22,47 @@
 package org.apache.airavata.gfac.impl;
 
 import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.gfac.core.GFacEngine;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
 import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.task.SSHEnvironmentSetupTask;
 import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
 import org.apache.airavata.model.application.io.DataType;
 import org.apache.airavata.model.application.io.InputDataObjectType;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.airavata.model.task.TaskModel;
+import org.apache.airavata.model.task.TaskTypes;
 import org.apache.airavata.registry.cpi.AppCatalog;
 import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.registry.cpi.ExpCatChildDataType;
 import org.apache.airavata.registry.cpi.ExperimentCatalog;
 import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
 import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Date;
 import java.util.List;
 
 public class GFacEngineImpl implements GFacEngine {
 
+	private static final Logger log = LoggerFactory.getLogger(GFacEngineImpl.class);
+
 	public GFacEngineImpl() throws GFacException {
 
 	}
@@ -55,7 +74,8 @@ public class GFacEngineImpl implements GFacEngine {
 			ProcessContext processContext = new ProcessContext(processId, gatewayId, tokenId);
 			AppCatalog appCatalog = Factory.getDefaultAppCatalog();
 			ExperimentCatalog expCatalog = Factory.getDefaultExpCatalog();
-			processContext.setProcessModel((ProcessModel) expCatalog.get(ExperimentCatalogModelType.PROCESS, processId));
+			processContext.setProcessModel((ProcessModel) expCatalog.get(ExperimentCatalogModelType.PROCESS,
+					processId));
 			GatewayResourceProfile gatewayProfile = appCatalog.getGatewayProfile().getGatewayProfile(gatewayId);
 			processContext.setGatewayResourceProfile(gatewayProfile);
 			processContext.setComputeResourcePreference(appCatalog.getGatewayProfile().getComputeResourcePreference
@@ -73,23 +93,38 @@ public class GFacEngineImpl implements GFacEngine {
 	}
 
 	@Override
-	public void createTaskChain(ProcessContext processContext) throws GFacException {
+	public void executeProcess(ProcessContext processContext) throws GFacException {
+		TaskContext taskCtx = null;
+		List<TaskContext> taskChain = new ArrayList<>();
+		processContext.setProcessStatus(new ProcessStatus(ProcessState.CONFIGURING_WORKSPACE));
+		// Run all environment setup tasks
+		taskCtx = getEnvSetupTaskContext(processContext);
+		saveTaskModel(taskCtx);
+		publishTaskStatus(taskCtx);
+		SSHEnvironmentSetupTask envSetupTask = new SSHEnvironmentSetupTask();
+		executeTask(taskCtx, envSetupTask);
+		// execute process inputs
+		processContext.setProcessStatus(new ProcessStatus(ProcessState.INPUT_DATA_STAGING));
 		List<InputDataObjectType> processInputs = processContext.getProcessModel().getProcessInputs();
 		sortByInputOrder(processInputs);
-		List<Task> taskChain = new ArrayList<>();
 		if (processInputs != null) {
 			for (InputDataObjectType processInput : processInputs) {
 				DataType type = processInput.getType();
 				switch (type) {
 					case STDERR:
-						//
 						break;
 					case STDOUT:
-						//
 						break;
 					case URI:
-						// TODO : provide data staging data model
-						taskChain.add(Factory.getDataMovementTask(processContext.getDataMovementProtocol()));
+						try {
+							taskCtx = getDataStagingTaskContext(processContext, processInput);
+						} catch (TException e) {
+							throw new GFacException("Error while serializing data staging sub task model");
+						}
+						saveTaskModel(taskCtx);
+						publishTaskStatus(taskCtx);
+						Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
+						executeTask(taskCtx, dMoveTask);
 						break;
 					default:
 						// nothing to do
@@ -97,29 +132,106 @@ public class GFacEngineImpl implements GFacEngine {
 				}
 			}
 		}
-		taskChain.add(Factory.getJobSubmissionTask(processContext.getJobSubmissionProtocol()));
-		List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs();
-		for (OutputDataObjectType processOutput : processOutputs) {
-			DataType type = processOutput.getType();
-			switch (type) {
-				case STDERR:
-					break;
-				case STDOUT:
-					break;
-				case URI:
-					// TODO : Provide data staging data model
-					taskChain.add(Factory.getDataMovementTask(processContext.getDataMovementProtocol()));
-					break;
-			}
+		processContext.setProcessStatus(new ProcessStatus(ProcessState.EXECUTING));
+		taskCtx = getJobSubmissionTaskContext(processContext);
+		saveTaskModel(taskCtx);
+		publishTaskStatus(taskCtx);
+		JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(processContext.getJobSubmissionProtocol());
+		executeTask(taskCtx, jobSubmissionTask);
+		processContext.setTaskChain(taskChain);
+	}
+
+	private void executeTask(TaskContext taskCtx, Task task) throws GFacException {
+		try {
+			taskCtx.getTaskModel().setTaskStatus(new TaskStatus(TaskState.EXECUTING));
+			updateTaskStatus(taskCtx);
+			publishTaskStatus(taskCtx);
+			task.execute(taskCtx);
+			taskCtx.getTaskModel().setTaskStatus(new TaskStatus(TaskState.COMPLETED));
+			updateTaskStatus(taskCtx);
+			publishTaskStatus(taskCtx);
+		} catch (TaskException e) {
+			TaskStatus status = new TaskStatus(TaskState.FAILED);
+			status.setReason(taskCtx.getTaskType().toString() + " Task Failed to execute");
+			taskCtx.setTaskStatus(status);
+			updateTaskStatus(taskCtx);
 		}
 
-		processContext.setTaskChain(taskChain);
 	}
 
+	private TaskContext getJobSubmissionTaskContext(ProcessContext processContext) throws GFacException {
+		TaskContext taskCtx = new TaskContext();
+		taskCtx.setParentProcessContext(processContext);
 
-	@Override
-	public void executeProcess(ProcessContext processContext) throws GFacException {
+		TaskModel taskModel = new TaskModel();
+		taskModel.setParentProcessId(processContext.getProcessId());
+		taskModel.setCreationTime(new Date().getTime());
+		taskModel.setLastUpdateTime(taskModel.getCreationTime());
+		taskModel.setTaskStatus(new TaskStatus(TaskState.CREATED));
+		taskModel.setTaskType(TaskTypes.JOB_SUBMISSION);
+		taskCtx.setTaskModel(taskModel);
+		return taskCtx;
+	}
+
+	private TaskContext getDataStagingTaskContext(ProcessContext processContext, InputDataObjectType processInput) throws TException {
+		TaskContext taskCtx = new TaskContext();
+		taskCtx.setParentProcessContext(processContext);
+		// create new task model for this task
+		TaskModel taskModel = new TaskModel();
+		taskModel.setParentProcessId(processContext.getProcessId());
+		taskModel.setCreationTime(new Date().getTime());
+		taskModel.setLastUpdateTime(taskModel.getCreationTime());
+		taskModel.setTaskStatus(new TaskStatus(TaskState.CREATED));
+		taskModel.setTaskType(TaskTypes.DATA_STAGING);
+		// create data staging sub task model
+		DataStagingTaskModel submodel = new DataStagingTaskModel();
+		submodel.setSource(processInput.getValue());
+		submodel.setDestination(processContext.getWorkingDir());
+		taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
+		taskCtx.setTaskModel(taskModel);
+		return taskCtx;
+	}
 
+	private void publishTaskStatus(TaskContext taskCtx) {
+		Factory.getLocalEventPublisher().publish(taskCtx);
+	}
+
+	/**
+	 * Persist task model
+	 * @param taskContext
+	 */
+	private void saveTaskModel(TaskContext taskContext) throws GFacException {
+		try {
+			TaskModel taskModel = taskContext.getTaskModel();
+			Factory.getDefaultExpCatalog().add(ExpCatChildDataType.TASK, taskModel, taskModel.getParentProcessId ());
+		} catch (RegistryException e) {
+			throw new GFacException("Error while saving task model", e);
+		}
+	}
+
+	private void updateTaskStatus(TaskContext taskContext) throws GFacException {
+		try {
+			TaskStatus taskStatus = taskContext.getTaskStatus();
+			Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.TASK_STATUS, taskStatus, taskContext
+					.getTaskModel().getTaskId());
+		} catch (RegistryException e) {
+			log.error("taskId: {}, taskState: {} :- Error while updating task staus", taskContext.getTaskId(),
+					taskContext.getTaskState().toString());
+			throw new GFacException("Error while updating task status", e);
+		}
+	}
+
+	private TaskContext getEnvSetupTaskContext(ProcessContext processContext) {
+		TaskContext taskCtx = new TaskContext();
+		taskCtx.setParentProcessContext(processContext);
+		TaskModel taskModel = new TaskModel();
+		taskModel.setParentProcessId(processContext.getProcessId());
+		taskModel.setCreationTime(new Date().getTime());
+		taskModel.setLastUpdateTime(taskModel.getCreationTime());
+		taskModel.setTaskStatus(new TaskStatus(TaskState.CREATED));
+		taskModel.setTaskType(TaskTypes.ENV_SETUP);
+		taskCtx.setTaskModel(taskModel);
+		return taskCtx;
 	}
 
 	@Override
@@ -129,6 +241,40 @@ public class GFacEngineImpl implements GFacEngine {
 
 	@Override
 	public void runProcessOutflow(ProcessContext processContext) throws GFacException {
+		TaskContext taskCtx = null;
+		TaskModel taskModel = null;
+		List<TaskContext> taskChain = new ArrayList<>();
+		List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs();
+		for (OutputDataObjectType processOutput : processOutputs) {
+			DataType type = processOutput.getType();
+			switch (type) {
+				case STDERR:
+					break;
+				case STDOUT:
+					break;
+				case URI:
+					// TODO : Provide data staging data model
+					try {
+						taskCtx = new TaskContext();
+						taskCtx.setParentProcessContext(processContext);
+
+						// create new task model for this task
+						taskModel = new TaskModel();
+						taskModel.setParentProcessId(processContext.getProcessId());
+						taskModel.setTaskStatus(new TaskStatus(TaskState.CREATED));
+						taskModel.setTaskType(TaskTypes.DATA_STAGING);
+						// create data staging sub task model
+						DataStagingTaskModel submodel = new DataStagingTaskModel();
+						submodel.setSource(processContext.getWorkingDir() + "/" + processOutput.getValue());
+						submodel.setDestination(processOutput.getValue());
+						taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
+						taskChain.add(taskCtx);
+					} catch (TException e) {
+						throw new GFacException("Thift model to byte[] convertion issue", e);
+					}
+					break;
+			}
+		}
 
 	}
 
@@ -144,7 +290,6 @@ public class GFacEngineImpl implements GFacEngine {
 
 	/**
 	 * Sort input data type by input order.
-	 * @param processInputs
 	 */
 	private void sortByInputOrder(List<InputDataObjectType> processInputs) {
 		Collections.sort(processInputs, new Comparator<InputDataObjectType>() {
@@ -156,7 +301,4 @@ public class GFacEngineImpl implements GFacEngine {
 	}
 
 
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index 334538d..51aa8f8 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -74,13 +74,12 @@ public class GFacWorker implements Runnable {
 		    try {
 			    switch (type) {
 				    case NEW:
-					    engine.createTaskChain(processContext);
 					    engine.executeProcess(processContext);
 					    break;
 				    case RECOVER:
 					    // recover the process
-					    engine.createTaskChain(processContext);
-					    engine.recoverProcess(processContext);
+//					    engine.recoverProcess(processContext);
+					    engine.executeProcess(processContext);
 					    break;
 				    case OUTFLOW:
 					    // run the outflow task

http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
index ed9ddca..a34997c 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
@@ -27,7 +27,7 @@ import org.apache.airavata.model.status.TaskState;
 
 import java.util.Map;
 
-public class AbstractSCPTask implements Task {
+public abstract class AbstractSCPTask implements Task {
 	protected static final int DEFAULT_SSH_PORT = 22;
 	protected String password;
 	protected String publicKeyPath;
@@ -48,16 +48,4 @@ public class AbstractSCPTask implements Task {
 		inputPath = propertyMap.get("inputPath");
 	}
 
-	@Override
-	public TaskState execute(TaskContext taskContext) throws TaskException {
-		return null;
-	}
-
-	@Override
-	public TaskState recover(TaskContext taskContext) throws TaskException {
-		return null;
-	}
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
index 29c5695..dbc1a97 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
@@ -25,6 +25,7 @@ import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.JobSubmissionTask;
 import org.apache.airavata.gfac.core.task.TaskException;
 import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.task.TaskTypes;
 
 import java.util.Map;
 
@@ -43,4 +44,9 @@ public class ForkJobSubmissionTask implements JobSubmissionTask {
     public TaskState recover(TaskContext taskContext) throws TaskException {
         return null;
     }
+
+	@Override
+	public TaskTypes getType() {
+		return TaskTypes.JOB_SUBMISSION;
+	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
index b51b788..ad7ab6d 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
@@ -36,6 +36,7 @@ import org.apache.airavata.model.application.io.InputDataObjectType;
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.status.JobState;
 import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.task.TaskTypes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -173,4 +174,9 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{
             }
         }
     }
+
+	@Override
+	public TaskTypes getType() {
+		return TaskTypes.JOB_SUBMISSION;
+	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
index 8c649a2..76be7a0 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
@@ -84,5 +84,10 @@ public class SCPInputDataStageTask extends AbstractSCPTask {
 		return null;
 	}
 
+	@Override
+	public TaskTypes getType() {
+		return TaskTypes.DATA_STAGING;
+	}
+
 
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
index 30619c0..72e071c 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
@@ -77,4 +77,9 @@ public class SCPOutputDataStatgeTask extends AbstractSCPTask {
 	public TaskState recover(TaskContext taskContext) throws TaskException {
 		return null;
 	}
+
+	@Override
+	public TaskTypes getType() {
+		return TaskTypes.DATA_STAGING;
+	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
index 4b93a80..e541644 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
@@ -26,6 +26,7 @@ import org.apache.airavata.gfac.core.cluster.RemoteCluster;
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.task.TaskTypes;
 
 import java.util.Map;
 
@@ -52,4 +53,9 @@ public class SSHEnvironmentSetupTask implements Task {
 	public TaskState recover(TaskContext taskContext) throws TaskException {
 		return execute(taskContext);
 	}
+
+	@Override
+	public TaskTypes getType() {
+		return TaskTypes.ENV_SETUP;
+	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
index 9873de5..24a9238 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
@@ -34,6 +34,7 @@ import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.status.JobState;
 import org.apache.airavata.model.status.JobStatus;
 import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.task.TaskTypes;
 import org.apache.airavata.registry.cpi.AppCatalogException;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
@@ -160,4 +161,9 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
                 return TaskState.COMPLETED;
             }
     }
+
+	@Override
+	public TaskTypes getType() {
+		return TaskTypes.JOB_SUBMISSION;
+	}
 }