You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/07/09 21:39:29 UTC
[2/2] airavata git commit: Implemented basic task execution steps
Implemented basic task execution steps
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/8225ab55
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/8225ab55
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/8225ab55
Branch: refs/heads/master
Commit: 8225ab5563c5203eb6028074c64bc4e358d0d7b1
Parents: c857f82
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Thu Jul 9 15:39:20 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Thu Jul 9 15:39:20 2015 -0400
----------------------------------------------------------------------
.../apache/airavata/gfac/core/GFacEngine.java | 2 -
.../gfac/core/context/ProcessContext.java | 13 +-
.../airavata/gfac/core/context/TaskContext.java | 26 +++
.../apache/airavata/gfac/core/task/Task.java | 8 +
.../org/apache/airavata/gfac/impl/Factory.java | 2 +-
.../airavata/gfac/impl/GFacEngineImpl.java | 198 ++++++++++++++++---
.../apache/airavata/gfac/impl/GFacWorker.java | 5 +-
.../gfac/impl/task/AbstractSCPTask.java | 14 +-
.../gfac/impl/task/ForkJobSubmissionTask.java | 6 +
.../gfac/impl/task/LocalJobSubmissionTask.java | 6 +
.../gfac/impl/task/SCPInputDataStageTask.java | 5 +
.../gfac/impl/task/SCPOutputDataStatgeTask.java | 5 +
.../gfac/impl/task/SSHEnvironmentSetupTask.java | 6 +
.../gfac/impl/task/SSHJobSubmissionTask.java | 6 +
14 files changed, 252 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
index 8708625..09158fb 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
@@ -27,8 +27,6 @@ public interface GFacEngine {
public ProcessContext populateProcessContext(String processId, String gatewayId, String tokenId) throws GFacException;
- public void createTaskChain(ProcessContext processContext) throws GFacException;
-
public void executeProcess(ProcessContext processContext) throws GFacException ;
public void recoverProcess(ProcessContext processContext) throws GFacException ;
http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index a3796d4..fe76bf5 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -34,6 +34,7 @@ import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfil
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.process.ProcessModel;
import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.registry.cpi.ExperimentCatalog;
import org.apache.curator.framework.CuratorFramework;
@@ -54,7 +55,7 @@ public class ProcessContext {
private String workingDir;
private String inputDir;
private String outputDir;
- private List<Task> taskChain;
+ private List<TaskContext> taskChain;
private GatewayResourceProfile gatewayResourceProfile;
private ComputeResourceDescription computeResourceDescription;
private ApplicationDeploymentDescription applicationDeploymentDescription;
@@ -136,11 +137,11 @@ public class ProcessContext {
this.workingDir = workingDir;
}
- public List<Task> getTaskChain() {
+ public List<TaskContext> getTaskChain() {
return taskChain;
}
- public void setTaskChain(List<Task> taskChain) {
+ public void setTaskChain(List<TaskContext> taskChain) {
this.taskChain = taskChain;
}
@@ -266,4 +267,10 @@ public class ProcessContext {
return processModel.getProcessStatus().getState();
}
+ public void setProcessStatus(ProcessStatus status) {
+ if (status != null) {
+ processModel.setProcessStatus(status);
+ // TODO publish process status change.
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
index 477c1ae..1be5142 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
@@ -20,7 +20,10 @@
*/
package org.apache.airavata.gfac.core.context;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
import org.apache.airavata.model.task.TaskModel;
+import org.apache.airavata.model.task.TaskTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,4 +49,27 @@ public class TaskContext {
this.parentProcessContext = parentProcessContext;
}
+ public String getWorkingDir() {
+ return getParentProcessContext().getWorkingDir();
+ }
+
+ public void setTaskStatus(TaskStatus taskStatus) {
+ taskModel.setTaskStatus(taskStatus);
+ }
+
+ public TaskStatus getTaskStatus() {
+ return taskModel.getTaskStatus();
+ }
+
+ public TaskState getTaskState() {
+ return taskModel.getTaskStatus().getState();
+ }
+
+ public TaskTypes getTaskType() {
+ return taskModel.getTaskType();
+ }
+
+ public String getTaskId() {
+ return taskModel.getTaskId();
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java
index 0cf1ae8..62c069a 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java
@@ -22,6 +22,7 @@ package org.apache.airavata.gfac.core.task;
import org.apache.airavata.gfac.core.context.TaskContext;
import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.task.TaskTypes;
import java.util.Map;
@@ -54,4 +55,11 @@ public interface Task {
* @return
*/
public TaskState recover(TaskContext taskContext) throws TaskException;
+
+ /**
+ * Task type will be used to identify the task behaviour. eg : DATA_STAGING , JOB_SUBMISSION
+ * @return type of this task object
+ */
+ public TaskTypes getType();
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index fbb8f2f..fc68bb1 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -36,6 +36,7 @@ import org.apache.airavata.gfac.core.config.DataTransferTaskConfig;
import org.apache.airavata.gfac.core.config.GFacYamlConfigruation;
import org.apache.airavata.gfac.core.config.JobSubmitterTaskConfig;
import org.apache.airavata.gfac.core.config.ResourceConfig;
+import org.apache.airavata.gfac.core.context.TaskContext;
import org.apache.airavata.gfac.core.monitor.JobMonitor;
import org.apache.airavata.gfac.core.scheduler.HostScheduler;
import org.apache.airavata.gfac.core.task.JobSubmissionTask;
@@ -290,5 +291,4 @@ public abstract class Factory {
}
}
-
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 9dab5ca..5732ca5 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -22,28 +22,47 @@
package org.apache.airavata.gfac.impl;
import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.gfac.core.GFacEngine;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.task.SSHEnvironmentSetupTask;
import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
import org.apache.airavata.model.application.io.DataType;
import org.apache.airavata.model.application.io.InputDataObjectType;
import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.airavata.model.task.TaskModel;
+import org.apache.airavata.model.task.TaskTypes;
import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.registry.cpi.ExpCatChildDataType;
import org.apache.airavata.registry.cpi.ExperimentCatalog;
import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Date;
import java.util.List;
public class GFacEngineImpl implements GFacEngine {
+ private static final Logger log = LoggerFactory.getLogger(GFacEngineImpl.class);
+
public GFacEngineImpl() throws GFacException {
}
@@ -55,7 +74,8 @@ public class GFacEngineImpl implements GFacEngine {
ProcessContext processContext = new ProcessContext(processId, gatewayId, tokenId);
AppCatalog appCatalog = Factory.getDefaultAppCatalog();
ExperimentCatalog expCatalog = Factory.getDefaultExpCatalog();
- processContext.setProcessModel((ProcessModel) expCatalog.get(ExperimentCatalogModelType.PROCESS, processId));
+ processContext.setProcessModel((ProcessModel) expCatalog.get(ExperimentCatalogModelType.PROCESS,
+ processId));
GatewayResourceProfile gatewayProfile = appCatalog.getGatewayProfile().getGatewayProfile(gatewayId);
processContext.setGatewayResourceProfile(gatewayProfile);
processContext.setComputeResourcePreference(appCatalog.getGatewayProfile().getComputeResourcePreference
@@ -73,23 +93,38 @@ public class GFacEngineImpl implements GFacEngine {
}
@Override
- public void createTaskChain(ProcessContext processContext) throws GFacException {
+ public void executeProcess(ProcessContext processContext) throws GFacException {
+ TaskContext taskCtx = null;
+ List<TaskContext> taskChain = new ArrayList<>();
+ processContext.setProcessStatus(new ProcessStatus(ProcessState.CONFIGURING_WORKSPACE));
+ // Run all environment setup tasks
+ taskCtx = getEnvSetupTaskContext(processContext);
+ saveTaskModel(taskCtx);
+ publishTaskStatus(taskCtx);
+ SSHEnvironmentSetupTask envSetupTask = new SSHEnvironmentSetupTask();
+ executeTask(taskCtx, envSetupTask);
+ // execute process inputs
+ processContext.setProcessStatus(new ProcessStatus(ProcessState.INPUT_DATA_STAGING));
List<InputDataObjectType> processInputs = processContext.getProcessModel().getProcessInputs();
sortByInputOrder(processInputs);
- List<Task> taskChain = new ArrayList<>();
if (processInputs != null) {
for (InputDataObjectType processInput : processInputs) {
DataType type = processInput.getType();
switch (type) {
case STDERR:
- //
break;
case STDOUT:
- //
break;
case URI:
- // TODO : provide data staging data model
- taskChain.add(Factory.getDataMovementTask(processContext.getDataMovementProtocol()));
+ try {
+ taskCtx = getDataStagingTaskContext(processContext, processInput);
+ } catch (TException e) {
+ throw new GFacException("Error while serializing data staging sub task model");
+ }
+ saveTaskModel(taskCtx);
+ publishTaskStatus(taskCtx);
+ Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
+ executeTask(taskCtx, dMoveTask);
break;
default:
// nothing to do
@@ -97,29 +132,106 @@ public class GFacEngineImpl implements GFacEngine {
}
}
}
- taskChain.add(Factory.getJobSubmissionTask(processContext.getJobSubmissionProtocol()));
- List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs();
- for (OutputDataObjectType processOutput : processOutputs) {
- DataType type = processOutput.getType();
- switch (type) {
- case STDERR:
- break;
- case STDOUT:
- break;
- case URI:
- // TODO : Provide data staging data model
- taskChain.add(Factory.getDataMovementTask(processContext.getDataMovementProtocol()));
- break;
- }
+ processContext.setProcessStatus(new ProcessStatus(ProcessState.EXECUTING));
+ taskCtx = getJobSubmissionTaskContext(processContext);
+ saveTaskModel(taskCtx);
+ publishTaskStatus(taskCtx);
+ JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(processContext.getJobSubmissionProtocol());
+ executeTask(taskCtx, jobSubmissionTask);
+ processContext.setTaskChain(taskChain);
+ }
+
+ private void executeTask(TaskContext taskCtx, Task task) throws GFacException {
+ try {
+ taskCtx.getTaskModel().setTaskStatus(new TaskStatus(TaskState.EXECUTING));
+ updateTaskStatus(taskCtx);
+ publishTaskStatus(taskCtx);
+ task.execute(taskCtx);
+ taskCtx.getTaskModel().setTaskStatus(new TaskStatus(TaskState.COMPLETED));
+ updateTaskStatus(taskCtx);
+ publishTaskStatus(taskCtx);
+ } catch (TaskException e) {
+ TaskStatus status = new TaskStatus(TaskState.FAILED);
+ status.setReason(taskCtx.getTaskType().toString() + " Task Failed to execute");
+ taskCtx.setTaskStatus(status);
+ updateTaskStatus(taskCtx);
}
- processContext.setTaskChain(taskChain);
}
+ private TaskContext getJobSubmissionTaskContext(ProcessContext processContext) throws GFacException {
+ TaskContext taskCtx = new TaskContext();
+ taskCtx.setParentProcessContext(processContext);
- @Override
- public void executeProcess(ProcessContext processContext) throws GFacException {
+ TaskModel taskModel = new TaskModel();
+ taskModel.setParentProcessId(processContext.getProcessId());
+ taskModel.setCreationTime(new Date().getTime());
+ taskModel.setLastUpdateTime(taskModel.getCreationTime());
+ taskModel.setTaskStatus(new TaskStatus(TaskState.CREATED));
+ taskModel.setTaskType(TaskTypes.JOB_SUBMISSION);
+ taskCtx.setTaskModel(taskModel);
+ return taskCtx;
+ }
+
+ private TaskContext getDataStagingTaskContext(ProcessContext processContext, InputDataObjectType processInput) throws TException {
+ TaskContext taskCtx = new TaskContext();
+ taskCtx.setParentProcessContext(processContext);
+ // create new task model for this task
+ TaskModel taskModel = new TaskModel();
+ taskModel.setParentProcessId(processContext.getProcessId());
+ taskModel.setCreationTime(new Date().getTime());
+ taskModel.setLastUpdateTime(taskModel.getCreationTime());
+ taskModel.setTaskStatus(new TaskStatus(TaskState.CREATED));
+ taskModel.setTaskType(TaskTypes.DATA_STAGING);
+ // create data staging sub task model
+ DataStagingTaskModel submodel = new DataStagingTaskModel();
+ submodel.setSource(processInput.getValue());
+ submodel.setDestination(processContext.getWorkingDir());
+ taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
+ taskCtx.setTaskModel(taskModel);
+ return taskCtx;
+ }
+ private void publishTaskStatus(TaskContext taskCtx) {
+ Factory.getLocalEventPublisher().publish(taskCtx);
+ }
+
+ /**
+ * Persist task model
+ * @param taskContext
+ */
+ private void saveTaskModel(TaskContext taskContext) throws GFacException {
+ try {
+ TaskModel taskModel = taskContext.getTaskModel();
+ Factory.getDefaultExpCatalog().add(ExpCatChildDataType.TASK, taskModel, taskModel.getParentProcessId ());
+ } catch (RegistryException e) {
+ throw new GFacException("Error while saving task model", e);
+ }
+ }
+
+ private void updateTaskStatus(TaskContext taskContext) throws GFacException {
+ try {
+ TaskStatus taskStatus = taskContext.getTaskStatus();
+ Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.TASK_STATUS, taskStatus, taskContext
+ .getTaskModel().getTaskId());
+ } catch (RegistryException e) {
+ log.error("taskId: {}, taskState: {} :- Error while updating task staus", taskContext.getTaskId(),
+ taskContext.getTaskState().toString());
+ throw new GFacException("Error while updating task status", e);
+ }
+ }
+
+ private TaskContext getEnvSetupTaskContext(ProcessContext processContext) {
+ TaskContext taskCtx = new TaskContext();
+ taskCtx.setParentProcessContext(processContext);
+ TaskModel taskModel = new TaskModel();
+ taskModel.setParentProcessId(processContext.getProcessId());
+ taskModel.setCreationTime(new Date().getTime());
+ taskModel.setLastUpdateTime(taskModel.getCreationTime());
+ taskModel.setTaskStatus(new TaskStatus(TaskState.CREATED));
+ taskModel.setTaskType(TaskTypes.ENV_SETUP);
+ taskCtx.setTaskModel(taskModel);
+ return taskCtx;
}
@Override
@@ -129,6 +241,40 @@ public class GFacEngineImpl implements GFacEngine {
@Override
public void runProcessOutflow(ProcessContext processContext) throws GFacException {
+ TaskContext taskCtx = null;
+ TaskModel taskModel = null;
+ List<TaskContext> taskChain = new ArrayList<>();
+ List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs();
+ for (OutputDataObjectType processOutput : processOutputs) {
+ DataType type = processOutput.getType();
+ switch (type) {
+ case STDERR:
+ break;
+ case STDOUT:
+ break;
+ case URI:
+ // TODO : Provide data staging data model
+ try {
+ taskCtx = new TaskContext();
+ taskCtx.setParentProcessContext(processContext);
+
+ // create new task model for this task
+ taskModel = new TaskModel();
+ taskModel.setParentProcessId(processContext.getProcessId());
+ taskModel.setTaskStatus(new TaskStatus(TaskState.CREATED));
+ taskModel.setTaskType(TaskTypes.DATA_STAGING);
+ // create data staging sub task model
+ DataStagingTaskModel submodel = new DataStagingTaskModel();
+ submodel.setSource(processContext.getWorkingDir() + "/" + processOutput.getValue());
+ submodel.setDestination(processOutput.getValue());
+ taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
+ taskChain.add(taskCtx);
+ } catch (TException e) {
+ throw new GFacException("Thift model to byte[] convertion issue", e);
+ }
+ break;
+ }
+ }
}
@@ -144,7 +290,6 @@ public class GFacEngineImpl implements GFacEngine {
/**
* Sort input data type by input order.
- * @param processInputs
*/
private void sortByInputOrder(List<InputDataObjectType> processInputs) {
Collections.sort(processInputs, new Comparator<InputDataObjectType>() {
@@ -156,7 +301,4 @@ public class GFacEngineImpl implements GFacEngine {
}
-
-
-
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index 334538d..51aa8f8 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -74,13 +74,12 @@ public class GFacWorker implements Runnable {
try {
switch (type) {
case NEW:
- engine.createTaskChain(processContext);
engine.executeProcess(processContext);
break;
case RECOVER:
// recover the process
- engine.createTaskChain(processContext);
- engine.recoverProcess(processContext);
+// engine.recoverProcess(processContext);
+ engine.executeProcess(processContext);
break;
case OUTFLOW:
// run the outflow task
http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
index ed9ddca..a34997c 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
@@ -27,7 +27,7 @@ import org.apache.airavata.model.status.TaskState;
import java.util.Map;
-public class AbstractSCPTask implements Task {
+public abstract class AbstractSCPTask implements Task {
protected static final int DEFAULT_SSH_PORT = 22;
protected String password;
protected String publicKeyPath;
@@ -48,16 +48,4 @@ public class AbstractSCPTask implements Task {
inputPath = propertyMap.get("inputPath");
}
- @Override
- public TaskState execute(TaskContext taskContext) throws TaskException {
- return null;
- }
-
- @Override
- public TaskState recover(TaskContext taskContext) throws TaskException {
- return null;
- }
-
-
-
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
index 29c5695..dbc1a97 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
@@ -25,6 +25,7 @@ import org.apache.airavata.gfac.core.context.TaskContext;
import org.apache.airavata.gfac.core.task.JobSubmissionTask;
import org.apache.airavata.gfac.core.task.TaskException;
import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.task.TaskTypes;
import java.util.Map;
@@ -43,4 +44,9 @@ public class ForkJobSubmissionTask implements JobSubmissionTask {
public TaskState recover(TaskContext taskContext) throws TaskException {
return null;
}
+
+ @Override
+ public TaskTypes getType() {
+ return TaskTypes.JOB_SUBMISSION;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
index b51b788..ad7ab6d 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
@@ -36,6 +36,7 @@ import org.apache.airavata.model.application.io.InputDataObjectType;
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.status.JobState;
import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.task.TaskTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -173,4 +174,9 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{
}
}
}
+
+ @Override
+ public TaskTypes getType() {
+ return TaskTypes.JOB_SUBMISSION;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
index 8c649a2..76be7a0 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
@@ -84,5 +84,10 @@ public class SCPInputDataStageTask extends AbstractSCPTask {
return null;
}
+ @Override
+ public TaskTypes getType() {
+ return TaskTypes.DATA_STAGING;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
index 30619c0..72e071c 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
@@ -77,4 +77,9 @@ public class SCPOutputDataStatgeTask extends AbstractSCPTask {
public TaskState recover(TaskContext taskContext) throws TaskException {
return null;
}
+
+ @Override
+ public TaskTypes getType() {
+ return TaskTypes.DATA_STAGING;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
index 4b93a80..e541644 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
@@ -26,6 +26,7 @@ import org.apache.airavata.gfac.core.cluster.RemoteCluster;
import org.apache.airavata.gfac.core.context.TaskContext;
import org.apache.airavata.gfac.core.task.Task;
import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.task.TaskTypes;
import java.util.Map;
@@ -52,4 +53,9 @@ public class SSHEnvironmentSetupTask implements Task {
public TaskState recover(TaskContext taskContext) throws TaskException {
return execute(taskContext);
}
+
+ @Override
+ public TaskTypes getType() {
+ return TaskTypes.ENV_SETUP;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8225ab55/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
index 9873de5..24a9238 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
@@ -34,6 +34,7 @@ import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.status.JobState;
import org.apache.airavata.model.status.JobStatus;
import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.task.TaskTypes;
import org.apache.airavata.registry.cpi.AppCatalogException;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
@@ -160,4 +161,9 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
return TaskState.COMPLETED;
}
}
+
+ @Override
+ public TaskTypes getType() {
+ return TaskTypes.JOB_SUBMISSION;
+ }
}