You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/09/23 19:59:04 UTC
airavata git commit: Handle cancel job operation in Queue state
Repository: airavata
Updated Branches:
refs/heads/master dbb1a0fd2 -> 40a4daebb
Handle cancel job operation in Queue state
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/40a4daeb
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/40a4daeb
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/40a4daeb
Branch: refs/heads/master
Commit: 40a4daebb4dddbc7238e09d55aac16f4ecda6876
Parents: dbb1a0f
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Wed Sep 23 13:58:58 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Wed Sep 23 13:58:58 2015 -0400
----------------------------------------------------------------------
.../apache/airavata/gfac/core/cluster/RemoteCluster.java | 2 +-
.../apache/airavata/gfac/core/monitor/JobMonitor.java | 5 ++---
.../airavata/gfac/core/task/JobSubmissionTask.java | 3 ++-
.../org/apache/airavata/gfac/impl/GFacEngineImpl.java | 11 ++++++++++-
.../org/apache/airavata/gfac/impl/HPCRemoteCluster.java | 5 +++--
.../airavata/gfac/impl/task/LocalJobSubmissionTask.java | 4 +++-
.../gfac/impl/task/SSHForkJobSubmissionTask.java | 3 ++-
.../airavata/gfac/impl/task/SSHJobSubmissionTask.java | 8 ++++++--
.../airavata/gfac/monitor/email/EmailBasedMonitor.java | 11 +++++++++--
9 files changed, 38 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/40a4daeb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
index 932451b..3a03a6a 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
@@ -84,7 +84,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
* @return the description of the deleted job
* @throws SSHApiException throws exception during error
*/
- public boolean cancelJob(String jobID) throws SSHApiException;
+ public JobStatus cancelJob(String jobID) throws SSHApiException;
/**
* This will get the job status of the job associated with this jobId
http://git-wip-us.apache.org/repos/asf/airavata/blob/40a4daeb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
index 64a9838..a909791 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
@@ -29,11 +29,10 @@ public interface JobMonitor {
* @param jobId
* @param processContext
*/
- public void monitor(String jobId, ProcessContext processContext);
+ void monitor(String jobId, ProcessContext processContext);
/**
* Stop monitoring for given jobId
- * @param jobId
*/
- public void stopMonitor(String jobId);
+ void stopMonitor(String jobId, boolean runOutFlow);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/40a4daeb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/JobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/JobSubmissionTask.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/JobSubmissionTask.java
index c18d9fc..a5a33ae 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/JobSubmissionTask.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/JobSubmissionTask.java
@@ -21,10 +21,11 @@
package org.apache.airavata.gfac.core.task;
import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.model.status.JobStatus;
public interface JobSubmissionTask extends Task {
- void cancel(TaskContext taskcontext) throws TaskException;
+ JobStatus cancel(TaskContext taskcontext) throws TaskException;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/40a4daeb/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 0e11d21..3af600b 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -31,6 +31,7 @@ import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.core.context.ProcessContext;
import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.monitor.JobMonitor;
import org.apache.airavata.gfac.core.task.JobSubmissionTask;
import org.apache.airavata.gfac.core.task.Task;
import org.apache.airavata.gfac.core.task.TaskException;
@@ -48,6 +49,8 @@ import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.status.ProcessStatus;
import org.apache.airavata.model.status.TaskState;
@@ -491,9 +494,15 @@ public class GFacEngineImpl implements GFacEngine {
private void executeCancel(TaskContext taskContext, JobSubmissionTask jSTask) throws GFacException {
try {
- jSTask.cancel(taskContext);
+ JobStatus oldJobStatus = jSTask.cancel(taskContext);
+ if (oldJobStatus.getJobState() == JobState.QUEUED) {
+ JobMonitor monitorService = Factory.getMonitorService(taskContext.getParentProcessContext().getMonitorMode());
+ monitorService.stopMonitor(taskContext.getParentProcessContext().getJobModel().getJobId(),true);
+ }
} catch (TaskException e) {
throw new GFacException("Error while cancelling job");
+ } catch (AiravataException e) {
+ throw new GFacException("Error wile getting monitoring service");
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/40a4daeb/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
index 4325e27..2664d3f 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
@@ -190,12 +190,13 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
}
@Override
- public boolean cancelJob(String jobId) throws SSHApiException {
+ public JobStatus cancelJob(String jobId) throws SSHApiException {
+ JobStatus oldStatus = getJobStatus(jobId);
RawCommandInfo cancelCommand = jobManagerConfiguration.getCancelCommand(jobId);
StandardOutReader reader = new StandardOutReader();
executeCommand(cancelCommand, reader);
throwExceptionOnError(reader, cancelCommand);
- return true;
+ return oldStatus;
}
@Override
http://git-wip-us.apache.org/repos/asf/airavata/blob/40a4daeb/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
index 3626a54..7954961 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
@@ -35,6 +35,7 @@ import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths;
import org.apache.airavata.model.application.io.InputDataObjectType;
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
import org.apache.airavata.model.status.TaskState;
import org.apache.airavata.model.status.TaskStatus;
import org.apache.airavata.model.task.TaskTypes;
@@ -182,7 +183,8 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{
}
@Override
- public void cancel(TaskContext taskcontext) {
+ public JobStatus cancel(TaskContext taskcontext) {
// TODO - implement Local Job cancel
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/40a4daeb/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
index 7224340..ce6a51b 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
@@ -175,7 +175,8 @@ public class SSHForkJobSubmissionTask implements JobSubmissionTask {
}
@Override
- public void cancel(TaskContext taskcontext) {
+ public JobStatus cancel(TaskContext taskcontext) {
// TODO - implement cancel with SSH Fork
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/40a4daeb/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
index 0414bd3..0632ee8 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
@@ -244,16 +244,20 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
}
@Override
- public void cancel(TaskContext taskcontext) throws TaskException {
+ public JobStatus cancel(TaskContext taskcontext) throws TaskException {
ProcessContext processContext = taskcontext.getParentProcessContext();
RemoteCluster remoteCluster = processContext.getRemoteCluster();
JobModel jobModel = processContext.getJobModel();
+ JobStatus oldJobStatus = null;
if (jobModel != null) {
try {
- remoteCluster.cancelJob(jobModel.getJobId());
+ oldJobStatus = remoteCluster.cancelJob(jobModel.getJobId());
+ return oldJobStatus;
} catch (SSHApiException e) {
throw new TaskException("Error while cancelling job " + jobModel.getJobId());
}
+ } else {
+ throw new TaskException("Couldn't complete cancel operation, JobModel is null in ProcessContext.");
}
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/40a4daeb/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index 622c6c5..2ad29f2 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -126,8 +126,15 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
}
@Override
- public void stopMonitor(String jobId) {
- jobMonitorMap.remove(jobId);
+ public void stopMonitor(String jobId, boolean runOutflow) {
+ ProcessContext processContext = jobMonitorMap.remove(jobId);
+ if (processContext != null && runOutflow) {
+ try {
+ GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(processContext));
+ } catch (GFacException e) {
+ log.info("[EJM]: Error while running output tasks", e);
+ }
+ }
}
private JobStatusResult parse(Message message) throws MessagingException, AiravataException {