You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/01/08 19:36:58 UTC

airavata git commit: Fix edge cases in process cancellation and recovery

Repository: airavata
Updated Branches:
  refs/heads/master 545e75344 -> 4792eac6e


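Fix edge cases in process cancellation and recovery: skip the interrupt checkpoint while a process is in MONITORING state, log an error when no recovery task or job ID can be found, and move a cancelled process through CANCELLING to CANCELED once job monitoring completes.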


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4792eac6
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4792eac6
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4792eac6

Branch: refs/heads/master
Commit: 4792eac6e173c9ec81bab4f510ebb45da341a3ad
Parents: 545e753
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Fri Jan 8 13:36:44 2016 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Fri Jan 8 13:36:44 2016 -0500

----------------------------------------------------------------------
 .../airavata/gfac/impl/GFacEngineImpl.java      | 28 +++++++++------
 .../apache/airavata/gfac/impl/GFacWorker.java   | 36 +++++++++++++++-----
 .../gfac/monitor/email/EmailBasedMonitor.java   | 18 ++++++----
 3 files changed, 56 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/4792eac6/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 00d920d..f264e6c 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -215,7 +215,7 @@ public class GFacEngineImpl implements GFacEngine {
 
     private void executeTaskListFrom(ProcessContext processContext, String startingTaskId) throws GFacException {
         // checkpoint
-        if (processContext.isInterrupted()) {
+        if (processContext.isInterrupted() && processContext.getProcessState() != ProcessState.MONITORING) {
             GFacUtils.handleProcessInterrupt(processContext);
             return;
         }
@@ -552,7 +552,12 @@ public class GFacEngineImpl implements GFacEngine {
                 cancelJobSubmission(processContext, rTaskId, pTaskId);
             }
             continueProcess(processContext, recoverTaskId);
+        } else {
+            log.error("expId: {}, processId: {}, Error while recovering process, couldn't find recovery task",
+                    processContext.getExperimentId(), processContext.getProcessId());
         }
+
+
     }
 
     private void cancelJobSubmission(ProcessContext processContext, String rTaskId, String pTaskId) {
@@ -577,12 +582,17 @@ public class GFacEngineImpl implements GFacEngine {
 
                 if (jobModels != null && !jobModels.isEmpty()) {
                     JobModel jobModel = (JobModel) jobModels.get(jobModels.size() - 1);
-                    processContext.setJobModel(jobModel);
-                    log.info("expId: {}, processId: {}, Canceling jobId {}", processContext.getExperimentId(),
-                            processContext.getProcessId(), jobModel.getJobId());
-                    cancelProcess(processContext);
-                    log.info("expId: {}, processId: {}, Canceled jobId {}", processContext.getExperimentId(),
-                            processContext.getProcessId(), jobModel.getJobId());
+                    if (jobModel.getJobId() != null) {
+                        processContext.setJobModel(jobModel);
+                        log.info("expId: {}, processId: {}, Canceling jobId {}", processContext.getExperimentId(),
+                                processContext.getProcessId(), jobModel.getJobId());
+                        cancelProcess(processContext);
+                        log.info("expId: {}, processId: {}, Canceled jobId {}", processContext.getExperimentId(),
+                                processContext.getProcessId(), jobModel.getJobId());
+                    } else {
+                        log.error("expId: {}, processId: {}, Couldn't find jobId in jobModel, aborting process recovery",
+                                processContext.getExperimentId(), processContext.getProcessId());
+                    }
                 }
             } catch (GFacException e) {
                 log.error("expId: {}, processId: {}, Error while canceling process which is in recovery mode",
@@ -606,10 +616,6 @@ public class GFacEngineImpl implements GFacEngine {
 
     @Override
     public void continueProcess(ProcessContext processContext, String taskId) throws GFacException {
-        if (processContext.isInterrupted()) {
-            GFacUtils.handleProcessInterrupt(processContext);
-            return;
-        }
         executeTaskListFrom(processContext, taskId);
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/4792eac6/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index e0664a5..fd6dad3 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -29,6 +29,7 @@ import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.registry.core.experiment.catalog.model.Process;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +54,7 @@ public class GFacWorker implements Runnable {
 	 */
 	public GFacWorker(ProcessContext processContext) throws GFacException {
 		if (processContext == null) {
-			throw new GFacException("Worker must initialize with valide processContext, Process context is null");
+			throw new GFacException("Worker must initialize with valid processContext, Process context is null");
 		}
 		this.processId = processContext.getProcessId();
 		this.gatewayId = processContext.getGatewayId();
@@ -78,7 +79,7 @@ public class GFacWorker implements Runnable {
 	@Override
 	public void run() {
 		try {
-			ProcessState processState = processContext.getProcessStatus().getState();
+			ProcessState processState = processContext.getProcessState();
 			switch (processState) {
 				case CREATED:
 				case VALIDATED:
@@ -101,6 +102,9 @@ public class GFacWorker implements Runnable {
 				case COMPLETED:
 					completeProcess();
 					break;
+				case CANCELLING:
+					cancelProcess();
+					break;
 				case CANCELED:
 					// TODO - implement cancel scenario
 					break;
@@ -111,12 +115,18 @@ public class GFacWorker implements Runnable {
 					throw new GFacException("process Id : " + processId + " Couldn't identify process type");
 			}
 			if (processContext.isCancel()) {
-				if (processContext.getProcessState() == ProcessState.MONITORING
-						|| processContext.getProcessState() == ProcessState.EXECUTING) {
-					// don't send ack if the process is in MONITORING state, wait until cancel email comes to airavata.
-				} else {
-					sendAck();
-					Factory.getGfacContext().removeProcess(processContext.getProcessId());
+				processState = processContext.getProcessState();
+				switch (processState) {
+					case MONITORING: case EXECUTING:
+						// don't send ack if the process is in MONITORING or EXECUTING states, wait until cancel email comes to airavata
+						break;
+					case CANCELLING:
+						cancelProcess();
+						break;
+					default:
+						sendAck();
+						Factory.getGfacContext().removeProcess(processContext.getProcessId());
+						break;
 				}
 			}
 		} catch (GFacException e) {
@@ -143,6 +153,16 @@ public class GFacWorker implements Runnable {
 		}
 	}
 
+	private void cancelProcess() throws GFacException {
+		// do cleanup works before cancel the process.
+		ProcessStatus processStatus = new ProcessStatus(ProcessState.CANCELED);
+		processStatus.setReason("Process cancellation has been triggered");
+		processContext.setProcessStatus(processStatus);
+		GFacUtils.saveAndPublishProcessStatus(processContext);
+		sendAck();
+		Factory.getGfacContext().removeProcess(processContext.getProcessId());
+	}
+
 	private void completeProcess() throws GFacException {
         ProcessStatus status = new ProcessStatus(ProcessState.COMPLETED);
         status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());

http://git-wip-us.apache.org/repos/asf/airavata/blob/4792eac6/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index 144465b..c7a6875 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -35,10 +35,7 @@ import org.apache.airavata.gfac.core.monitor.JobStatusResult;
 import org.apache.airavata.gfac.impl.GFacWorker;
 import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
 import org.apache.airavata.model.job.JobModel;
-import org.apache.airavata.model.status.JobState;
-import org.apache.airavata.model.status.JobStatus;
-import org.apache.airavata.model.status.TaskState;
-import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.status.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -333,7 +330,8 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
         // TODO : update job state on process context
         boolean runOutflowTasks = false;
         JobStatus jobStatus = new JobStatus();
-        JobModel jobModel = taskContext.getParentProcessContext().getJobModel();
+        ProcessContext parentProcessContext = taskContext.getParentProcessContext();
+        JobModel jobModel = parentProcessContext.getJobModel();
         String jobDetails = "JobName : " + jobStatusResult.getJobName() + ", JobId : " + jobStatusResult.getJobId();
         // TODO - Handle all other valid JobStates
         if (resultState == JobState.COMPLETE) {
@@ -374,7 +372,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
 		    try {
 			    jobModel.setJobStatus(jobStatus);
 			    log.info("[EJM]: Publishing status changes to amqp. " + jobDetails);
-			    GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+			    GFacUtils.saveJobStatus(parentProcessContext, jobModel);
 		    } catch (GFacException e) {
 			    log.error("expId: {}, processId: {}, taskId: {}, jobId: {} :- Error while save and publishing Job " +
                         "status {}", taskContext.getExperimentId(), taskContext.getProcessId(), jobModel
@@ -390,7 +388,13 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
                 taskStatus.setReason("Job monitoring completed with final state: " + TaskState.COMPLETED.name());
                 taskContext.setTaskStatus(taskStatus);
                 GFacUtils.saveAndPublishTaskStatus(taskContext);
-		        GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(taskContext.getParentProcessContext()));
+                if (parentProcessContext.isCancel()) {
+                    ProcessStatus processStatus = new ProcessStatus(ProcessState.CANCELLING);
+                    processStatus.setReason("Process has been cancelled");
+                    parentProcessContext.setProcessStatus(processStatus);
+                    GFacUtils.saveAndPublishProcessStatus(parentProcessContext);
+                }
+		        GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(parentProcessContext));
 	        } catch (GFacException e) {
 		        log.info("[EJM]: Error while running output tasks", e);
 	        }