You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/10/23 17:04:09 UTC
airavata git commit: Saved correct task status in correct place in
order to fix recovery issues
Repository: airavata
Updated Branches:
refs/heads/orchestratorTaskBreakdown 138c9949c -> 72fb57bc2
Saved correct task status in correct place in order to fix recovery issues
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/72fb57bc
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/72fb57bc
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/72fb57bc
Branch: refs/heads/orchestratorTaskBreakdown
Commit: 72fb57bc280690885974547ec0c9e1fd4102601c
Parents: 138c994
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Fri Oct 23 11:04:00 2015 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Fri Oct 23 11:04:00 2015 -0400
----------------------------------------------------------------------
.../airavata/gfac/core/monitor/JobMonitor.java | 5 +
.../airavata/gfac/impl/GFacEngineImpl.java | 115 ++++++++++++++-----
.../apache/airavata/gfac/impl/GFacWorker.java | 1 +
.../impl/task/AdvancedSCPDataStageTask.java | 2 +-
.../gfac/monitor/email/EmailBasedMonitor.java | 5 +
5 files changed, 101 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/72fb57bc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
index a909791..a3f62cf 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
@@ -35,4 +35,9 @@ public interface JobMonitor {
* Stop monitoring for given jobId
*/
void stopMonitor(String jobId, boolean runOutFlow);
+
+ /**
+ * Reports whether the given job is already being monitored by this monitor.
+ *
+ * @param jobId id of the job to look up
+ * @return <code>true</code> if jobId is already being monitored by this monitor, <code>false</code> if not
+ */
+ boolean isMonitoring(String jobId);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/72fb57bc/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index e213a9b..76e5ae2 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -259,18 +259,11 @@ public class GFacEngineImpl implements GFacEngine {
break;
case MONITORING:
- JobMonitor monitorService = null;
- try {
- MonitorTaskModel monitorTaskModel = ((MonitorTaskModel) taskContext.getSubTaskModel());
- status = new ProcessStatus(ProcessState.MONITORING);
- status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- processContext.setProcessStatus(status);
- GFacUtils.saveAndPublishProcessStatus(processContext);
- monitorService = Factory.getMonitorService(monitorTaskModel.getMonitorMode());
- monitorService.monitor(processContext.getJobModel().getJobId(), processContext);
- } catch (AiravataException | TException e) {
- throw new GFacException(e);
- }
+ status = new ProcessStatus(ProcessState.MONITORING);
+ status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ processContext.setProcessStatus(status);
+ GFacUtils.saveAndPublishProcessStatus(processContext);
+ executeJobMonitoring(taskContext, processContext.isRecovery());
break;
case ENV_CLEANUP:
@@ -290,22 +283,70 @@ public class GFacEngineImpl implements GFacEngine {
processContext.setComplete(true);
}
- private boolean executeJobSubmission(TaskContext taskContext, boolean recovery) throws GFacException {
+ private void executeJobMonitoring(TaskContext taskContext, boolean recovery) throws GFacException {
+ ProcessContext processContext = taskContext.getParentProcessContext();
+ ProcessStatus status;
TaskStatus taskStatus;
+ JobMonitor monitorService = null;
+ try {
+ taskStatus = new TaskStatus(TaskState.EXECUTING);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ taskContext.setTaskStatus(taskStatus);
+ GFacUtils.saveAndPublishTaskStatus(taskContext);
+
+ MonitorTaskModel monitorTaskModel = ((MonitorTaskModel) taskContext.getSubTaskModel());
+ monitorService = Factory.getMonitorService(monitorTaskModel.getMonitorMode());
+ if (!monitorService.isMonitoring(processContext.getJobModel().getJobId())) {
+ monitorService.monitor(processContext.getJobModel().getJobId(), processContext);
+ }
+ } catch (AiravataException | TException e) {
+ taskStatus = new TaskStatus(TaskState.FAILED);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ taskStatus.setReason("Couldn't handover jobId {} to monitor service, monitor service type {}");
+ taskContext.setTaskStatus(taskStatus);
+ GFacUtils.saveAndPublishTaskStatus(taskContext);
+
+ String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(", processId: ")
+ .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
+ .append(", type: ").append(taskContext.getTaskType().name()).append(" :- Input staging failed. Reason: ")
+ .append(taskStatus.getReason()).toString();
+ ErrorModel errorModel = new ErrorModel();
+ errorModel.setUserFriendlyMessage("Error while staging output data");
+ errorModel.setActualErrorMessage(errorMsg);
+ GFacUtils.saveTaskError(taskContext, errorModel);
+ throw new GFacException(e);
+ }
+ taskStatus = new TaskStatus(TaskState.COMPLETED);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ taskStatus.setReason("Successfully handed over job id to job monitor service.");
+ taskContext.setTaskStatus(taskStatus);
+ GFacUtils.saveAndPublishTaskStatus(taskContext);
+ }
+
+ private boolean executeJobSubmission(TaskContext taskContext, boolean recovery) throws GFacException {
+ TaskStatus taskStatus = new TaskStatus(TaskState.EXECUTING);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ taskContext.setTaskStatus(taskStatus);
+ GFacUtils.saveAndPublishTaskStatus(taskContext);
try {
JobSubmissionTaskModel jobSubmissionTaskModel = ((JobSubmissionTaskModel) taskContext.getSubTaskModel());
JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(jobSubmissionTaskModel.getJobSubmissionProtocol());
ProcessContext processContext = taskContext.getParentProcessContext();
taskStatus = executeTask(taskContext, jobSubmissionTask, recovery);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ taskContext.setTaskStatus(taskStatus);
+ GFacUtils.saveAndPublishTaskStatus(taskContext);
+
if (taskStatus.getState() == TaskState.FAILED) {
log.error("expId: {}, processId: {}, taskId: {} type: {},:- Job submission task failed, " +
"reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(), taskContext
.getParentProcessContext().getProcessId(), taskContext.getTaskId(), jobSubmissionTask.getType
().name(), taskStatus.getReason());
- String errorMsg = "expId: {}, processId: {}, taskId: {} type: {},:- Job submission task failed, " +
- "reason:" + " {}" + processContext.getExperimentId() + processContext.getProcessId() +
- taskContext.getTaskId() + jobSubmissionTask.getType().name() + taskStatus.getReason();
+ String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(", processId: ")
+ .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
+ .append(", type: ").append(taskContext.getTaskType().name()).append(" :- Job submission task failed. Reason: ")
+ .append(taskStatus.getReason()).toString();
ErrorModel errorModel = new ErrorModel();
errorModel.setUserFriendlyMessage("Job submission task failed");
errorModel.setActualErrorMessage(errorMsg);
@@ -343,8 +384,11 @@ public class GFacEngineImpl implements GFacEngine {
"reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(), taskContext
.getParentProcessContext().getProcessId(), taskContext.getTaskId(), envSetupTask.getType
().name(), taskStatus.getReason());
- String errorMsg = "expId: {}, processId: {}, taskId: {} type: {},:- Input staging failed, " +
- "reason:" + " {}" + taskContext.getExperimentId() + taskContext.getProcessId() + taskContext.getTaskId() + envSetupTask.getType().name() + taskStatus.getReason();
+ ProcessContext processContext = taskContext.getParentProcessContext();
+ String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(", processId: ")
+ .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
+ .append(", type: ").append(taskContext.getTaskType().name()).append(" :- Environment Setup failed. Reason: ")
+ .append(taskStatus.getReason()).toString();
ErrorModel errorModel = new ErrorModel();
errorModel.setUserFriendlyMessage("Error while environment setup");
errorModel.setActualErrorMessage(errorMsg);
@@ -358,18 +402,27 @@ public class GFacEngineImpl implements GFacEngine {
}
private boolean inputDataStaging(TaskContext taskContext, boolean recover) throws GFacException {
- TaskStatus taskStatus;// execute process inputs
+ TaskStatus taskStatus = new TaskStatus(TaskState.EXECUTING);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ taskContext.setTaskStatus(taskStatus);
+ GFacUtils.saveAndPublishTaskStatus(taskContext);
+
ProcessContext processContext = taskContext.getParentProcessContext();
Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
taskStatus = executeTask(taskContext, dMoveTask, false);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ taskContext.setTaskStatus(taskStatus);
+ GFacUtils.saveAndPublishTaskStatus(taskContext);
+
if (taskStatus.getState() == TaskState.FAILED) {
log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input statging failed, " +
"reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(), taskContext
.getParentProcessContext().getProcessId(), taskContext.getTaskId(), dMoveTask.getType
().name(), taskStatus.getReason());
- String errorMsg = "expId: {}, processId: {}, taskId: {} type: {},:- Input staging failed, " +
- "reason:" + " {}" + processContext.getExperimentId() + processContext.getProcessId() +
- taskContext.getTaskId() + dMoveTask.getType().name() + taskStatus.getReason();
+ String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(", processId: ")
+ .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
+ .append(", type: ").append(taskContext.getTaskType().name()).append(" :- Input staging failed. Reason: ")
+ .append(taskStatus.getReason()).toString();
ErrorModel errorModel = new ErrorModel();
errorModel.setUserFriendlyMessage("Error while staging input data");
errorModel.setActualErrorMessage(errorMsg);
@@ -430,18 +483,28 @@ public class GFacEngineImpl implements GFacEngine {
* @throws GFacException
*/
private boolean outputDataStaging(TaskContext taskContext, boolean recovery) throws GFacException {
+ TaskStatus taskStatus = new TaskStatus(TaskState.EXECUTING);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ taskContext.setTaskStatus(taskStatus);
+ GFacUtils.saveAndPublishTaskStatus(taskContext);
+
ProcessContext processContext = taskContext.getParentProcessContext();
Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
- TaskStatus taskStatus = executeTask(taskContext, dMoveTask, recovery);
+ taskStatus = executeTask(taskContext, dMoveTask, recovery);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ taskContext.setTaskStatus(taskStatus);
+ GFacUtils.saveAndPublishTaskStatus(taskContext);
+
if (taskStatus.getState() == TaskState.FAILED) {
log.error("expId: {}, processId: {}, taskId: {} type: {},:- output staging failed, " +
"reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(), taskContext
.getParentProcessContext().getProcessId(), taskContext.getTaskId(), dMoveTask.getType
().name(), taskStatus.getReason());
- String errorMsg = "expId: {}, processId: {}, taskId: {} type: {},:- output staging failed, " +
- "reason:" + " {}" + processContext.getExperimentId() + processContext.getProcessId() +
- taskContext.getTaskId() + dMoveTask.getType().name() + taskStatus.getReason();
+ String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(", processId: ")
+ .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
+ .append(", type: ").append(taskContext.getTaskType().name()).append(" :- Output staging failed. Reason: ")
+ .append(taskStatus.getReason()).toString();
ErrorModel errorModel = new ErrorModel();
errorModel.setUserFriendlyMessage("Error while staging output data");
errorModel.setActualErrorMessage(errorMsg);
http://git-wip-us.apache.org/repos/asf/airavata/blob/72fb57bc/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index 52cd395..c71c8e7 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -196,6 +196,7 @@ public class GFacWorker implements Runnable {
String taskDag = processContext.getProcessModel().getTaskDag();
List<String> taskExecutionOrder = GFacUtils.parseTaskDag(taskDag);
+ processContext.setTaskExecutionOrder(taskExecutionOrder);
Map<String, TaskModel> taskMap = processContext.getTaskMap();
String recoverTaskId = null;
for (String taskId : taskExecutionOrder) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/72fb57bc/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
index e200546..029da77 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
@@ -91,7 +91,7 @@ public class AdvancedSCPDataStageTask implements Task {
@Override
public TaskStatus execute(TaskContext taskContext) {
- TaskStatus status = new TaskStatus(TaskState.CREATED);
+ TaskStatus status = new TaskStatus(TaskState.EXECUTING);
AuthenticationInfo authenticationInfo = null;
DataStagingTaskModel subTaskModel = null;
String localDataDir = null;
http://git-wip-us.apache.org/repos/asf/airavata/blob/72fb57bc/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index c4d4676..b3a1398 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -134,6 +134,11 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
}
}
+ /**
+ * Reports whether jobMonitorMap currently holds an entry keyed by the given job id.
+ *
+ * @param jobId id of the job to look up
+ * @return <code>true</code> if jobMonitorMap contains jobId as a key, <code>false</code> otherwise
+ */
+ @Override
+ public boolean isMonitoring(String jobId) {
+ final boolean alreadyTracked = jobMonitorMap.containsKey(jobId);
+ return alreadyTracked;
+ }
+
private JobStatusResult parse(Message message) throws MessagingException, AiravataException {
Address fromAddress = message.getFrom()[0];
String addressStr = fromAddress.toString();