You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/05/13 17:05:26 UTC

airavata git commit: check iscancelled from registry experiment status

Repository: airavata
Updated Branches:
  refs/heads/master ea93cc1ef -> ec9b6fe42


check iscancelled from registry experiment status


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/ec9b6fe4
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/ec9b6fe4
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/ec9b6fe4

Branch: refs/heads/master
Commit: ec9b6fe42114f2cd7b0a411c40bb844217b169e9
Parents: ea93cc1
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Wed May 13 11:05:20 2015 -0400
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Wed May 13 11:05:20 2015 -0400

----------------------------------------------------------------------
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  |  39 +++--
 .../server/OrchestratorServerHandler.java       | 143 +++++++++----------
 2 files changed, 85 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/ec9b6fe4/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 32317f3..5f0b1a1 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -59,6 +59,7 @@ import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePrefer
 import org.apache.airavata.model.messaging.event.*;
 import org.apache.airavata.model.workspace.experiment.*;
 import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
 import org.apache.airavata.registry.cpi.RegistryModelType;
 import org.apache.zookeeper.*;
 import org.apache.zookeeper.data.Stat;
@@ -82,23 +83,13 @@ import java.util.*;
 public class BetterGfacImpl implements GFac,Watcher {
     private static final Logger log = LoggerFactory.getLogger(BetterGfacImpl.class);
     public static final String ERROR_SENT = "ErrorSent";
-
     private Registry registry;
-    private AppCatalog appCatalog;
-
     // we are not storing zk instance in to jobExecution context
     private ZooKeeper zk;
-
     private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>();
-
     private static File gfacConfigFile;
-
     private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>();
-
     private static MonitorPublisher monitorPublisher;
-
-    private boolean cancelled = false;
-
     private static Integer mutex = -1;
 
     /**
@@ -116,7 +107,6 @@ public class BetterGfacImpl implements GFac,Watcher {
         synchronized (mutex) {
             mutex.wait(5000);  // waiting for the syncConnected event
         }
-        this.appCatalog = appCatalog;
     }
 
     public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher,RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) {
@@ -706,7 +696,7 @@ public class BetterGfacImpl implements GFac,Watcher {
 			// here we do not skip handler if some handler does not have to be
 			// run again during re-run it can implement
 			// that logic in to the handler
-            if (!isCancelled()) {
+            if (!isCancelled(jobExecutionContext)) {
                 invokeInFlowHandlers(jobExecutionContext); // to keep the
                 // consistency we always
                 // try to re-run to
@@ -722,7 +712,7 @@ public class BetterGfacImpl implements GFac,Watcher {
 			// After executing the in handlers provider instance should be set
 			// to job execution context.
 			// We get the provider instance and execute it.
-            if (!isCancelled()) {
+            if (!isCancelled(jobExecutionContext)) {
                 invokeProviderExecute(jobExecutionContext);
             } else {
                 log.info("Experiment is cancelled, so launch operation is stopping immediately");
@@ -893,7 +883,7 @@ public class BetterGfacImpl implements GFac,Watcher {
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
                     , GfacExperimentState.INHANDLERSINVOKING));
             for (GFacHandlerConfig handlerClassName : handlers) {
-                if(!isCancelled()) {
+                if(!isCancelled(jobExecutionContext)) {
                     Class<? extends GFacHandler> handlerClass;
                     GFacHandler handler;
                     try {
@@ -971,7 +961,7 @@ public class BetterGfacImpl implements GFac,Watcher {
             try {
                 monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
                 for (GFacHandlerConfig handlerClassName : handlers) {
-                    if (!isCancelled()) {
+                    if (!isCancelled(jobExecutionContext)) {
                         Class<? extends GFacHandler> handlerClass;
                         GFacHandler handler;
                         try {
@@ -1254,12 +1244,18 @@ public class BetterGfacImpl implements GFac,Watcher {
     }
 
 
-    public boolean isCancelled() {
-        return cancelled;
-    }
-
-    public void setCancelled(boolean cancelled) {
-        this.cancelled = cancelled;
+    public boolean isCancelled(JobExecutionContext executionContext) throws RegistryException {
+        // we should check whether experiment is cancelled using registry
+        ExperimentStatus status = (ExperimentStatus)registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
+        if (status != null){
+            ExperimentState experimentState = status.getExperimentState();
+            if (experimentState != null){
+                if(experimentState == ExperimentState.CANCELING || experimentState == ExperimentState.CANCELED){
+                    return true;
+                }
+            }
+        }
+        return false;
     }
 
     public void process(WatchedEvent watchedEvent) {
@@ -1267,7 +1263,6 @@ public class BetterGfacImpl implements GFac,Watcher {
         if (Event.EventType.NodeDataChanged.equals(watchedEvent.getType())) {
             // node data is changed, this means node is cancelled.
             log.info("Experiment is cancelled with this path:" + watchedEvent.getPath());
-            this.cancelled = true;
         }
         synchronized (mutex) {
             Event.KeeperState state = watchedEvent.getState();

http://git-wip-us.apache.org/repos/asf/airavata/blob/ec9b6fe4/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 3da1e47..f903373 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -525,52 +525,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
                 throw new OrchestratorException("Error retrieving the Experiment by the given experimentID: " + experimentId);
             }
             ExperimentState experimentState = experiment.getExperimentStatus().getExperimentState();
-            if (experimentState.getValue()> 5 && experimentState.getValue()<10) {
-                log.errorId(experimentId, "Unable to mark experiment as Cancelled, current state {} doesn't allow to cancel the experiment {}.",
-                        experiment.getExperimentStatus().getExperimentState().toString(), experimentId);
-                throw new OrchestratorException("Unable to mark experiment as Cancelled, because current state is: "
-                        + experiment.getExperimentStatus().getExperimentState().toString());
-            }else if(experimentState.getValue()<3){
-                // when experiment status is < 3 no jobDetails object is created,
-                // so we don't have to worry, we simply have to change the status and stop the execution
-                ExperimentStatus status = new ExperimentStatus();
-                status.setExperimentState(ExperimentState.CANCELED);
-                status.setTimeOfStateChange(Calendar.getInstance()
-                        .getTimeInMillis());
-                experiment.setExperimentStatus(status);
-                registry.update(RegistryModelType.EXPERIMENT, experiment,
-                        experimentId);
-                List<String> ids = registry.getIds(
-                        RegistryModelType.WORKFLOW_NODE_DETAIL,
-                        WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
-                for (String workflowNodeId : ids) {
-                    WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry
-                            .get(RegistryModelType.WORKFLOW_NODE_DETAIL,
-                                    workflowNodeId);
-                    WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus();
-                    workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELED);
-                    workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
-                            .getTimeInMillis());
-                    workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
-                    registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail,
-                            workflowNodeId);
-                    List<Object> taskDetailList = registry.get(
-                            RegistryModelType.TASK_DETAIL,
-                            TaskDetailConstants.NODE_ID, workflowNodeId);
-                    for (Object o : taskDetailList) {
-                        TaskDetails taskDetails = (TaskDetails) o;
-                        TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus();
-                        taskStatus.setExecutionState(TaskState.CANCELED);
-                        taskStatus.setTimeOfStateChange(Calendar.getInstance()
-                                .getTimeInMillis());
-                        taskDetails.setTaskStatus(taskStatus);
-                        registry.update(RegistryModelType.TASK_DETAIL, o,
-                                taskDetails);
-//                        GFacUtils.setExperimentCancel(experimentId, taskDetails.getTaskID(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
-                    }
-                }
-            }else {
-
+            if (isCancelValid(experimentState)){
                 ExperimentStatus status = new ExperimentStatus();
                 status.setExperimentState(ExperimentState.CANCELING);
                 status.setTimeOfStateChange(Calendar.getInstance()
@@ -617,49 +572,87 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
                             taskDetails.setTaskStatus(taskStatus);
                             registry.update(RegistryModelType.TASK_DETAIL, o,
                                     taskDetails.getTaskID());
-//                            GFacUtils.setExperimentCancel(experimentId, taskDetails.getTaskID(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
                         }
-                        // iterate through all the generated tasks and performs the
-                        // job submisssion+monitoring
-                        // launching the experiment
                         orchestrator.cancelExperiment(experiment,
                                 workflowNodeDetail, taskDetails, tokenId);
-
-                        // after performing gfac level cancel operation
-                        // mark task cancelled
-                        taskStatus.setExecutionState(TaskState.CANCELED);
-                        taskStatus.setTimeOfStateChange(Calendar.getInstance()
-                                .getTimeInMillis());
-                        taskDetails.setTaskStatus(taskStatus);
-                        registry.update(RegistryModelType.TASK_DETAIL, o,
-                                taskDetails.getTaskID());
+                        // Status update should be done at the monitor
                     }
-                    // mark workflownode cancelled
-                    WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus();
-                    workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELED);
-                    workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
+                }
+            }else {
+                if (isCancelAllowed(experimentState)){
+                    // when experiment status is < 3 no jobDetails object is created,
+                    // so we don't have to worry, we simply have to change the status and stop the execution
+                    ExperimentStatus status = new ExperimentStatus();
+                    status.setExperimentState(ExperimentState.CANCELED);
+                    status.setTimeOfStateChange(Calendar.getInstance()
                             .getTimeInMillis());
-                    workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
-                    registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail,
-                            workflowNodeId);
+                    experiment.setExperimentStatus(status);
+                    registry.update(RegistryModelType.EXPERIMENT, experiment,
+                            experimentId);
+                    List<String> ids = registry.getIds(
+                            RegistryModelType.WORKFLOW_NODE_DETAIL,
+                            WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
+                    for (String workflowNodeId : ids) {
+                        WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry
+                                .get(RegistryModelType.WORKFLOW_NODE_DETAIL,
+                                        workflowNodeId);
+                        WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus();
+                        workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELED);
+                        workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
+                                .getTimeInMillis());
+                        workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
+                        registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail,
+                                workflowNodeId);
+                        List<Object> taskDetailList = registry.get(
+                                RegistryModelType.TASK_DETAIL,
+                                TaskDetailConstants.NODE_ID, workflowNodeId);
+                        for (Object o : taskDetailList) {
+                            TaskDetails taskDetails = (TaskDetails) o;
+                            TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus();
+                            taskStatus.setExecutionState(TaskState.CANCELED);
+                            taskStatus.setTimeOfStateChange(Calendar.getInstance()
+                                    .getTimeInMillis());
+                            taskDetails.setTaskStatus(taskStatus);
+                            registry.update(RegistryModelType.TASK_DETAIL, o,
+                                    taskDetails);
+                        }
+                    }
+                }else {
+                    log.errorId(experimentId, "Unable to mark experiment as Cancelled, current state {} doesn't allow to cancel the experiment {}.",
+                            experiment.getExperimentStatus().getExperimentState().toString(), experimentId);
+                    throw new OrchestratorException("Unable to mark experiment as Cancelled, because current state is: "
+                            + experiment.getExperimentStatus().getExperimentState().toString());
                 }
-                // mark experiment cancelled
-                status = new ExperimentStatus();
-                status.setExperimentState(ExperimentState.CANCELED);
-                status.setTimeOfStateChange(Calendar.getInstance()
-                        .getTimeInMillis());
-                experiment.setExperimentStatus(status);
-                registry.update(RegistryModelType.EXPERIMENT, experiment,
-                        experimentId);
             }
             log.info("Experiment: " + experimentId + " is cancelled !!!!!");
-
         } catch (Exception e) {
             throw new TException(e);
         }
         return true;
     }
 
+    private boolean isCancelValid(ExperimentState state){
+        switch (state) {
+            case LAUNCHED:
+            case EXECUTING:
+            case CANCELING:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    private boolean isCancelAllowed(ExperimentState state){
+        switch (state) {
+            case CREATED:
+            case VALIDATED:
+            case SCHEDULED:
+                return true;
+            default:
+                return false;
+        }
+    }
+
     private void launchWorkflowExperiment(String experimentId, String airavataCredStoreToken) throws TException {
         try {
             WorkflowEnactmentService.getInstance().