You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/01/08 19:36:58 UTC
airavata git commit: Fixed edge case issues with cancel & recovery
Repository: airavata
Updated Branches:
refs/heads/master 545e75344 -> 4792eac6e
Fixed edge case issues with cancel & recovery
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4792eac6
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4792eac6
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4792eac6
Branch: refs/heads/master
Commit: 4792eac6e173c9ec81bab4f510ebb45da341a3ad
Parents: 545e753
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Fri Jan 8 13:36:44 2016 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Fri Jan 8 13:36:44 2016 -0500
----------------------------------------------------------------------
.../airavata/gfac/impl/GFacEngineImpl.java | 28 +++++++++------
.../apache/airavata/gfac/impl/GFacWorker.java | 36 +++++++++++++++-----
.../gfac/monitor/email/EmailBasedMonitor.java | 18 ++++++----
3 files changed, 56 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/4792eac6/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 00d920d..f264e6c 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -215,7 +215,7 @@ public class GFacEngineImpl implements GFacEngine {
private void executeTaskListFrom(ProcessContext processContext, String startingTaskId) throws GFacException {
// checkpoint
- if (processContext.isInterrupted()) {
+ if (processContext.isInterrupted() && processContext.getProcessState() != ProcessState.MONITORING) {
GFacUtils.handleProcessInterrupt(processContext);
return;
}
@@ -552,7 +552,12 @@ public class GFacEngineImpl implements GFacEngine {
cancelJobSubmission(processContext, rTaskId, pTaskId);
}
continueProcess(processContext, recoverTaskId);
+ } else {
+ log.error("expId: {}, processId: {}, Error while recovering process, couldn't find recovery task",
+ processContext.getExperimentId(), processContext.getProcessId());
}
+
+
}
private void cancelJobSubmission(ProcessContext processContext, String rTaskId, String pTaskId) {
@@ -577,12 +582,17 @@ public class GFacEngineImpl implements GFacEngine {
if (jobModels != null && !jobModels.isEmpty()) {
JobModel jobModel = (JobModel) jobModels.get(jobModels.size() - 1);
- processContext.setJobModel(jobModel);
- log.info("expId: {}, processId: {}, Canceling jobId {}", processContext.getExperimentId(),
- processContext.getProcessId(), jobModel.getJobId());
- cancelProcess(processContext);
- log.info("expId: {}, processId: {}, Canceled jobId {}", processContext.getExperimentId(),
- processContext.getProcessId(), jobModel.getJobId());
+ if (jobModel.getJobId() != null) {
+ processContext.setJobModel(jobModel);
+ log.info("expId: {}, processId: {}, Canceling jobId {}", processContext.getExperimentId(),
+ processContext.getProcessId(), jobModel.getJobId());
+ cancelProcess(processContext);
+ log.info("expId: {}, processId: {}, Canceled jobId {}", processContext.getExperimentId(),
+ processContext.getProcessId(), jobModel.getJobId());
+ } else {
+ log.error("expId: {}, processId: {}, Couldn't find jobId in jobModel, aborting process recovery",
+ processContext.getExperimentId(), processContext.getProcessId());
+ }
}
} catch (GFacException e) {
log.error("expId: {}, processId: {}, Error while canceling process which is in recovery mode",
@@ -606,10 +616,6 @@ public class GFacEngineImpl implements GFacEngine {
@Override
public void continueProcess(ProcessContext processContext, String taskId) throws GFacException {
- if (processContext.isInterrupted()) {
- GFacUtils.handleProcessInterrupt(processContext);
- return;
- }
executeTaskListFrom(processContext, taskId);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4792eac6/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index e0664a5..fd6dad3 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -29,6 +29,7 @@ import org.apache.airavata.gfac.core.context.ProcessContext;
import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.registry.core.experiment.catalog.model.Process;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +54,7 @@ public class GFacWorker implements Runnable {
*/
public GFacWorker(ProcessContext processContext) throws GFacException {
if (processContext == null) {
- throw new GFacException("Worker must initialize with valide processContext, Process context is null");
+ throw new GFacException("Worker must initialize with valid processContext, Process context is null");
}
this.processId = processContext.getProcessId();
this.gatewayId = processContext.getGatewayId();
@@ -78,7 +79,7 @@ public class GFacWorker implements Runnable {
@Override
public void run() {
try {
- ProcessState processState = processContext.getProcessStatus().getState();
+ ProcessState processState = processContext.getProcessState();
switch (processState) {
case CREATED:
case VALIDATED:
@@ -101,6 +102,9 @@ public class GFacWorker implements Runnable {
case COMPLETED:
completeProcess();
break;
+ case CANCELLING:
+ cancelProcess();
+ break;
case CANCELED:
// TODO - implement cancel scenario
break;
@@ -111,12 +115,18 @@ public class GFacWorker implements Runnable {
throw new GFacException("process Id : " + processId + " Couldn't identify process type");
}
if (processContext.isCancel()) {
- if (processContext.getProcessState() == ProcessState.MONITORING
- || processContext.getProcessState() == ProcessState.EXECUTING) {
- // don't send ack if the process is in MONITORING state, wait until cancel email comes to airavata.
- } else {
- sendAck();
- Factory.getGfacContext().removeProcess(processContext.getProcessId());
+ processState = processContext.getProcessState();
+ switch (processState) {
+ case MONITORING: case EXECUTING:
+ // don't send ack if the process is in MONITORING or EXECUTING states, wait until cancel email comes to airavata
+ break;
+ case CANCELLING:
+ cancelProcess();
+ break;
+ default:
+ sendAck();
+ Factory.getGfacContext().removeProcess(processContext.getProcessId());
+ break;
}
}
} catch (GFacException e) {
@@ -143,6 +153,16 @@ public class GFacWorker implements Runnable {
}
}
+ private void cancelProcess() throws GFacException {
+ // do cleanup works before cancel the process.
+ ProcessStatus processStatus = new ProcessStatus(ProcessState.CANCELED);
+ processStatus.setReason("Process cancellation has been triggered");
+ processContext.setProcessStatus(processStatus);
+ GFacUtils.saveAndPublishProcessStatus(processContext);
+ sendAck();
+ Factory.getGfacContext().removeProcess(processContext.getProcessId());
+ }
+
private void completeProcess() throws GFacException {
ProcessStatus status = new ProcessStatus(ProcessState.COMPLETED);
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
http://git-wip-us.apache.org/repos/asf/airavata/blob/4792eac6/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index 144465b..c7a6875 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -35,10 +35,7 @@ import org.apache.airavata.gfac.core.monitor.JobStatusResult;
import org.apache.airavata.gfac.impl.GFacWorker;
import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
import org.apache.airavata.model.job.JobModel;
-import org.apache.airavata.model.status.JobState;
-import org.apache.airavata.model.status.JobStatus;
-import org.apache.airavata.model.status.TaskState;
-import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.status.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -333,7 +330,8 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
// TODO : update job state on process context
boolean runOutflowTasks = false;
JobStatus jobStatus = new JobStatus();
- JobModel jobModel = taskContext.getParentProcessContext().getJobModel();
+ ProcessContext parentProcessContext = taskContext.getParentProcessContext();
+ JobModel jobModel = parentProcessContext.getJobModel();
String jobDetails = "JobName : " + jobStatusResult.getJobName() + ", JobId : " + jobStatusResult.getJobId();
// TODO - Handle all other valid JobStates
if (resultState == JobState.COMPLETE) {
@@ -374,7 +372,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
try {
jobModel.setJobStatus(jobStatus);
log.info("[EJM]: Publishing status changes to amqp. " + jobDetails);
- GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+ GFacUtils.saveJobStatus(parentProcessContext, jobModel);
} catch (GFacException e) {
log.error("expId: {}, processId: {}, taskId: {}, jobId: {} :- Error while save and publishing Job " +
"status {}", taskContext.getExperimentId(), taskContext.getProcessId(), jobModel
@@ -390,7 +388,13 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
taskStatus.setReason("Job monitoring completed with final state: " + TaskState.COMPLETED.name());
taskContext.setTaskStatus(taskStatus);
GFacUtils.saveAndPublishTaskStatus(taskContext);
- GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(taskContext.getParentProcessContext()));
+ if (parentProcessContext.isCancel()) {
+ ProcessStatus processStatus = new ProcessStatus(ProcessState.CANCELLING);
+ processStatus.setReason("Process has been cancelled");
+ parentProcessContext.setProcessStatus(processStatus);
+ GFacUtils.saveAndPublishProcessStatus(parentProcessContext);
+ }
+ GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(parentProcessContext));
} catch (GFacException e) {
log.info("[EJM]: Error while running output tasks", e);
}