You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/05/22 17:35:49 UTC

airavata git commit: cancelling check

Repository: airavata
Updated Branches:
  refs/heads/master 83feaecee -> db9918f89


cancelling check


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/db9918f8
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/db9918f8
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/db9918f8

Branch: refs/heads/master
Commit: db9918f890dec7a45d73dfb908604cbbbe954803
Parents: 83feaec
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Fri May 22 11:35:45 2015 -0400
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Fri May 22 11:35:45 2015 -0400

----------------------------------------------------------------------
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 37 ++++++++++++++------
 .../airavata/gfac/core/utils/GFacUtils.java     | 10 ++++++
 2 files changed, 37 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/db9918f8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index d0db56c..688a35a 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -568,6 +568,10 @@ public class BetterGfacImpl implements GFac,Watcher {
                     jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
                     throw new GFacException(e.getMessage(), e);
                 }
+            }else if (gfacExpState == GfacExperimentState.INHANDLERSINVOKING || gfacExpState == GfacExperimentState.INHANDLERSINVOKED || gfacExpState == GfacExperimentState.OUTHANDLERSINVOKING){
+                log.info("Experiment should be immedietly cancelled");
+                GFacUtils.updateExperimentStatus(jobExecutionContext.getExperimentID(), ExperimentState.CANCELED);
+
             }
             return true;
             }catch(Exception e){
@@ -684,13 +688,14 @@ public class BetterGfacImpl implements GFac,Watcher {
 			// here we do not skip handler if some handler does not have to be
 			// run again during re-run it can implement
 			// that logic in to the handler
-            if (!isCancelled(jobExecutionContext)) {
+            if (!isCancelling(jobExecutionContext)) {
                 invokeInFlowHandlers(jobExecutionContext); // to keep the
                 // consistency we always
                 // try to re-run to
                 // avoid complexity
             }else{
                 log.info("Experiment is cancelled, so launch operation is stopping immediately");
+                GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
                 return; // if the job is cancelled, status change is handled in cancel operation this thread simply has to be returned
             }
             // if (experimentID != null){
@@ -700,10 +705,11 @@ public class BetterGfacImpl implements GFac,Watcher {
 			// After executing the in handlers provider instance should be set
 			// to job execution context.
 			// We get the provider instance and execute it.
-            if (!isCancelled(jobExecutionContext)) {
+            if (!isCancelling(jobExecutionContext)) {
                 invokeProviderExecute(jobExecutionContext);
             } else {
                 log.info("Experiment is cancelled, so launch operation is stopping immediately");
+                GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
                 return;
             }
             } catch (Exception e) {
@@ -878,7 +884,7 @@ public class BetterGfacImpl implements GFac,Watcher {
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
                     , GfacExperimentState.INHANDLERSINVOKING));
             for (GFacHandlerConfig handlerClassName : handlers) {
-                if(!isCancelled(jobExecutionContext)) {
+                if(!isCancelling(jobExecutionContext)) {
                     Class<? extends GFacHandler> handlerClass;
                     GFacHandler handler;
                     try {
@@ -956,7 +962,7 @@ public class BetterGfacImpl implements GFac,Watcher {
             try {
                 monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
                 for (GFacHandlerConfig handlerClassName : handlers) {
-                    if (!isCancelled(jobExecutionContext)) {
+                    if (!isCancelling(jobExecutionContext)) {
                         Class<? extends GFacHandler> handlerClass;
                         GFacHandler handler;
                         try {
@@ -978,11 +984,7 @@ public class BetterGfacImpl implements GFac,Watcher {
                             handler.invoke(jobExecutionContext);
                             GFacUtils.updateHandlerState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
                         } catch (Exception e) {
-                            TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
-                                    jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                                    jobExecutionContext.getExperimentID(),
-                                    jobExecutionContext.getGatewayID());
-                            monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentity));
+                            GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.FAILED);
                             try {
                                 StringWriter errors = new StringWriter();
                                 e.printStackTrace(new PrintWriter(errors));
@@ -993,6 +995,7 @@ public class BetterGfacImpl implements GFac,Watcher {
                             throw new GFacException(e);
                         }
                     } else {
+                        GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
                         log.info("Experiment execution is cancelled, so OutHandler invocation is going to stop");
                         break;
                     }
@@ -1222,7 +1225,21 @@ public class BetterGfacImpl implements GFac,Watcher {
         if (status != null){
             ExperimentState experimentState = status.getExperimentState();
             if (experimentState != null){
-                if(experimentState == ExperimentState.CANCELING || experimentState == ExperimentState.CANCELED){
+                if(experimentState == ExperimentState.CANCELED){
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    public boolean isCancelling(JobExecutionContext executionContext) throws RegistryException {
+        // check whether cancelling request came
+        ExperimentStatus status = (ExperimentStatus)registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
+        if (status != null){
+            ExperimentState experimentState = status.getExperimentState();
+            if (experimentState != null){
+                if(experimentState == ExperimentState.CANCELING){
                     return true;
                 }
             }

http://git-wip-us.apache.org/repos/asf/airavata/blob/db9918f8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 9e3a50f..0810bfd 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -44,6 +44,8 @@ import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
 import org.apache.airavata.model.appcatalog.computeresource.*;
 import org.apache.airavata.model.messaging.event.JobIdentifier;
 import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent;
 import org.apache.airavata.model.workspace.experiment.*;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.ChildDataType;
@@ -779,4 +781,12 @@ public class GFacUtils {
         }
         return false;
     }
+
+    public static void publishTaskStatus (JobExecutionContext jobExecutionContext, MonitorPublisher publisher, TaskState state){
+        TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                jobExecutionContext.getExperimentID(),
+                jobExecutionContext.getGatewayID());
+        publisher.publish(new TaskStatusChangeRequestEvent(state, taskIdentity));
+    }
 }