You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this copy.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/11/03 20:48:07 UTC

[42/51] [abbrv] airavata git commit: Completed process recovery

Completed process recovery


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b57a5015
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b57a5015
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b57a5015

Branch: refs/heads/master
Commit: b57a5015d047ac032636772cbb8f7dd9184035cd
Parents: 72fb57b
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Fri Oct 23 14:22:46 2015 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Fri Oct 23 14:22:46 2015 -0400

----------------------------------------------------------------------
 .../apache/airavata/gfac/core/GFacEngine.java   |  2 +-
 .../gfac/core/context/ProcessContext.java       | 19 ++++++---
 .../airavata/gfac/core/monitor/JobMonitor.java  |  6 +--
 .../airavata/gfac/impl/GFacEngineImpl.java      | 30 +++++++++++---
 .../apache/airavata/gfac/impl/GFacWorker.java   |  2 +-
 .../gfac/monitor/email/EmailBasedMonitor.java   | 43 ++++++++++++--------
 6 files changed, 67 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/b57a5015/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
index 1d3ad30..78a6278 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacEngine.java
@@ -29,7 +29,7 @@ public interface GFacEngine {
 
 	public void executeProcess(ProcessContext processContext) throws GFacException ;
 
-    public void recoverProcess(ProcessContext processContext, String taskId) throws GFacException;
+    public void recoverProcess(ProcessContext processContext) throws GFacException;
 
 	public void continueProcess(ProcessContext processContext, String taskId) throws GFacException ;
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b57a5015/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index 47a7430..1a3a236 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -85,10 +85,10 @@ public class ProcessContext {
     private List<String> taskExecutionOrder;
     private List<TaskModel> taskList;
     private Map<String, TaskModel> taskMap;
-    private String currentExecutingTaskId; // TaskId of current executing task.
     private boolean pauseTaskExecution = false;  // Task can pause task execution by setting this value
     private boolean complete = false; // all tasks executed?
     private boolean recovery = false; // is process in recovery mode?
+    private TaskModel currentExecutingTaskModel; // currently executing task model; needed to continue process execution if the process is paused
 
     /**
 	 * Note: process context property use lazy loading approach. In runtime you will see some properties as null
@@ -416,11 +416,10 @@ public class ProcessContext {
     }
 
     public String getCurrentExecutingTaskId() {
-        return currentExecutingTaskId;
-    }
-
-    public void setCurrentExecutingTaskId(String currentExecutingTaskId) {
-        this.currentExecutingTaskId = currentExecutingTaskId;
+        if (currentExecutingTaskModel != null) {
+            return currentExecutingTaskModel.getTaskId();
+        }
+        return null;
     }
 
     public boolean isPauseTaskExecution() {
@@ -446,4 +445,12 @@ public class ProcessContext {
     public void setRecovery(boolean recovery) {
         this.recovery = recovery;
     }
+
+    public TaskModel getCurrentExecutingTaskModel() {
+        return currentExecutingTaskModel;
+    }
+
+    public void setCurrentExecutingTaskModel(TaskModel currentExecutingTaskModel) {
+        this.currentExecutingTaskModel = currentExecutingTaskModel;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/b57a5015/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
index a3f62cf..4b2ecb2 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
@@ -20,16 +20,16 @@
  */
 package org.apache.airavata.gfac.core.monitor;
 
-import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
 
 public interface JobMonitor {
 
 	/**
 	 * Start monitor jobId on remote computer resource.
 	 * @param jobId
-	 * @param processContext
+	 * @param taskContext
 	 */
-	void monitor(String jobId, ProcessContext processContext);
+	void monitor(String jobId, TaskContext taskContext);
 
 	/**
 	 * Stop monitoring for given jobId

http://git-wip-us.apache.org/repos/asf/airavata/blob/b57a5015/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 76e5ae2..4b67ffd 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -184,11 +184,11 @@ public class GFacEngineImpl implements GFacEngine {
             }
 
             TaskModel taskModel = taskMap.get(taskId);
+            processContext.setCurrentExecutingTaskModel(taskModel);
             TaskTypes taskType = taskModel.getTaskType();
             TaskContext taskContext = getTaskContext(processContext);
             taskContext.setTaskModel(taskModel);
             ProcessStatus status = null;
-            processContext.setCurrentExecutingTaskId(taskId);
             switch (taskType) {
                 case ENV_SETUP:
                     status = new ProcessStatus(ProcessState.CONFIGURING_WORKSPACE);
@@ -276,7 +276,7 @@ public class GFacEngineImpl implements GFacEngine {
             }
 
             if (processContext.isPauseTaskExecution()) {
-                return;   // If any task put processContext to wait, the same task should continue processContext execution.
+                return;   // If any task put processContext to wait, the same task must continue processContext execution.
             }
 
         }
@@ -285,7 +285,6 @@ public class GFacEngineImpl implements GFacEngine {
 
     private void executeJobMonitoring(TaskContext taskContext, boolean recovery) throws GFacException {
         ProcessContext processContext = taskContext.getParentProcessContext();
-        ProcessStatus status;
         TaskStatus taskStatus;
         JobMonitor monitorService = null;
         try {
@@ -297,7 +296,9 @@ public class GFacEngineImpl implements GFacEngine {
             MonitorTaskModel monitorTaskModel = ((MonitorTaskModel) taskContext.getSubTaskModel());
             monitorService = Factory.getMonitorService(monitorTaskModel.getMonitorMode());
             if (!monitorService.isMonitoring(processContext.getJobModel().getJobId())) {
-                monitorService.monitor(processContext.getJobModel().getJobId(), processContext);
+                monitorService.monitor(processContext.getJobModel().getJobId(), taskContext);
+            } else {
+                log.warn("Jobid: {}, already in monitoring map", processContext.getJobModel().getJobId());
             }
         } catch (AiravataException | TException e) {
             taskStatus = new TaskStatus(TaskState.FAILED);
@@ -316,6 +317,10 @@ public class GFacEngineImpl implements GFacEngine {
             GFacUtils.saveTaskError(taskContext, errorModel);
             throw new GFacException(e);
         }
+        if (processContext.isPauseTaskExecution()) {
+            // Don't mark the task COMPLETED here; the job monitor will update the task status to COMPLETED once it finishes monitoring this job id.
+            return;
+        }
         taskStatus = new TaskStatus(TaskState.COMPLETED);
         taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
         taskStatus.setReason("Successfully handed over job id to job monitor service.");
@@ -432,10 +437,23 @@ public class GFacEngineImpl implements GFacEngine {
         return false;
     }
 
-
     @Override
-    public void recoverProcess(ProcessContext processContext, String recoverTaskId) throws GFacException {
+    public void recoverProcess(ProcessContext processContext) throws GFacException {
         processContext.setRecovery(true);
+        String taskDag = processContext.getProcessModel().getTaskDag();
+        List<String> taskExecutionOrder = GFacUtils.parseTaskDag(taskDag);
+        processContext.setTaskExecutionOrder(taskExecutionOrder);
+        Map<String, TaskModel> taskMap = processContext.getTaskMap();
+        String recoverTaskId = null;
+        for (String taskId : taskExecutionOrder) {
+            TaskModel taskModel = taskMap.get(taskId);
+            TaskState state = taskModel.getTaskStatus().getState();
+            if (state == TaskState.CREATED || state == TaskState.EXECUTING) {
+                recoverTaskId = taskId;
+                break;
+            }
+        }
+
         continueProcess(processContext, recoverTaskId);
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b57a5015/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index c71c8e7..970cbf0 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -208,7 +208,7 @@ public class GFacWorker implements Runnable {
             }
         }
 
-        engine.recoverProcess(processContext, recoverTaskId);
+        engine.recoverProcess(processContext);
         if (processContext.isInterrupted()) {
             return;
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/b57a5015/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index b3a1398..f983d63 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -28,6 +28,7 @@ import org.apache.airavata.gfac.core.GFacThreadPoolExecutor;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.config.ResourceConfig;
 import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.monitor.EmailParser;
 import org.apache.airavata.gfac.core.monitor.JobMonitor;
 import org.apache.airavata.gfac.core.monitor.JobStatusResult;
@@ -36,6 +37,8 @@ import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerTy
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.status.JobState;
 import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,7 +71,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
     private Store store;
     private Folder emailFolder;
     private Properties properties;
-    private Map<String, ProcessContext> jobMonitorMap = new ConcurrentHashMap<>();
+    private Map<String, TaskContext> jobMonitorMap = new ConcurrentHashMap<>();
     private String host, emailAddress, password, storeProtocol, folderName ;
     private Date monitorStartDate;
     private Map<ResourceJobManagerType, EmailParser> emailParserMap = new HashMap<ResourceJobManagerType, EmailParser>();
@@ -116,18 +119,18 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
 
 	}
 	@Override
-	public void monitor(String jobId, ProcessContext processContext) {
+	public void monitor(String jobId, TaskContext taskContext) {
 		log.info("[EJM]: Added monitor Id : " + jobId + " to email based monitor map");
-		jobMonitorMap.put(jobId, processContext);
-        processContext.setPauseTaskExecution(true);
+		jobMonitorMap.put(jobId, taskContext);
+        taskContext.getParentProcessContext().setPauseTaskExecution(true);
 	}
 
 	@Override
 	public void stopMonitor(String jobId, boolean runOutflow) {
-		ProcessContext processContext = jobMonitorMap.remove(jobId);
-		if (processContext != null && runOutflow) {
+		TaskContext taskContext = jobMonitorMap.remove(jobId);
+		if (taskContext != null && runOutflow) {
 			try {
-				GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(processContext));
+				GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(taskContext.getParentProcessContext()));
 			} catch (GFacException e) {
 				log.info("[EJM]: Error while running output tasks", e);
 			}
@@ -238,12 +241,12 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
         for (Message message : searchMessages) {
             try {
                 JobStatusResult jobStatusResult = parse(message);
-                ProcessContext processContext = jobMonitorMap.get(jobStatusResult.getJobId());
-                if (processContext == null) {
-	                processContext = jobMonitorMap.get(jobStatusResult.getJobName());
+                TaskContext taskContext = jobMonitorMap.get(jobStatusResult.getJobId());
+                if (taskContext == null) {
+                    taskContext = jobMonitorMap.get(jobStatusResult.getJobName());
                 }
-                if (processContext != null) {
-                    process(jobStatusResult, processContext);
+                if (taskContext != null) {
+                    process(jobStatusResult, taskContext);
                     processedMessages.add(message);
                 } else {
                     // we can get JobExecutionContext null in multiple Gfac instances environment,
@@ -294,12 +297,12 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
         }
     }
 
-    private void process(JobStatusResult jobStatusResult, ProcessContext processContext){
+    private void process(JobStatusResult jobStatusResult, TaskContext taskContext){
         JobState resultState = jobStatusResult.getState();
 	    // TODO : update job state on process context
         boolean runOutflowTasks = false;
 	    JobStatus jobStatus = new JobStatus();
-	    JobModel jobModel = processContext.getJobModel();
+	    JobModel jobModel = taskContext.getParentProcessContext().getJobModel();
         String jobDetails = "JobName : " + jobStatusResult.getJobName() + ", JobId : " + jobStatusResult.getJobId();
         // TODO - Handle all other valid JobStates
         if (resultState == JobState.COMPLETE) {
@@ -340,18 +343,22 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
 		    try {
 			    jobModel.setJobStatus(jobStatus);
 			    log.info("[EJM]: Publishing status changes to amqp. " + jobDetails);
-			    GFacUtils.saveJobStatus(processContext, jobModel);
+			    GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
 		    } catch (GFacException e) {
 			    log.error("expId: {}, processId: {}, taskId: {}, jobId: {} :- Error while save and publishing Job " +
-					    "status {}", processContext.getExperimentId(), processContext.getProcessId(), jobModel
-					    .getTaskId(), jobModel.getJobId(), jobStatus.getJobState());
+                        "status {}", taskContext.getExperimentId(), taskContext.getProcessId(), jobModel
+                        .getTaskId(), jobModel.getJobId(), jobStatus.getJobState());
 		    }
 	    }
 
         if (runOutflowTasks) {
             log.info("[EJM]: Calling Out Handler chain of " + jobDetails);
 	        try {
-		        GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(processContext));
+                TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED);
+                taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                taskStatus.setReason("Job monitoring completed with final state: " + TaskState.COMPLETED.name());
+                GFacUtils.saveAndPublishTaskStatus(taskContext);
+		        GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(taskContext.getParentProcessContext()));
 	        } catch (GFacException e) {
 		        log.info("[EJM]: Error while running output tasks", e);
 	        }