You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/07/16 21:57:15 UTC
airavata git commit: Send acknowledgements for processed processes,
handle task status after executing each task, and handle failure
scenarios.
Repository: airavata
Updated Branches:
refs/heads/master e290cfe17 -> b7e914ee3
Send acknowledgements for processed processes, handle task status after executing each task, and handle failure scenarios.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b7e914ee
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b7e914ee
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b7e914ee
Branch: refs/heads/master
Commit: b7e914ee3c4c90b77676bd2c217bec3b3183dc76
Parents: e290cfe
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Thu Jul 16 15:57:12 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Thu Jul 16 15:57:12 2015 -0400
----------------------------------------------------------------------
.../apache/airavata/gfac/core/GFacUtils.java | 45 ++--
.../gfac/core/context/ProcessContext.java | 17 +-
.../airavata/gfac/core/context/TaskContext.java | 3 +
.../apache/airavata/gfac/core/task/Task.java | 11 +-
.../org/apache/airavata/gfac/impl/Factory.java | 9 +
.../airavata/gfac/impl/GFacEngineImpl.java | 169 ++++++++-------
.../apache/airavata/gfac/impl/GFacWorker.java | 28 +++
.../gfac/impl/task/AbstractSCPTask.java | 2 -
.../gfac/impl/task/ForkJobSubmissionTask.java | 5 +-
.../gfac/impl/task/LocalJobSubmissionTask.java | 15 +-
.../gfac/impl/task/SCPDataStageTask.java | 81 ++++---
.../gfac/impl/task/SCPInputDataStageTask.java | 14 +-
.../gfac/impl/task/SCPOutputDataStatgeTask.java | 10 +-
.../gfac/impl/task/SSHEnvironmentSetupTask.java | 25 ++-
.../gfac/impl/task/SSHJobSubmissionTask.java | 212 ++++++++++++-------
.../airavata/gfac/server/GfacServerHandler.java | 6 +-
16 files changed, 413 insertions(+), 239 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index b00240b..af10218 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -266,9 +266,7 @@ public class GFacUtils {
// first we save job jobModel to the registry for sa and then save the job status.
ProcessContext processContext = taskContext.getParentProcessContext();
ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
- TaskStatus status = new TaskStatus();
- status.setState(state);
- taskContext.setTaskStatus(status);
+ TaskStatus status = taskContext.getTaskStatus();
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
experimentCatalog.add(ExpCatChildDataType.TASK_STATUS, status, taskContext.getTaskId());
TaskIdentifier identifier = new TaskIdentifier(taskContext.getTaskId(),
@@ -286,20 +284,17 @@ public class GFacUtils {
}
}
- public static void saveProcessStatus(ProcessContext processContext,
- ProcessState state) throws GFacException {
+ public static void saveAndPublishProcessStatus(ProcessContext processContext) throws GFacException {
try {
// first we save job jobModel to the registry for sa and then save the job status.
ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
- ProcessStatus status = new ProcessStatus();
- status.setState(state);
- processContext.getProcessModel().setProcessStatus(status);
+ ProcessStatus status = processContext.getProcessStatus();
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
experimentCatalog.add(ExpCatChildDataType.PROCESS_STATUS, status, processContext.getProcessId());
ProcessIdentifier identifier = new ProcessIdentifier(processContext.getProcessId(),
processContext.getProcessModel().getExperimentId(),
processContext.getGatewayId());
- ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(state, identifier);
+ ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(status.getState(), identifier);
MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
AiravataUtils.getId(MessageType.PROCESS.name()), processContext.getGatewayId());
msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
@@ -1094,27 +1089,35 @@ public class GFacUtils {
return GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + File.separator + experimentId;
}
- public static void createExperimentNode(CuratorFramework curatorClient, String gfacServerName, String
+ public static void createProcessZKNode(CuratorFramework curatorClient, String gfacServerName, String
processId, long deliveryTag, String token) throws Exception {
- // create /experiments/processId node and set data - serverName, add redelivery listener
- String experimentPath = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
- ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentPath);
- curatorClient.setData().withVersion(-1).forPath(experimentPath, gfacServerName.getBytes());
- curatorClient.getData().usingWatcher(new RedeliveryRequestWatcher()).forPath(experimentPath);
-
- // create /experiments/processId/deliveryTag node and set data - deliveryTag
- String deliveryTagPath = ZKPaths.makePath(experimentPath, GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE);
+ // TODO - To handle multiple processes per experiment, need to create a /experiment/{expId}/{processId} node
+ // create /experiments/{processId} node and set data - serverName, add redelivery listener
+ String zkProcessNodePath = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
+ ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkProcessNodePath);
+ curatorClient.setData().withVersion(-1).forPath(zkProcessNodePath, gfacServerName.getBytes());
+ curatorClient.getData().usingWatcher(new RedeliveryRequestWatcher()).forPath(zkProcessNodePath);
+
+ // create /experiments/{processId}/deliveryTag node and set data - deliveryTag
+ String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE);
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), deliveryTagPath);
curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, GFacUtils.longToBytes(deliveryTag));
- // create /experiments/processId/token node and set data - token
+ // create /experiments/{processId}/token node and set data - token
String tokenNodePath = ZKPaths.makePath(processId, GFacConstants.ZOOKEEPER_TOKEN_NODE);
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), tokenNodePath);
curatorClient.setData().withVersion(-1).forPath(tokenNodePath, token.getBytes());
- // create /experiments/processId/cancelListener node and set watcher for data changes
- String cancelListenerNode = ZKPaths.makePath(experimentPath, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+ // create /experiments/{processId}/cancelListener node and set watcher for data changes
+ String cancelListenerNode = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), cancelListenerNode);
curatorClient.getData().usingWatcher(new CancelRequestWatcher()).forPath(cancelListenerNode);
}
+
+ public static long getProcessDeliveryTag(CuratorFramework curatorClient, String processId) throws Exception {
+ String deliveryTagPath = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + processId + GFacConstants
+ .ZOOKEEPER_DELIVERYTAG_NODE;
+ byte[] bytes = curatorClient.getData().forPath(deliveryTagPath);
+ return GFacUtils.bytesToLong(bytes);
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index dc8dace..9e1ac06 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -21,7 +21,7 @@
package org.apache.airavata.gfac.core.context;
-import org.apache.airavata.common.utils.LocalEventPublisher;
+import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.core.cluster.RemoteCluster;
import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
@@ -40,11 +40,15 @@ import org.apache.airavata.model.status.ProcessStatus;
import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.registry.cpi.ExperimentCatalog;
import org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
public class ProcessContext {
+
+ private static final Logger log = LoggerFactory.getLogger(ProcessContext.class);
// process model
private ExperimentCatalog experimentCatalog;
private AppCatalog appCatalog;
@@ -289,11 +293,16 @@ public class ProcessContext {
public void setProcessStatus(ProcessStatus status) {
if (status != null) {
+ log.info("expId: {}, processId: {} :- Status changed {} -> {}", getExperimentId(), processId,
+ getProcessState().name(), status.getState().name());
processModel.setProcessStatus(status);
- // TODO publish process status change.
}
}
+ public ProcessStatus getProcessStatus(){
+ return processModel.getProcessStatus();
+ }
+
public String getComputeResourceId() {
return getComputeResourceDescription().getComputeResourceId();
}
@@ -321,4 +330,8 @@ public class ProcessContext {
public void setLocalWorkingDir(String localWorkingDir) {
this.localWorkingDir = localWorkingDir;
}
+
+ public String getExperimentId() {
+ return processModel.getExperimentId();
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
index 95d2fb9..597fd2e 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
@@ -54,6 +54,9 @@ public class TaskContext {
}
public void setTaskStatus(TaskStatus taskStatus) {
+ log.info("expId: {}, processId: {}, taskId: {}, type: {}:- Status changed {} -> {}", parentProcessContext
+ .getExperimentId(), parentProcessContext.getProcessId(), getTaskId(), getTaskType().name(),
+ getTaskState().name(), taskStatus .getState().name());
taskModel.setTaskStatus(taskStatus);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java
index 62c069a..f4eec85 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java
@@ -22,6 +22,7 @@ package org.apache.airavata.gfac.core.task;
import org.apache.airavata.gfac.core.context.TaskContext;
import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
import org.apache.airavata.model.task.TaskTypes;
import java.util.Map;
@@ -42,19 +43,17 @@ public interface Task {
* This method will be called at the first time of task chain execution. This method should called before recover
* method. For a given task chain execute method only call one time. recover method may be called more than once.
* @param taskContext
- * @throws TaskException
- * @return
+ * @return completed task status if success otherwise failed task status.
*/
- public TaskState execute(TaskContext taskContext) throws TaskException;
+ public TaskStatus execute(TaskContext taskContext);
/**
* This methond will be invoked at recover path.Before this method is invoked, execute method should be invoked.
* This method may be called zero or few time in a process chain.
* @param taskContext
- * @throws TaskException
- * @return
+ * @return completed task status if success otherwise failed task status.
*/
- public TaskState recover(TaskContext taskContext) throws TaskException;
+ public TaskStatus recover(TaskContext taskContext);
/**
* Task type will be used to identify the task behaviour. eg : DATA_STAGING , JOB_SUBMISSION
http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index 51db6f3..a5fa5ed 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -52,6 +52,7 @@ import org.apache.airavata.gfac.impl.job.UGEJobConfiguration;
import org.apache.airavata.gfac.impl.job.UGEOutputParser;
import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchConsumer;
import org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher;
import org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol;
import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
@@ -101,6 +102,7 @@ public abstract class Factory {
private static Map<DataMovementProtocol, Task> dataMovementTask = new HashMap<>();
private static Map<ResourceJobManagerType, ResourceConfig> resources = new HashMap<>();
private static Map<MonitorMode, JobMonitor> jobMonitorServices = new HashMap<>();
+ private static RabbitMQProcessLaunchConsumer processLaunchConsumer;
public static GFacEngine getGFacEngine() throws GFacException {
if (engine == null) {
@@ -145,6 +147,13 @@ public abstract class Factory {
return curatorClient;
}
+ public static RabbitMQProcessLaunchConsumer getProcessLaunchConsumer() throws AiravataException {
+ if (processLaunchConsumer == null) {
+ processLaunchConsumer = new RabbitMQProcessLaunchConsumer();
+ }
+ return processLaunchConsumer;
+ }
+
public static JobManagerConfiguration getJobManagerConfiguration(ResourceJobManager resourceJobManager) throws GFacException {
ResourceConfig resourceConfig = Factory.getResourceConfig(resourceJobManager.getResourceJobManagerType());
OutputParser outputParser;
http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index c7eba99..a4c8381 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -124,14 +124,23 @@ public class GFacEngineImpl implements GFacEngine {
TaskContext taskCtx = null;
List<TaskContext> taskChain = new ArrayList<>();
processContext.setProcessStatus(new ProcessStatus(ProcessState.CONFIGURING_WORKSPACE));
+ GFacUtils.saveAndPublishProcessStatus(processContext);
// Run all environment setup tasks
taskCtx = getEnvSetupTaskContext(processContext);
saveTaskModel(taskCtx);
GFacUtils.saveAndPublishTaskStatus(taskCtx);
SSHEnvironmentSetupTask envSetupTask = new SSHEnvironmentSetupTask();
- executeTask(taskCtx, envSetupTask);
+ TaskStatus taskStatus = executeTask(taskCtx, envSetupTask);
+ if (taskStatus.getState() == TaskState.FAILED) {
+ log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input statging failed, " +
+ "reason:" + " {}", taskCtx.getParentProcessContext().getExperimentId(), taskCtx
+ .getParentProcessContext().getProcessId(), taskCtx.getTaskId(), envSetupTask.getType
+ ().name(), taskStatus.getReason());
+ throw new GFacException("Error while environment setup");
+ }
// execute process inputs
processContext.setProcessStatus(new ProcessStatus(ProcessState.INPUT_DATA_STAGING));
+ GFacUtils.saveAndPublishProcessStatus(processContext);
List<InputDataObjectType> processInputs = processContext.getProcessModel().getProcessInputs();
sortByInputOrder(processInputs);
if (processInputs != null) {
@@ -151,7 +160,14 @@ public class GFacEngineImpl implements GFacEngine {
saveTaskModel(taskCtx);
GFacUtils.saveAndPublishTaskStatus(taskCtx);
Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
- executeTask(taskCtx, dMoveTask);
+ taskStatus = executeTask(taskCtx, dMoveTask);
+ if (taskStatus.getState() == TaskState.FAILED) {
+ log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input statging failed, " +
+ "reason:" + " {}", taskCtx.getParentProcessContext().getExperimentId(), taskCtx
+ .getParentProcessContext().getProcessId(), taskCtx.getTaskId(), dMoveTask.getType
+ ().name(), taskStatus.getReason());
+ throw new GFacException("Error while staging input data");
+ }
break;
default:
// nothing to do
@@ -160,29 +176,87 @@ public class GFacEngineImpl implements GFacEngine {
}
}
processContext.setProcessStatus(new ProcessStatus(ProcessState.EXECUTING));
+ GFacUtils.saveAndPublishProcessStatus(processContext);
taskCtx = getJobSubmissionTaskContext(processContext);
+ saveTaskModel(taskCtx);
GFacUtils.saveAndPublishTaskStatus(taskCtx);
JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(processContext.getJobSubmissionProtocol());
- executeTask(taskCtx, jobSubmissionTask);
+ taskStatus = executeTask(taskCtx, jobSubmissionTask);
+ if (taskStatus.getState() == TaskState.FAILED) {
+ throw new GFacException("Job submission task failed");
+ }
processContext.setTaskChain(taskChain);
}
- private void executeTask(TaskContext taskCtx, Task task) throws GFacException {
- try {
- taskCtx.setTaskStatus(new TaskStatus(TaskState.EXECUTING));
- GFacUtils.saveAndPublishTaskStatus(taskCtx);
- task.execute(taskCtx);
- taskCtx.setTaskStatus(new TaskStatus(TaskState.COMPLETED));
- GFacUtils.saveAndPublishTaskStatus(taskCtx);
- } catch (TaskException e) {
- TaskStatus status = new TaskStatus(TaskState.FAILED);
- status.setReason(taskCtx.getTaskType().toString() + " Task Failed to execute");
- taskCtx.setTaskStatus(status);
- GFacUtils.saveAndPublishTaskStatus(taskCtx);
+
+ @Override
+ public void recoverProcess(ProcessContext processContext) throws GFacException {
+
+ }
+
+ @Override
+ public void runProcessOutflow(ProcessContext processContext) throws GFacException {
+ TaskContext taskCtx = null;
+ processContext.setProcessStatus(new ProcessStatus(ProcessState.OUTPUT_DATA_STAGING));
+ GFacUtils.saveAndPublishProcessStatus(processContext);
+ List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs();
+ for (OutputDataObjectType processOutput : processOutputs) {
+ DataType type = processOutput.getType();
+ switch (type) {
+ case STDERR:
+ break;
+ case STDOUT:
+ break;
+ case URI:
+ try {
+ taskCtx = getDataStagingTaskContext(processContext, processOutput);
+ } catch (TException e) {
+ throw new GFacException("Thrift model to byte[] convertion issue", e);
+ }
+ File localWorkingdir = new File(taskCtx.getLocalWorkingDir());
+ localWorkingdir.mkdirs(); // make local dir if not exist
+ saveTaskModel(taskCtx);
+ GFacUtils.saveAndPublishTaskStatus(taskCtx);
+ Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
+ TaskStatus taskStatus = executeTask(taskCtx, dMoveTask);
+ if (taskStatus.getState() == TaskState.FAILED) {
+ log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input statging failed, " +
+ "reason:" + " {}", taskCtx.getParentProcessContext().getExperimentId(), taskCtx
+ .getParentProcessContext().getProcessId(), taskCtx.getTaskId(), dMoveTask.getType
+ ().name(), taskStatus.getReason());
+ throw new GFacException("Error while staging input data");
+ }
+ break;
+ default:
+ // nothing to do
+ break;
+ }
}
+ processContext.setProcessStatus(new ProcessStatus(ProcessState.POST_PROCESSING));
+ GFacUtils.saveAndPublishProcessStatus(processContext);
+// taskCtx = getEnvCleanupTaskContext(processContext);
+
+ }
+
+ @Override
+ public void recoverProcessOutflow(ProcessContext processContext) throws GFacException {
+
+ }
+
+ @Override
+ public void cancelProcess() throws GFacException {
}
+ private TaskStatus executeTask(TaskContext taskCtx, Task task) throws GFacException {
+ taskCtx.setTaskStatus(new TaskStatus(TaskState.EXECUTING));
+ GFacUtils.saveAndPublishTaskStatus(taskCtx);
+ TaskStatus taskStatus = task.execute(taskCtx);
+ taskCtx.setTaskStatus(taskStatus);
+ GFacUtils.saveAndPublishTaskStatus(taskCtx);
+ return taskCtx.getTaskStatus();
+ }
+
private TaskContext getJobSubmissionTaskContext(ProcessContext processContext) throws GFacException {
TaskContext taskCtx = new TaskContext();
taskCtx.setParentProcessContext(processContext);
@@ -197,7 +271,8 @@ public class GFacEngineImpl implements GFacEngine {
return taskCtx;
}
- private TaskContext getDataStagingTaskContext(ProcessContext processContext, InputDataObjectType processInput) throws TException {
+ private TaskContext getDataStagingTaskContext(ProcessContext processContext, InputDataObjectType processInput)
+ throws TException {
TaskContext taskCtx = new TaskContext();
taskCtx.setParentProcessContext(processContext);
// create new task model for this task
@@ -210,13 +285,15 @@ public class GFacEngineImpl implements GFacEngine {
// create data staging sub task model
DataStagingTaskModel submodel = new DataStagingTaskModel();
submodel.setSource(processInput.getValue());
- submodel.setDestination(processContext.getDataMovementProtocol().name() + ":" + processContext.getWorkingDir());
+ submodel.setDestination(processContext.getDataMovementProtocol().name() + ":" + processContext.getWorkingDir
+ ());
taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
taskCtx.setTaskModel(taskModel);
return taskCtx;
}
- private TaskContext getDataStagingTaskContext(ProcessContext processContext, OutputDataObjectType processOutput) throws TException {
+ private TaskContext getDataStagingTaskContext(ProcessContext processContext, OutputDataObjectType processOutput)
+ throws TException {
TaskContext taskCtx = new TaskContext();
taskCtx.setParentProcessContext(processContext);
// create new task model for this task
@@ -241,7 +318,6 @@ public class GFacEngineImpl implements GFacEngine {
/**
* Persist task model
- * @param taskContext
*/
private void saveTaskModel(TaskContext taskContext) throws GFacException {
try {
@@ -266,56 +342,6 @@ public class GFacEngineImpl implements GFacEngine {
return taskCtx;
}
- @Override
- public void recoverProcess(ProcessContext processContext) throws GFacException {
-
- }
-
- @Override
- public void runProcessOutflow(ProcessContext processContext) throws GFacException {
- TaskContext taskCtx = null;
- processContext.setProcessStatus(new ProcessStatus(ProcessState.OUTPUT_DATA_STAGING));
- List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs();
- for (OutputDataObjectType processOutput : processOutputs) {
- DataType type = processOutput.getType();
- switch (type) {
- case STDERR:
- break;
- case STDOUT:
- break;
- case URI:
- // TODO : Provide data staging data model
- try {
- taskCtx = getDataStagingTaskContext(processContext, processOutput);
- } catch (TException e) {
- throw new GFacException("Thrift model to byte[] convertion issue", e);
- }
- File localWorkingdir = new File(taskCtx.getLocalWorkingDir());
- localWorkingdir.mkdirs(); // make local dir if not exist
- saveTaskModel(taskCtx);
- GFacUtils.saveAndPublishTaskStatus(taskCtx);
- Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
- executeTask(taskCtx, dMoveTask);
- break;
- default:
- // nothing to do
- break;
- }
- }
- processContext.setProcessStatus(new ProcessStatus(ProcessState.POST_PROCESSING));
-// taskCtx = getEnvCleanupTaskContext(processContext);
-
- }
-
- @Override
- public void recoverProcessOutflow(ProcessContext processContext) throws GFacException {
-
- }
-
- @Override
- public void cancelProcess() throws GFacException {
-
- }
/**
* Sort input data type by input order.
@@ -331,7 +357,7 @@ public class GFacEngineImpl implements GFacEngine {
public static ResourceJobManager getResourceJobManager(ProcessContext processCtx) throws AppCatalogException {
List<JobSubmissionInterface> jobSubmissionInterfaces = Factory.getDefaultAppCatalog().getComputeResource()
- .getComputeResource(processCtx.getComputeResourceId()) .getJobSubmissionInterfaces();
+ .getComputeResource(processCtx.getComputeResourceId()).getJobSubmissionInterfaces();
ResourceJobManager resourceJobManager = null;
JobSubmissionInterface jsInterface = null;
@@ -346,7 +372,8 @@ public class GFacEngineImpl implements GFacEngine {
} else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
SSHJobSubmission sshJobSubmission = Factory.getDefaultAppCatalog().getComputeResource().getSSHJobSubmission
(jsInterface.getJobSubmissionInterfaceId());
- processCtx.setMonitorMode(sshJobSubmission.getMonitorMode()); // fixme - Move this to populate process context method.
+ processCtx.setMonitorMode(sshJobSubmission.getMonitorMode()); // fixme - Move this to populate process
+ // context method.
resourceJobManager = sshJobSubmission.getResourceJobManager();
} else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.LOCAL) {
LOCALSubmission localSubmission = Factory.getDefaultAppCatalog().getComputeResource().getLocalJobSubmission
http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index 899f684..a759f90 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -25,6 +25,7 @@ import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.gfac.core.GFac;
import org.apache.airavata.gfac.core.GFacEngine;
import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.core.context.ProcessContext;
import org.apache.airavata.gfac.core.monitor.JobMonitor;
import org.apache.airavata.model.status.ProcessState;
@@ -32,6 +33,8 @@ import org.apache.airavata.model.status.ProcessStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.text.MessageFormat;
+
public class GFacWorker implements Runnable {
private static final Logger log = LoggerFactory.getLogger(GFacWorker.class);
@@ -85,11 +88,15 @@ public class GFacWorker implements Runnable {
// run the outflow task
engine.runProcessOutflow(processContext);
processContext.setProcessStatus(new ProcessStatus(ProcessState.COMPLETED));
+ GFacUtils.saveAndPublishProcessStatus(processContext);
+ sendAck();
break;
case RECOVER_OUTFLOW:
// recover outflow task;
engine.recoverProcessOutflow(processContext);
processContext.setProcessStatus(new ProcessStatus(ProcessState.COMPLETED));
+ GFacUtils.saveAndPublishProcessStatus(processContext);
+ sendAck();
break;
default:
throw new GFacException("process Id : " + processId + " Couldn't identify process type");
@@ -113,6 +120,14 @@ public class GFacWorker implements Runnable {
}
} catch (GFacException e) {
log.error("GFac Worker throws an exception", e);
+ processContext.setProcessStatus(new ProcessStatus(ProcessState.FAILED));
+ try {
+ GFacUtils.saveAndPublishProcessStatus(processContext);
+ } catch (GFacException e1) {
+ log.error("expId: {}, processId: {} :- Couldn't save and publish process status {}", processContext
+ .getExperimentId(), processContext.getProcessId(), processContext.getProcessState());
+ }
+ sendAck();
}
}
@@ -142,6 +157,19 @@ public class GFacWorker implements Runnable {
}
}
+ private void sendAck() {
+ try {
+ long processDeliveryTag = GFacUtils.getProcessDeliveryTag(processContext.getCuratorClient(), processId);
+ Factory.getProcessLaunchConsumer().sendAck(processDeliveryTag);
+ log.info("expId: {}, procesId: {} :- Sent ack for deliveryTag {}", processContext.getExperimentId(),
+ processId, processDeliveryTag);
+ } catch (Exception e1) {
+ String format = MessageFormat.format("expId: {0}, processId: {1} :- Couldn't send ack for deliveryTag ",
+ processContext .getExperimentId(), processId);
+ log.error(format, e1);
+ }
+ }
+
private ProcessType getProcessType(ProcessContext processContext) {
// check the status and return correct type of process.
switch (processContext.getProcessState()) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
index a34997c..17746f4 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
@@ -20,10 +20,8 @@
*/
package org.apache.airavata.gfac.impl.task;
-import org.apache.airavata.gfac.core.context.TaskContext;
import org.apache.airavata.gfac.core.task.Task;
import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.model.status.TaskState;
import java.util.Map;
http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
index dbc1a97..f8ef0ea 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
@@ -25,6 +25,7 @@ import org.apache.airavata.gfac.core.context.TaskContext;
import org.apache.airavata.gfac.core.task.JobSubmissionTask;
import org.apache.airavata.gfac.core.task.TaskException;
import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
import org.apache.airavata.model.task.TaskTypes;
import java.util.Map;
@@ -36,12 +37,12 @@ public class ForkJobSubmissionTask implements JobSubmissionTask {
}
@Override
- public TaskState execute(TaskContext taskContext) throws TaskException {
+ public TaskStatus execute(TaskContext taskContext) {
// Placeholder: nothing is submitted here; the method only returns null.
return null;
}
@Override
- public TaskState recover(TaskContext taskContext) throws TaskException {
+ public TaskStatus recover(TaskContext taskContext) {
// Placeholder: no recovery logic is present; the method only returns null.
return null;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
index ad7ab6d..5201de6 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
@@ -36,6 +36,7 @@ import org.apache.airavata.model.application.io.InputDataObjectType;
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.status.JobState;
import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
import org.apache.airavata.model.task.TaskTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,8 +54,8 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{
}
@Override
- public TaskState execute(TaskContext taskContext) throws TaskException {
- try {
+ public TaskStatus execute(TaskContext taskContext) {
+ /* try {
ProcessContext processContext = taskContext.getParentProcessContext();
// build command with all inputs
List<String> cmdList = buildCommand(processContext);
@@ -97,10 +98,10 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{
standardOutWriter.join();
standardErrorWriter.join();
- /*
+ *//*
* check return value. usually not very helpful to draw conclusions based on return values so don't bother.
* just provide warning in the log messages
- */
+ *//*
if (returnValue != 0) {
log.error("Process finished with non zero return value. Process may have failed");
} else {
@@ -124,12 +125,12 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{
} catch (IOException e) {
log.error("Error while submitting local job", e);
throw new TaskException("Error while submitting local job", e);
- }
- return TaskState.COMPLETED;
+ }*/
+ return new TaskStatus(TaskState.COMPLETED);
}
@Override
- public TaskState recover(TaskContext taskContext) throws TaskException {
+ public TaskStatus recover(TaskContext taskContext) {
// Placeholder: no recovery logic is present; the method only returns null.
return null;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
index 089535e..b2a83ed 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
@@ -20,63 +20,88 @@
*/
package org.apache.airavata.gfac.impl.task;
-import com.jcraft.jsch.JSch;
-import com.jcraft.jsch.JSchException;
-import com.jcraft.jsch.Session;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.gfac.core.SSHApiException;
import org.apache.airavata.gfac.core.context.TaskContext;
import org.apache.airavata.gfac.core.task.Task;
import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.SSHUtils;
+import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
import org.apache.airavata.model.task.DataStagingTaskModel;
import org.apache.airavata.model.task.TaskTypes;
import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
public class SCPDataStageTask implements Task {
+ private static final Logger log = LoggerFactory.getLogger(SCPDataStageTask.class);
+
@Override
public void init(Map<String, String> propertyMap) throws TaskException {
// Intentionally empty: propertyMap is not read and no state is initialised.
}
+ /**
+ * Copies the staging task's source to its destination over SCP and reports the outcome as a status.
+ * A source whose scheme is "file" is pushed to the remote cluster (scpTo); any other scheme is
+ * pulled from it (scpFrom). Exceptions are logged, recorded on the task model as an ErrorModel and
+ * turned into a FAILED status rather than being thrown.
+ *
+ * @param taskContext context supplying the task model and the parent process's remote cluster
+ * @return COMPLETED when the copy succeeds, otherwise FAILED with the reason set
+ */
@Override
- public TaskState execute(TaskContext taskContext) throws TaskException {
-
+ public TaskStatus execute(TaskContext taskContext) {
+ TaskStatus status = new TaskStatus(TaskState.COMPLETED);
if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) {
- throw new TaskException("Invalid task call, expected " + TaskTypes.DATA_STAGING.toString() + " but found "
+ status.setState(TaskState.FAILED);
+ status.setReason("Invalid task call, expected " + TaskTypes.DATA_STAGING.toString() + " but found "
+ taskContext.getTaskModel().getTaskType().toString());
- }
- try {
- DataStagingTaskModel subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel(taskContext
- .getTaskModel());
- URI sourceURI = new URI(subTaskModel.getSource());
- URI destinationURI = new URI(subTaskModel.getDestination());
+ } else {
+ try {
+ DataStagingTaskModel subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel(taskContext
+ .getTaskModel());
+ URI sourceURI = new URI(subTaskModel.getSource());
+ URI destinationURI = new URI(subTaskModel.getDestination());
+ if (sourceURI.getScheme() == null) {
+ // Without a scheme the transfer direction is unknown; report it through the
+ // invalid-URI handling below instead of letting a NullPointerException escape.
+ throw new URISyntaxException(subTaskModel.getSource(), "Source URI has no scheme");
+ }
- if (sourceURI.getScheme().equalsIgnoreCase("file")) { // Airavata --> RemoteCluster
- taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURI.getPath(), destinationURI
- .getPath());
- } else { // RemoteCluster --> Airavata
- taskContext.getParentProcessContext().getRemoteCluster().scpFrom(sourceURI.getPath(), destinationURI
- .getPath());
+ if (sourceURI.getScheme().equalsIgnoreCase("file")) { // Airavata --> RemoteCluster
+ taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURI.getPath(), destinationURI
+ .getPath());
+ } else { // RemoteCluster --> Airavata
+ taskContext.getParentProcessContext().getRemoteCluster().scpFrom(sourceURI.getPath(), destinationURI
+ .getPath());
+ }
+ status.setReason("Successfully staged data");
+ } catch (SSHApiException e) {
+ String msg = "Scp attempt failed";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ } catch (TException e) {
+ String msg = "Invalid task invocation";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ } catch (URISyntaxException e) {
+ String msg = "source or destination is not a valid URI";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
}
- } catch (SSHApiException e) {
- throw new TaskException("Scp attempt failed", e);
- } catch (TException e) {
- throw new TaskException("Invalid task invocation");
- } catch (URISyntaxException e) {
- throw new TaskException("source or destination is not a valid URI");
}
- return null;
+ return status;
}
@Override
- public TaskState recover(TaskContext taskContext) throws TaskException {
+ public TaskStatus recover(TaskContext taskContext) {
// Placeholder: no recovery logic is present; the method only returns null.
return null;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
index 332a0aa..fc4d634 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
@@ -23,23 +23,19 @@ package org.apache.airavata.gfac.impl.task;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
-import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.gfac.core.SSHApiException;
import org.apache.airavata.gfac.core.context.TaskContext;
import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.Factory;
import org.apache.airavata.gfac.impl.SSHUtils;
-import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
import org.apache.airavata.model.task.DataStagingTaskModel;
import org.apache.airavata.model.task.TaskTypes;
import org.apache.thrift.TException;
import java.io.IOException;
-import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.net.URL;
public class SCPInputDataStageTask extends AbstractSCPTask {
@@ -47,9 +43,9 @@ public class SCPInputDataStageTask extends AbstractSCPTask {
}
@Override
- public TaskState execute(TaskContext taskContext) throws TaskException {
+ public TaskStatus execute(TaskContext taskContext) {
- if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) {
+/* if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) {
throw new TaskException("Invalid task call, expected " + TaskTypes.DATA_STAGING.toString() + " but found "
+ taskContext.getTaskModel().getTaskType().toString());
}
@@ -81,12 +77,12 @@ public class SCPInputDataStageTask extends AbstractSCPTask {
throw new TaskException("Invalid task invocation");
} catch (URISyntaxException e) {
e.printStackTrace();
- }
+ }*/
return null;
}
@Override
- public TaskState recover(TaskContext taskContext) throws TaskException {
+ public TaskStatus recover(TaskContext taskContext) {
// Placeholder: no recovery logic is present; the method only returns null.
return null;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
index 72e071c..6fa87c4 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
@@ -28,7 +28,7 @@ import org.apache.airavata.gfac.core.SSHApiException;
import org.apache.airavata.gfac.core.context.TaskContext;
import org.apache.airavata.gfac.core.task.TaskException;
import org.apache.airavata.gfac.impl.SSHUtils;
-import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
import org.apache.airavata.model.task.DataStagingTaskModel;
import org.apache.airavata.model.task.TaskTypes;
import org.apache.thrift.TException;
@@ -41,8 +41,8 @@ public class SCPOutputDataStatgeTask extends AbstractSCPTask {
@Override
- public TaskState execute(TaskContext taskContext) throws TaskException {
- if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) {
+ public TaskStatus execute(TaskContext taskContext) {
+/* if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) {
throw new TaskException("Invalid task call, expected " + TaskTypes.DATA_STAGING.toString() + " but found "
+ taskContext.getTaskModel().getTaskType().toString());
}
@@ -69,12 +69,12 @@ public class SCPOutputDataStatgeTask extends AbstractSCPTask {
throw new TaskException("Scp failed", e);
} catch (TException e) {
throw new TaskException("Invalid task invocation");
- }
+ }*/
return null;
}
@Override
- public TaskState recover(TaskContext taskContext) throws TaskException {
+ public TaskStatus recover(TaskContext taskContext) {
// Placeholder: no recovery logic is present; the method only returns null.
return null;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
index e541644..74f5826 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
@@ -20,37 +20,50 @@
*/
package org.apache.airavata.gfac.impl.task;
-import org.apache.airavata.gfac.core.task.TaskException;
import org.apache.airavata.gfac.core.SSHApiException;
import org.apache.airavata.gfac.core.cluster.RemoteCluster;
import org.apache.airavata.gfac.core.context.TaskContext;
import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
import org.apache.airavata.model.task.TaskTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Map;
public class SSHEnvironmentSetupTask implements Task {
+ private static final Logger log = LoggerFactory.getLogger(SSHEnvironmentSetupTask.class);
@Override
public void init(Map<String, String> propertyMap) throws TaskException {
// Intentionally empty: propertyMap is not read and no state is initialised.
}
+ /**
+ * Creates the parent process's working directory on its remote cluster.
+ * An SSHApiException is logged, recorded on the task model as an ErrorModel and reported
+ * as a FAILED status instead of being thrown.
+ *
+ * @param taskContext context giving access to the parent process's remote cluster and working directory
+ * @return COMPLETED when the directory call succeeds, otherwise FAILED with the reason set
+ */
@Override
- public TaskState execute(TaskContext taskContext) throws TaskException {
-
+ public TaskStatus execute(TaskContext taskContext) {
+ TaskStatus status = new TaskStatus(TaskState.COMPLETED);
try {
RemoteCluster remoteCluster = taskContext.getParentProcessContext().getRemoteCluster();
remoteCluster.makeDirectory(taskContext.getParentProcessContext().getWorkingDir());
+ status.setReason("Successfully created environment");
} catch (SSHApiException e) {
- throw new TaskException("Error while environment setup", e);
+ String msg = "Error while environment setup";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
}
- return null;
+ return status;
}
@Override
- public TaskState recover(TaskContext taskContext) throws TaskException {
+ public TaskStatus recover(TaskContext taskContext) {
// Recovery simply re-runs execute() and returns its status.
return execute(taskContext);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
index ff3a6f8..c282f17 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
@@ -31,10 +31,12 @@ import org.apache.airavata.gfac.core.task.JobSubmissionTask;
import org.apache.airavata.gfac.core.task.TaskException;
import org.apache.airavata.gfac.impl.Factory;
import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.status.JobState;
import org.apache.airavata.model.status.JobStatus;
import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
import org.apache.airavata.model.task.TaskTypes;
import org.apache.airavata.registry.cpi.AppCatalogException;
import org.apache.commons.io.FileUtils;
@@ -53,87 +55,143 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
}
+ /**
+ * Builds the job descriptor and job file for the parent process, submits the file as a batch
+ * job on the remote cluster and records the resulting job state.
+ * When the submit call returns no job id, verification is retried up to three times with an
+ * increasing sleep. Exceptions are logged, recorded on the task model as an ErrorModel and
+ * reported as a FAILED status; the final status is stored on the task context and passed to
+ * GFacUtils.saveAndPublishTaskStatus before it is returned.
+ *
+ * @param taskContext context supplying the task id, task model and parent process context
+ * @return COMPLETED when a job id was obtained, otherwise FAILED with the reason set
+ */
@Override
- public TaskState execute(TaskContext taskContext) throws TaskException {
- try {
- ProcessContext processContext = taskContext.getParentProcessContext();
- JobModel jobModel = processContext.getJobModel();
- if (jobModel == null){
- jobModel = new JobModel();
- jobModel.setWorkingDir(processContext.getWorkingDir());
- jobModel.setTaskId(taskContext.getTaskId());
- jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
- }
- RemoteCluster remoteCluster = processContext.getRemoteCluster();
- JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext);
- jobModel.setJobName(jobDescriptor.getJobName());
- ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
- JobManagerConfiguration jConfig = null;
- if (resourceJobManager != null) {
- jConfig = Factory.getJobManagerConfiguration(resourceJobManager);
- }
- File jobFile = GFacUtils.createJobFile(jobDescriptor, jConfig);
- if (jobFile != null && jobFile.exists()){
- jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
- String jobId = remoteCluster.submitBatchJob(jobFile.getPath(), processContext.getWorkingDir());
- if (jobId != null && !jobId.isEmpty()) {
- jobModel.setJobId(jobId);
- GFacUtils.saveJobStatus(taskContext, jobModel, JobState.SUBMITTED);
+ public TaskStatus execute(TaskContext taskContext){
+ TaskStatus status = new TaskStatus(TaskState.COMPLETED); // set to completed.
+ // Remembers an interruption so the flag can be restored once the status has been saved.
+ boolean interrupted = false;
+ try {
+ ProcessContext processContext = taskContext.getParentProcessContext();
+ JobModel jobModel = processContext.getJobModel();
+ if (jobModel == null) {
+ jobModel = new JobModel();
+ jobModel.setWorkingDir(processContext.getWorkingDir());
+ jobModel.setTaskId(taskContext.getTaskId());
+ jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+ }
+ RemoteCluster remoteCluster = processContext.getRemoteCluster();
+ JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext);
+ jobModel.setJobName(jobDescriptor.getJobName());
+ ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext);
+ JobManagerConfiguration jConfig = null;
+ if (resourceJobManager != null) {
+ jConfig = Factory.getJobManagerConfiguration(resourceJobManager);
+ }
+ File jobFile = GFacUtils.createJobFile(jobDescriptor, jConfig);
+ if (jobFile != null && jobFile.exists()) {
+ jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
+ String jobId = remoteCluster.submitBatchJob(jobFile.getPath(), processContext.getWorkingDir());
+ if (jobId != null && !jobId.isEmpty()) {
+ jobModel.setJobId(jobId);
+ GFacUtils.saveJobStatus(taskContext, jobModel, JobState.SUBMITTED);
// publisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
// , GfacExperimentState.JOBSUBMITTED));
- processContext.setJobModel(jobModel);
- if (verifyJobSubmissionByJobId(remoteCluster, jobId)) {
+ processContext.setJobModel(jobModel);
+ if (verifyJobSubmissionByJobId(remoteCluster, jobId)) {
// publisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
// , GfacExperimentState.JOBSUBMITTED));
- GFacUtils.saveJobStatus(taskContext, jobModel, JobState.QUEUED);
- }
- } else {
- processContext.setJobModel(jobModel);
- int verificationTryCount = 0;
- while (verificationTryCount++ < 3) {
- String verifyJobId = verifyJobSubmission(remoteCluster, jobModel);
- if (verifyJobId != null && !verifyJobId.isEmpty()) {
- // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
- jobId = verifyJobId;
- jobModel.setJobId(jobId);
+ GFacUtils.saveJobStatus(taskContext, jobModel, JobState.QUEUED);
+ }
+ status = new TaskStatus(TaskState.COMPLETED);
+ status.setReason("Submitted job to compute resource");
+ } else {
+ processContext.setJobModel(jobModel);
+ int verificationTryCount = 0;
+ while (verificationTryCount++ < 3) {
+ String verifyJobId = verifyJobSubmission(remoteCluster, jobModel);
+ if (verifyJobId != null && !verifyJobId.isEmpty()) {
+ // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
+ jobId = verifyJobId;
+ jobModel.setJobId(jobId);
// publisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
// , GfacExperimentState.JOBSUBMITTED));
- GFacUtils.saveJobStatus(taskContext, jobModel, JobState.QUEUED);
- break;
- }
- Thread.sleep(verificationTryCount * 1000);
- }
- }
+ GFacUtils.saveJobStatus(taskContext, jobModel, JobState.QUEUED);
+ status.setState(TaskState.COMPLETED);
+ status.setReason("Submitted job to compute resource");
+ break;
+ }
+ Thread.sleep(verificationTryCount * 1000);
+ }
+ }
- if (jobId == null || jobId.isEmpty()) {
- String msg = "expId:" + processContext.getProcessModel().getExperimentId() + " Couldn't find remote jobId for JobName:"
- + jobModel.getJobName() + ", both submit and verify steps doesn't return a valid JobId. Hence changing experiment state to Failed";
- log.error(msg);
- GFacUtils.saveErrorDetails(processContext, msg);
- // FIXME : Need to handle according to status update chain
-// GFacUtils.publishTaskStatus(jobExecutionContext, publisher, TaskState.FAILED);
- return TaskState.FAILED;
- }
- }
- return TaskState.COMPLETED;
- } catch (AppCatalogException e) {
- log.error("Error while instatiating app catalog",e);
- throw new TaskException("Error while instatiating app catalog", e);
- } catch (ApplicationSettingsException e) {
- log.error("Error occurred while creating job descriptor", e);
- throw new TaskException("Error occurred while creating job descriptor", e);
- } catch (GFacException e) {
- log.error("Error occurred while creating job descriptor", e);
- throw new TaskException("Error occurred while creating job descriptor", e);
- } catch (SSHApiException e) {
- log.error("Error occurred while submitting the job", e);
- throw new TaskException("Error occurred while submitting the job", e);
- } catch (IOException e) {
- log.error("Error while reading the content of the job file", e);
- throw new TaskException("Error while reading the content of the job file", e);
- } catch (InterruptedException e) {
- log.error("Error occurred while verifying the job submission", e);
- throw new TaskException("Error occurred while verifying the job submission", e);
- }
+ if (jobId == null || jobId.isEmpty()) {
+ String msg = "expId:" + processContext.getProcessModel().getExperimentId() + " Couldn't find " +
+ "remote jobId for JobName:" + jobModel.getJobName() + ", both submit and verify steps " +
+ "doesn't return a valid JobId. " + "Hence changing experiment state to Failed";
+ log.error(msg);
+ GFacUtils.saveErrorDetails(processContext, msg);
+ status.setState(TaskState.FAILED);
+ status.setReason("Couldn't find job id in both submitted and verified steps");
+ }
+ } else {
+ status.setState(TaskState.FAILED);
+ if (jobFile == null) {
+ status.setReason("JobFile is null");
+ } else {
+ status.setReason("Job file doesn't exist");
+ }
+ }
+
+ } catch (AppCatalogException e) {
+ String msg = "Error while instatiating app catalog";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ } catch (ApplicationSettingsException e) {
+ String msg = "Error occurred while creating job descriptor";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ } catch (GFacException e) {
+ String msg = "Error occurred while creating job descriptor";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ } catch (SSHApiException e) {
+ String msg = "Error occurred while submitting the job";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ } catch (IOException e) {
+ String msg = "Error while reading the content of the job file";
+ log.error(msg, e);
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ } catch (InterruptedException e) {
+ String msg = "Error occurred while verifying the job submission";
+ log.error(msg, e);
+ // Catching InterruptedException clears the thread's interrupt flag; note it for later.
+ interrupted = true;
+ status.setState(TaskState.FAILED);
+ status.setReason(msg);
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setActualErrorMessage(e.getMessage());
+ errorModel.setUserFriendlyMessage(msg);
+ taskContext.getTaskModel().setTaskError(errorModel);
+ }
+
+ taskContext.setTaskStatus(status);
+ try {
+ GFacUtils.saveAndPublishTaskStatus(taskContext);
+ } catch (GFacException e) {
+ log.error("Error while saving task status", e);
+ }
+ if (interrupted) {
+ // Restore the flag only after the status was saved, so the save itself is not cut short.
+ Thread.currentThread().interrupt();
+ }
+ return status;
}
private boolean verifyJobSubmissionByJobId(RemoteCluster remoteCluster, String jobID) throws SSHApiException {
@@ -154,15 +212,15 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
@Override
- public TaskState recover(TaskContext taskContext) throws TaskException {
+ public TaskStatus recover(TaskContext taskContext) {
// Re-runs execute() when no job model or job id is recorded on the parent process;
// otherwise returns COMPLETED without submitting anything again.
ProcessContext processContext = taskContext.getParentProcessContext();
JobModel jobModel = processContext.getJobModel();
// original job failed before submitting
if (jobModel == null || jobModel.getJobId() == null ){
return execute(taskContext);
}else {
- // job is already submitted and monitor should handle the recovery
- return TaskState.COMPLETED;
+ // job is already submitted and monitor should handle the recovery
+ return new TaskStatus(TaskState.COMPLETED);
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 6fa1288..56d9d9c 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -38,7 +38,6 @@ import org.apache.airavata.messaging.core.MessageHandler;
import org.apache.airavata.messaging.core.MessagingConstants;
import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchConsumer;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher;
import org.apache.airavata.model.messaging.event.*;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.status.ProcessStatus;
@@ -92,7 +91,7 @@ public class GfacServerHandler implements GfacService.Iface {
}
private void initAMQPClient() throws AiravataException {
// Takes the process-launch consumer from Factory (rather than constructing one here) and
// starts listening with a new ProcessLaunchMessageHandler.
- rabbitMQProcessLaunchConsumer = new RabbitMQProcessLaunchConsumer();
+ rabbitMQProcessLaunchConsumer = Factory.getProcessLaunchConsumer();
rabbitMQProcessLaunchConsumer.listen(new ProcessLaunchMessageHandler());
}
@@ -220,7 +219,8 @@ public class GfacServerHandler implements GfacService.Iface {
status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS, status, event.getProcessId());
try {
- GFacUtils.createExperimentNode(curatorClient, gfacServerName, event.getProcessId(), message.getDeliveryTag(),
+ GFacUtils.createProcessZKNode(curatorClient, gfacServerName, event.getProcessId(), message
+ .getDeliveryTag(),
event.getTokenId());
submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId());
} catch (Exception e) {