You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/05/22 17:35:49 UTC
airavata git commit: cancelling check
Repository: airavata
Updated Branches:
refs/heads/master 83feaecee -> db9918f89
cancelling check
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/db9918f8
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/db9918f8
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/db9918f8
Branch: refs/heads/master
Commit: db9918f890dec7a45d73dfb908604cbbbe954803
Parents: 83feaec
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Fri May 22 11:35:45 2015 -0400
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Fri May 22 11:35:45 2015 -0400
----------------------------------------------------------------------
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 37 ++++++++++++++------
.../airavata/gfac/core/utils/GFacUtils.java | 10 ++++++
2 files changed, 37 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/db9918f8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index d0db56c..688a35a 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -568,6 +568,10 @@ public class BetterGfacImpl implements GFac,Watcher {
jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
throw new GFacException(e.getMessage(), e);
}
+ }else if (gfacExpState == GfacExperimentState.INHANDLERSINVOKING || gfacExpState == GfacExperimentState.INHANDLERSINVOKED || gfacExpState == GfacExperimentState.OUTHANDLERSINVOKING){
+ log.info("Experiment should be immedietly cancelled");
+ GFacUtils.updateExperimentStatus(jobExecutionContext.getExperimentID(), ExperimentState.CANCELED);
+
}
return true;
}catch(Exception e){
@@ -684,13 +688,14 @@ public class BetterGfacImpl implements GFac,Watcher {
// here we do not skip handler if some handler does not have to be
// run again during re-run it can implement
// that logic in to the handler
- if (!isCancelled(jobExecutionContext)) {
+ if (!isCancelling(jobExecutionContext)) {
invokeInFlowHandlers(jobExecutionContext); // to keep the
// consistency we always
// try to re-run to
// avoid complexity
}else{
log.info("Experiment is cancelled, so launch operation is stopping immediately");
+ GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
return; // if the job is cancelled, status change is handled in cancel operation this thread simply has to be returned
}
// if (experimentID != null){
@@ -700,10 +705,11 @@ public class BetterGfacImpl implements GFac,Watcher {
// After executing the in handlers provider instance should be set
// to job execution context.
// We get the provider instance and execute it.
- if (!isCancelled(jobExecutionContext)) {
+ if (!isCancelling(jobExecutionContext)) {
invokeProviderExecute(jobExecutionContext);
} else {
log.info("Experiment is cancelled, so launch operation is stopping immediately");
+ GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
return;
}
} catch (Exception e) {
@@ -878,7 +884,7 @@ public class BetterGfacImpl implements GFac,Watcher {
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
, GfacExperimentState.INHANDLERSINVOKING));
for (GFacHandlerConfig handlerClassName : handlers) {
- if(!isCancelled(jobExecutionContext)) {
+ if(!isCancelling(jobExecutionContext)) {
Class<? extends GFacHandler> handlerClass;
GFacHandler handler;
try {
@@ -956,7 +962,7 @@ public class BetterGfacImpl implements GFac,Watcher {
try {
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
for (GFacHandlerConfig handlerClassName : handlers) {
- if (!isCancelled(jobExecutionContext)) {
+ if (!isCancelling(jobExecutionContext)) {
Class<? extends GFacHandler> handlerClass;
GFacHandler handler;
try {
@@ -978,11 +984,7 @@ public class BetterGfacImpl implements GFac,Watcher {
handler.invoke(jobExecutionContext);
GFacUtils.updateHandlerState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
} catch (Exception e) {
- TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
- jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
- jobExecutionContext.getExperimentID(),
- jobExecutionContext.getGatewayID());
- monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentity));
+ GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.FAILED);
try {
StringWriter errors = new StringWriter();
e.printStackTrace(new PrintWriter(errors));
@@ -993,6 +995,7 @@ public class BetterGfacImpl implements GFac,Watcher {
throw new GFacException(e);
}
} else {
+ GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
log.info("Experiment execution is cancelled, so OutHandler invocation is going to stop");
break;
}
@@ -1222,7 +1225,21 @@ public class BetterGfacImpl implements GFac,Watcher {
if (status != null){
ExperimentState experimentState = status.getExperimentState();
if (experimentState != null){
- if(experimentState == ExperimentState.CANCELING || experimentState == ExperimentState.CANCELED){
+ if(experimentState == ExperimentState.CANCELED){
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ public boolean isCancelling(JobExecutionContext executionContext) throws RegistryException {
+ // check whether cancelling request came
+ ExperimentStatus status = (ExperimentStatus)registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
+ if (status != null){
+ ExperimentState experimentState = status.getExperimentState();
+ if (experimentState != null){
+ if(experimentState == ExperimentState.CANCELING){
return true;
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/db9918f8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 9e3a50f..0810bfd 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -44,6 +44,8 @@ import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
import org.apache.airavata.model.appcatalog.computeresource.*;
import org.apache.airavata.model.messaging.event.JobIdentifier;
import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent;
import org.apache.airavata.model.workspace.experiment.*;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.ChildDataType;
@@ -779,4 +781,12 @@ public class GFacUtils {
}
return false;
}
+
+ public static void publishTaskStatus (JobExecutionContext jobExecutionContext, MonitorPublisher publisher, TaskState state){
+ TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getGatewayID());
+ publisher.publish(new TaskStatusChangeRequestEvent(state, taskIdentity));
+ }
}