You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/09/23 19:59:04 UTC

airavata git commit: Handle cancel job operation in Queue state

Repository: airavata
Updated Branches:
  refs/heads/master dbb1a0fd2 -> 40a4daebb


Handle cancel job operation in Queue state


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/40a4daeb
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/40a4daeb
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/40a4daeb

Branch: refs/heads/master
Commit: 40a4daebb4dddbc7238e09d55aac16f4ecda6876
Parents: dbb1a0f
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Wed Sep 23 13:58:58 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Wed Sep 23 13:58:58 2015 -0400

----------------------------------------------------------------------
 .../apache/airavata/gfac/core/cluster/RemoteCluster.java |  2 +-
 .../apache/airavata/gfac/core/monitor/JobMonitor.java    |  5 ++---
 .../airavata/gfac/core/task/JobSubmissionTask.java       |  3 ++-
 .../org/apache/airavata/gfac/impl/GFacEngineImpl.java    | 11 ++++++++++-
 .../org/apache/airavata/gfac/impl/HPCRemoteCluster.java  |  5 +++--
 .../airavata/gfac/impl/task/LocalJobSubmissionTask.java  |  4 +++-
 .../gfac/impl/task/SSHForkJobSubmissionTask.java         |  3 ++-
 .../airavata/gfac/impl/task/SSHJobSubmissionTask.java    |  8 ++++++--
 .../airavata/gfac/monitor/email/EmailBasedMonitor.java   | 11 +++++++++--
 9 files changed, 38 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/40a4daeb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
index 932451b..3a03a6a 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
@@ -84,7 +84,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
 	 * @return return the description of the deleted job
 	 * @throws SSHApiException throws exception during error
 	 */
-	public boolean cancelJob(String jobID) throws SSHApiException;
+	public JobStatus cancelJob(String jobID) throws SSHApiException;
 
 	/**
 	 * This will get the job status of the the job associated with this jobId

http://git-wip-us.apache.org/repos/asf/airavata/blob/40a4daeb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
index 64a9838..a909791 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
@@ -29,11 +29,10 @@ public interface JobMonitor {
 	 * @param jobId
 	 * @param processContext
 	 */
-	public void monitor(String jobId, ProcessContext processContext);
+	void monitor(String jobId, ProcessContext processContext);
 
 	/**
 	 * Stop monitoring for given jobId
-	 * @param jobId
 	 */
-	public void stopMonitor(String jobId);
+	void stopMonitor(String jobId, boolean runOutFlow);
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/40a4daeb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/JobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/JobSubmissionTask.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/JobSubmissionTask.java
index c18d9fc..a5a33ae 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/JobSubmissionTask.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/JobSubmissionTask.java
@@ -21,10 +21,11 @@
 package org.apache.airavata.gfac.core.task;
 
 import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.model.status.JobStatus;
 
 public interface JobSubmissionTask extends Task {
 
-	void cancel(TaskContext taskcontext) throws TaskException;
+	JobStatus cancel(TaskContext taskcontext) throws TaskException;
 
 }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/40a4daeb/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 0e11d21..3af600b 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -31,6 +31,7 @@ import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.monitor.JobMonitor;
 import org.apache.airavata.gfac.core.task.JobSubmissionTask;
 import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.task.TaskException;
@@ -48,6 +49,8 @@ import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.ProcessStatus;
 import org.apache.airavata.model.status.TaskState;
@@ -491,9 +494,15 @@ public class GFacEngineImpl implements GFacEngine {
 
 	private void executeCancel(TaskContext taskContext, JobSubmissionTask jSTask) throws GFacException {
 		try {
-			jSTask.cancel(taskContext);
+			JobStatus oldJobStatus = jSTask.cancel(taskContext);
+			if (oldJobStatus.getJobState() == JobState.QUEUED) {
+				JobMonitor monitorService = Factory.getMonitorService(taskContext.getParentProcessContext().getMonitorMode());
+				monitorService.stopMonitor(taskContext.getParentProcessContext().getJobModel().getJobId(),true);
+			}
 		} catch (TaskException e) {
 			throw new GFacException("Error while cancelling job");
+		} catch (AiravataException e) {
+			throw new GFacException("Error wile getting monitoring service");
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/40a4daeb/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
index 4325e27..2664d3f 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
@@ -190,12 +190,13 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
 	}
 
 	@Override
-	public boolean cancelJob(String jobId) throws SSHApiException {
+	public JobStatus cancelJob(String jobId) throws SSHApiException {
+		JobStatus oldStatus = getJobStatus(jobId);
 		RawCommandInfo cancelCommand = jobManagerConfiguration.getCancelCommand(jobId);
 		StandardOutReader reader = new StandardOutReader();
 		executeCommand(cancelCommand, reader);
 		throwExceptionOnError(reader, cancelCommand);
-		return true;
+		return oldStatus;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/airavata/blob/40a4daeb/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
index 3626a54..7954961 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
@@ -35,6 +35,7 @@ import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths;
 import org.apache.airavata.model.application.io.InputDataObjectType;
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
 import org.apache.airavata.model.status.TaskState;
 import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.TaskTypes;
@@ -182,7 +183,8 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{
 	}
 
 	@Override
-	public void cancel(TaskContext taskcontext) {
+	public JobStatus cancel(TaskContext taskcontext) {
 		// TODO - implement Local Job cancel
+		return null;
 	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/40a4daeb/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
index 7224340..ce6a51b 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
@@ -175,7 +175,8 @@ public class SSHForkJobSubmissionTask implements JobSubmissionTask {
 	}
 
 	@Override
-	public void cancel(TaskContext taskcontext) {
+	public JobStatus cancel(TaskContext taskcontext) {
 		// TODO - implement cancel with SSH Fork
+		return null;
 	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/40a4daeb/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
index 0414bd3..0632ee8 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
@@ -244,16 +244,20 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
 	}
 
 	@Override
-	public void cancel(TaskContext taskcontext) throws TaskException {
+	public JobStatus cancel(TaskContext taskcontext) throws TaskException {
 		ProcessContext processContext = taskcontext.getParentProcessContext();
 		RemoteCluster remoteCluster = processContext.getRemoteCluster();
 		JobModel jobModel = processContext.getJobModel();
+		JobStatus oldJobStatus = null;
 		if (jobModel != null) {
 			try {
-				remoteCluster.cancelJob(jobModel.getJobId());
+				oldJobStatus = remoteCluster.cancelJob(jobModel.getJobId());
+				return oldJobStatus;
 			} catch (SSHApiException e) {
 				throw new TaskException("Error while cancelling job " + jobModel.getJobId());
 			}
+		} else {
+			throw new TaskException("Couldn't complete cancel operation, JobModel is null in ProcessContext.");
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/40a4daeb/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index 622c6c5..2ad29f2 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -126,8 +126,15 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
 	}
 
 	@Override
-	public void stopMonitor(String jobId) {
-		jobMonitorMap.remove(jobId);
+	public void stopMonitor(String jobId, boolean runOutflow) {
+		ProcessContext processContext = jobMonitorMap.remove(jobId);
+		if (processContext != null && runOutflow) {
+			try {
+				GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(processContext));
+			} catch (GFacException e) {
+				log.info("[EJM]: Error while running output tasks", e);
+			}
+		}
 	}
 
     private JobStatusResult parse(Message message) throws MessagingException, AiravataException {