You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/10/23 17:04:09 UTC

airavata git commit: Saved correct task status in correct place in order to fix recovery issues

Repository: airavata
Updated Branches:
  refs/heads/orchestratorTaskBreakdown 138c9949c -> 72fb57bc2


Saved correct task status in correct place in order to fix recovery issues


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/72fb57bc
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/72fb57bc
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/72fb57bc

Branch: refs/heads/orchestratorTaskBreakdown
Commit: 72fb57bc280690885974547ec0c9e1fd4102601c
Parents: 138c994
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Fri Oct 23 11:04:00 2015 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Fri Oct 23 11:04:00 2015 -0400

----------------------------------------------------------------------
 .../airavata/gfac/core/monitor/JobMonitor.java  |   5 +
 .../airavata/gfac/impl/GFacEngineImpl.java      | 115 ++++++++++++++-----
 .../apache/airavata/gfac/impl/GFacWorker.java   |   1 +
 .../impl/task/AdvancedSCPDataStageTask.java     |   2 +-
 .../gfac/monitor/email/EmailBasedMonitor.java   |   5 +
 5 files changed, 101 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/72fb57bc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
index a909791..a3f62cf 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
@@ -35,4 +35,9 @@ public interface JobMonitor {
 	 * Stop monitoring for given jobId
 	 */
 	void stopMonitor(String jobId, boolean runOutFlow);
+
+    /**
+     * Return <code>true</code> if jobId is already being monitored by this Monitor, <code>false</code> if not
+     */
+    boolean isMonitoring(String jobId);
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/72fb57bc/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index e213a9b..76e5ae2 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -259,18 +259,11 @@ public class GFacEngineImpl implements GFacEngine {
                     break;
 
                 case MONITORING:
-                    JobMonitor monitorService = null;
-                    try {
-                        MonitorTaskModel monitorTaskModel = ((MonitorTaskModel) taskContext.getSubTaskModel());
-                        status = new ProcessStatus(ProcessState.MONITORING);
-                        status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-                        processContext.setProcessStatus(status);
-                        GFacUtils.saveAndPublishProcessStatus(processContext);
-                        monitorService = Factory.getMonitorService(monitorTaskModel.getMonitorMode());
-                        monitorService.monitor(processContext.getJobModel().getJobId(), processContext);
-                    } catch (AiravataException | TException e) {
-                        throw new GFacException(e);
-                    }
+                    status = new ProcessStatus(ProcessState.MONITORING);
+                    status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                    processContext.setProcessStatus(status);
+                    GFacUtils.saveAndPublishProcessStatus(processContext);
+                    executeJobMonitoring(taskContext, processContext.isRecovery());
                     break;
 
                 case ENV_CLEANUP:
@@ -290,22 +283,70 @@ public class GFacEngineImpl implements GFacEngine {
         processContext.setComplete(true);
     }
 
-    private boolean executeJobSubmission(TaskContext taskContext, boolean recovery) throws GFacException {
+    /**
+     * Hands the job over to the monitor service, saving and publishing the task status as it goes.
+     *
+     * @param recovery currently unused by this method
+     * @throws GFacException if the hand-over fails; the task is marked FAILED and the error saved first
+     */
+    private void executeJobMonitoring(TaskContext taskContext, boolean recovery) throws GFacException {
+        ProcessContext processContext = taskContext.getParentProcessContext();
+        TaskStatus taskStatus = new TaskStatus(TaskState.EXECUTING);
+        try {
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            taskContext.setTaskStatus(taskStatus);
+            GFacUtils.saveAndPublishTaskStatus(taskContext);
+            MonitorTaskModel monitorTaskModel = ((MonitorTaskModel) taskContext.getSubTaskModel());
+            JobMonitor monitorService = Factory.getMonitorService(monitorTaskModel.getMonitorMode());
+            if (!monitorService.isMonitoring(processContext.getJobModel().getJobId())) {
+                monitorService.monitor(processContext.getJobModel().getJobId(), processContext);
+            }
+        } catch (AiravataException | TException e) {
+            taskStatus = new TaskStatus(TaskState.FAILED);
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            taskStatus.setReason("Couldn't handover job to monitor service. " + e.getMessage());
+            taskContext.setTaskStatus(taskStatus);
+            GFacUtils.saveAndPublishTaskStatus(taskContext);
+            String errorMsg = "expId: " + processContext.getExperimentId() + ", processId: "
+                    + processContext.getProcessId() + ", taskId: " + taskContext.getTaskId() + ", type: "
+                    + taskContext.getTaskType().name() + " :- Job monitoring failed. Reason: " + taskStatus.getReason();
+            ErrorModel errorModel = new ErrorModel();
+            errorModel.setUserFriendlyMessage("Error while handing over job to monitor service");
+            errorModel.setActualErrorMessage(errorMsg);
+            GFacUtils.saveTaskError(taskContext, errorModel);
+            throw new GFacException(e);
+        }
+        taskStatus = new TaskStatus(TaskState.COMPLETED);
+        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        taskStatus.setReason("Successfully handed over job id to job monitor service.");
+        taskContext.setTaskStatus(taskStatus);
+        GFacUtils.saveAndPublishTaskStatus(taskContext);
+    }
+
+    private boolean executeJobSubmission(TaskContext taskContext, boolean recovery) throws GFacException {
+        TaskStatus taskStatus = new TaskStatus(TaskState.EXECUTING);
+        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        taskContext.setTaskStatus(taskStatus);
+        GFacUtils.saveAndPublishTaskStatus(taskContext);
         try {
             JobSubmissionTaskModel jobSubmissionTaskModel = ((JobSubmissionTaskModel) taskContext.getSubTaskModel());
             JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(jobSubmissionTaskModel.getJobSubmissionProtocol());
 
             ProcessContext processContext = taskContext.getParentProcessContext();
             taskStatus = executeTask(taskContext, jobSubmissionTask, recovery);
+            taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+            taskContext.setTaskStatus(taskStatus);
+            GFacUtils.saveAndPublishTaskStatus(taskContext);
+
             if (taskStatus.getState() == TaskState.FAILED) {
                 log.error("expId: {}, processId: {}, taskId: {} type: {},:- Job submission task failed, " +
                         "reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(), taskContext
                         .getParentProcessContext().getProcessId(), taskContext.getTaskId(), jobSubmissionTask.getType
                         ().name(), taskStatus.getReason());
-                String errorMsg = "expId: {}, processId: {}, taskId: {} type: {},:- Job submission task failed, " +
-                        "reason:" + " {}" + processContext.getExperimentId() + processContext.getProcessId() +
-                        taskContext.getTaskId() + jobSubmissionTask.getType().name() + taskStatus.getReason();
+                String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(", processId: ")
+                        .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
+                        .append(", type: ").append(taskContext.getTaskType().name()).append(" :- Job submission task failed. Reason: ")
+                        .append(taskStatus.getReason()).toString();
                 ErrorModel errorModel = new ErrorModel();
                 errorModel.setUserFriendlyMessage("Job submission task failed");
                 errorModel.setActualErrorMessage(errorMsg);
@@ -343,8 +384,11 @@ public class GFacEngineImpl implements GFacEngine {
                         "reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(), taskContext
                         .getParentProcessContext().getProcessId(), taskContext.getTaskId(), envSetupTask.getType
                         ().name(), taskStatus.getReason());
-                String errorMsg = "expId: {}, processId: {}, taskId: {} type: {},:- Input staging failed, " +
-                        "reason:" + " {}" + taskContext.getExperimentId() + taskContext.getProcessId() + taskContext.getTaskId() + envSetupTask.getType().name() + taskStatus.getReason();
+                ProcessContext processContext = taskContext.getParentProcessContext();
+                String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(", processId: ")
+                        .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
+                        .append(", type: ").append(taskContext.getTaskType().name()).append(" :- Environment Setup failed. Reason: ")
+                        .append(taskStatus.getReason()).toString();
                 ErrorModel errorModel = new ErrorModel();
                 errorModel.setUserFriendlyMessage("Error while environment setup");
                 errorModel.setActualErrorMessage(errorMsg);
@@ -358,18 +402,27 @@ public class GFacEngineImpl implements GFacEngine {
     }
 
     private boolean inputDataStaging(TaskContext taskContext, boolean recover) throws GFacException {
-        TaskStatus taskStatus;// execute process inputs
+        TaskStatus taskStatus = new TaskStatus(TaskState.EXECUTING);
+        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        taskContext.setTaskStatus(taskStatus);
+        GFacUtils.saveAndPublishTaskStatus(taskContext);
+
         ProcessContext processContext = taskContext.getParentProcessContext();
         Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
         taskStatus = executeTask(taskContext, dMoveTask, false);
+        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        taskContext.setTaskStatus(taskStatus);
+        GFacUtils.saveAndPublishTaskStatus(taskContext);
+
         if (taskStatus.getState() == TaskState.FAILED) {
             log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input statging failed, " +
                     "reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(), taskContext
                     .getParentProcessContext().getProcessId(), taskContext.getTaskId(), dMoveTask.getType
                     ().name(), taskStatus.getReason());
-            String errorMsg = "expId: {}, processId: {}, taskId: {} type: {},:- Input staging failed, " +
-                    "reason:" + " {}" + processContext.getExperimentId() + processContext.getProcessId() +
-                    taskContext.getTaskId() + dMoveTask.getType().name() + taskStatus.getReason();
+            String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(", processId: ")
+                    .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
+                    .append(", type: ").append(taskContext.getTaskType().name()).append(" :- Input staging failed. Reason: ")
+                    .append(taskStatus.getReason()).toString();
             ErrorModel errorModel = new ErrorModel();
             errorModel.setUserFriendlyMessage("Error while staging input data");
             errorModel.setActualErrorMessage(errorMsg);
@@ -430,18 +483,28 @@ public class GFacEngineImpl implements GFacEngine {
      * @throws GFacException
      */
     private boolean outputDataStaging(TaskContext taskContext, boolean recovery) throws GFacException {
+        TaskStatus taskStatus = new TaskStatus(TaskState.EXECUTING);
+        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        taskContext.setTaskStatus(taskStatus);
+        GFacUtils.saveAndPublishTaskStatus(taskContext);
+
         ProcessContext processContext = taskContext.getParentProcessContext();
         Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol());
-        TaskStatus taskStatus = executeTask(taskContext, dMoveTask, recovery);
+        taskStatus = executeTask(taskContext, dMoveTask, recovery);
+        taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+        taskContext.setTaskStatus(taskStatus);
+        GFacUtils.saveAndPublishTaskStatus(taskContext);
+
         if (taskStatus.getState() == TaskState.FAILED) {
             log.error("expId: {}, processId: {}, taskId: {} type: {},:- output staging failed, " +
                     "reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(), taskContext
                     .getParentProcessContext().getProcessId(), taskContext.getTaskId(), dMoveTask.getType
                     ().name(), taskStatus.getReason());
 
-            String errorMsg = "expId: {}, processId: {}, taskId: {} type: {},:- output staging failed, " +
-                    "reason:" + " {}" + processContext.getExperimentId() + processContext.getProcessId() +
-                    taskContext.getTaskId() + dMoveTask.getType().name() + taskStatus.getReason();
+            String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(", processId: ")
+                    .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
+                    .append(", type: ").append(taskContext.getTaskType().name()).append(" :- Output staging failed. Reason: ")
+                    .append(taskStatus.getReason()).toString();
             ErrorModel errorModel = new ErrorModel();
             errorModel.setUserFriendlyMessage("Error while staging output data");
             errorModel.setActualErrorMessage(errorMsg);

http://git-wip-us.apache.org/repos/asf/airavata/blob/72fb57bc/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index 52cd395..c71c8e7 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -196,6 +196,7 @@ public class GFacWorker implements Runnable {
 
         String taskDag = processContext.getProcessModel().getTaskDag();
         List<String> taskExecutionOrder = GFacUtils.parseTaskDag(taskDag);
+        processContext.setTaskExecutionOrder(taskExecutionOrder);
         Map<String, TaskModel> taskMap = processContext.getTaskMap();
         String recoverTaskId = null;
         for (String taskId : taskExecutionOrder) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/72fb57bc/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
index e200546..029da77 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AdvancedSCPDataStageTask.java
@@ -91,7 +91,7 @@ public class AdvancedSCPDataStageTask implements Task {
 
     @Override
     public TaskStatus execute(TaskContext taskContext) {
-        TaskStatus status = new TaskStatus(TaskState.CREATED);
+        TaskStatus status = new TaskStatus(TaskState.EXECUTING);
         AuthenticationInfo authenticationInfo = null;
         DataStagingTaskModel subTaskModel = null;
         String localDataDir = null;

http://git-wip-us.apache.org/repos/asf/airavata/blob/72fb57bc/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index c4d4676..b3a1398 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -134,6 +134,11 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
 		}
 	}
 
+    @Override
+    public boolean isMonitoring(String jobId) {
+        return jobMonitorMap.containsKey(jobId); // true when jobMonitorMap has an entry keyed by this jobId
+    }
+
     private JobStatusResult parse(Message message) throws MessagingException, AiravataException {
         Address fromAddress = message.getFrom()[0];
         String addressStr = fromAddress.toString();