You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/05/13 17:05:26 UTC
airavata git commit: Check isCancelled() against the experiment status stored in the registry
Repository: airavata
Updated Branches:
refs/heads/master ea93cc1ef -> ec9b6fe42
Check isCancelled() against the experiment status stored in the registry
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/ec9b6fe4
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/ec9b6fe4
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/ec9b6fe4
Branch: refs/heads/master
Commit: ec9b6fe42114f2cd7b0a411c40bb844217b169e9
Parents: ea93cc1
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Wed May 13 11:05:20 2015 -0400
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Wed May 13 11:05:20 2015 -0400
----------------------------------------------------------------------
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 39 +++--
.../server/OrchestratorServerHandler.java | 143 +++++++++----------
2 files changed, 85 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/ec9b6fe4/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 32317f3..5f0b1a1 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -59,6 +59,7 @@ import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePrefer
import org.apache.airavata.model.messaging.event.*;
import org.apache.airavata.model.workspace.experiment.*;
import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
import org.apache.airavata.registry.cpi.RegistryModelType;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
@@ -82,23 +83,13 @@ import java.util.*;
public class BetterGfacImpl implements GFac,Watcher {
private static final Logger log = LoggerFactory.getLogger(BetterGfacImpl.class);
public static final String ERROR_SENT = "ErrorSent";
-
private Registry registry;
- private AppCatalog appCatalog;
-
// we are not storing zk instance in to jobExecution context
private ZooKeeper zk;
-
private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>();
-
private static File gfacConfigFile;
-
private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>();
-
private static MonitorPublisher monitorPublisher;
-
- private boolean cancelled = false;
-
private static Integer mutex = -1;
/**
@@ -116,7 +107,6 @@ public class BetterGfacImpl implements GFac,Watcher {
synchronized (mutex) {
mutex.wait(5000); // waiting for the syncConnected event
}
- this.appCatalog = appCatalog;
}
public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher,RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) {
@@ -706,7 +696,7 @@ public class BetterGfacImpl implements GFac,Watcher {
// here we do not skip handler if some handler does not have to be
// run again during re-run it can implement
// that logic in to the handler
- if (!isCancelled()) {
+ if (!isCancelled(jobExecutionContext)) {
invokeInFlowHandlers(jobExecutionContext); // to keep the
// consistency we always
// try to re-run to
@@ -722,7 +712,7 @@ public class BetterGfacImpl implements GFac,Watcher {
// After executing the in handlers provider instance should be set
// to job execution context.
// We get the provider instance and execute it.
- if (!isCancelled()) {
+ if (!isCancelled(jobExecutionContext)) {
invokeProviderExecute(jobExecutionContext);
} else {
log.info("Experiment is cancelled, so launch operation is stopping immediately");
@@ -893,7 +883,7 @@ public class BetterGfacImpl implements GFac,Watcher {
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
, GfacExperimentState.INHANDLERSINVOKING));
for (GFacHandlerConfig handlerClassName : handlers) {
- if(!isCancelled()) {
+ if(!isCancelled(jobExecutionContext)) {
Class<? extends GFacHandler> handlerClass;
GFacHandler handler;
try {
@@ -971,7 +961,7 @@ public class BetterGfacImpl implements GFac,Watcher {
try {
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
for (GFacHandlerConfig handlerClassName : handlers) {
- if (!isCancelled()) {
+ if (!isCancelled(jobExecutionContext)) {
Class<? extends GFacHandler> handlerClass;
GFacHandler handler;
try {
@@ -1254,12 +1244,18 @@ public class BetterGfacImpl implements GFac,Watcher {
}
- public boolean isCancelled() {
- return cancelled;
- }
-
- public void setCancelled(boolean cancelled) {
- this.cancelled = cancelled;
+ public boolean isCancelled(JobExecutionContext executionContext) throws RegistryException {
+ // we should check whether experiment is cancelled using registry
+ ExperimentStatus status = (ExperimentStatus)registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
+ if (status != null){
+ ExperimentState experimentState = status.getExperimentState();
+ if (experimentState != null){
+ if(experimentState == ExperimentState.CANCELING || experimentState == ExperimentState.CANCELED){
+ return true;
+ }
+ }
+ }
+ return false;
}
public void process(WatchedEvent watchedEvent) {
@@ -1267,7 +1263,6 @@ public class BetterGfacImpl implements GFac,Watcher {
if (Event.EventType.NodeDataChanged.equals(watchedEvent.getType())) {
// node data is changed, this means node is cancelled.
log.info("Experiment is cancelled with this path:" + watchedEvent.getPath());
- this.cancelled = true;
}
synchronized (mutex) {
Event.KeeperState state = watchedEvent.getState();
http://git-wip-us.apache.org/repos/asf/airavata/blob/ec9b6fe4/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 3da1e47..f903373 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -525,52 +525,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
throw new OrchestratorException("Error retrieving the Experiment by the given experimentID: " + experimentId);
}
ExperimentState experimentState = experiment.getExperimentStatus().getExperimentState();
- if (experimentState.getValue()> 5 && experimentState.getValue()<10) {
- log.errorId(experimentId, "Unable to mark experiment as Cancelled, current state {} doesn't allow to cancel the experiment {}.",
- experiment.getExperimentStatus().getExperimentState().toString(), experimentId);
- throw new OrchestratorException("Unable to mark experiment as Cancelled, because current state is: "
- + experiment.getExperimentStatus().getExperimentState().toString());
- }else if(experimentState.getValue()<3){
- // when experiment status is < 3 no jobDetails object is created,
- // so we don't have to worry, we simply have to change the status and stop the execution
- ExperimentStatus status = new ExperimentStatus();
- status.setExperimentState(ExperimentState.CANCELED);
- status.setTimeOfStateChange(Calendar.getInstance()
- .getTimeInMillis());
- experiment.setExperimentStatus(status);
- registry.update(RegistryModelType.EXPERIMENT, experiment,
- experimentId);
- List<String> ids = registry.getIds(
- RegistryModelType.WORKFLOW_NODE_DETAIL,
- WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
- for (String workflowNodeId : ids) {
- WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry
- .get(RegistryModelType.WORKFLOW_NODE_DETAIL,
- workflowNodeId);
- WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus();
- workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELED);
- workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
- .getTimeInMillis());
- workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
- registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail,
- workflowNodeId);
- List<Object> taskDetailList = registry.get(
- RegistryModelType.TASK_DETAIL,
- TaskDetailConstants.NODE_ID, workflowNodeId);
- for (Object o : taskDetailList) {
- TaskDetails taskDetails = (TaskDetails) o;
- TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus();
- taskStatus.setExecutionState(TaskState.CANCELED);
- taskStatus.setTimeOfStateChange(Calendar.getInstance()
- .getTimeInMillis());
- taskDetails.setTaskStatus(taskStatus);
- registry.update(RegistryModelType.TASK_DETAIL, o,
- taskDetails);
-// GFacUtils.setExperimentCancel(experimentId, taskDetails.getTaskID(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
- }
- }
- }else {
-
+ if (isCancelValid(experimentState)){
ExperimentStatus status = new ExperimentStatus();
status.setExperimentState(ExperimentState.CANCELING);
status.setTimeOfStateChange(Calendar.getInstance()
@@ -617,49 +572,87 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
taskDetails.setTaskStatus(taskStatus);
registry.update(RegistryModelType.TASK_DETAIL, o,
taskDetails.getTaskID());
-// GFacUtils.setExperimentCancel(experimentId, taskDetails.getTaskID(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
}
- // iterate through all the generated tasks and performs the
- // job submisssion+monitoring
- // launching the experiment
orchestrator.cancelExperiment(experiment,
workflowNodeDetail, taskDetails, tokenId);
-
- // after performing gfac level cancel operation
- // mark task cancelled
- taskStatus.setExecutionState(TaskState.CANCELED);
- taskStatus.setTimeOfStateChange(Calendar.getInstance()
- .getTimeInMillis());
- taskDetails.setTaskStatus(taskStatus);
- registry.update(RegistryModelType.TASK_DETAIL, o,
- taskDetails.getTaskID());
+ // Status update should be done at the monitor
}
- // mark workflownode cancelled
- WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus();
- workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELED);
- workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
+ }
+ }else {
+ if (isCancelAllowed(experimentState)){
+ // when experiment status is < 3 no jobDetails object is created,
+ // so we don't have to worry, we simply have to change the status and stop the execution
+ ExperimentStatus status = new ExperimentStatus();
+ status.setExperimentState(ExperimentState.CANCELED);
+ status.setTimeOfStateChange(Calendar.getInstance()
.getTimeInMillis());
- workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
- registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail,
- workflowNodeId);
+ experiment.setExperimentStatus(status);
+ registry.update(RegistryModelType.EXPERIMENT, experiment,
+ experimentId);
+ List<String> ids = registry.getIds(
+ RegistryModelType.WORKFLOW_NODE_DETAIL,
+ WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
+ for (String workflowNodeId : ids) {
+ WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry
+ .get(RegistryModelType.WORKFLOW_NODE_DETAIL,
+ workflowNodeId);
+ WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus();
+ workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELED);
+ workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
+ .getTimeInMillis());
+ workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
+ registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail,
+ workflowNodeId);
+ List<Object> taskDetailList = registry.get(
+ RegistryModelType.TASK_DETAIL,
+ TaskDetailConstants.NODE_ID, workflowNodeId);
+ for (Object o : taskDetailList) {
+ TaskDetails taskDetails = (TaskDetails) o;
+ TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus();
+ taskStatus.setExecutionState(TaskState.CANCELED);
+ taskStatus.setTimeOfStateChange(Calendar.getInstance()
+ .getTimeInMillis());
+ taskDetails.setTaskStatus(taskStatus);
+ registry.update(RegistryModelType.TASK_DETAIL, o,
+ taskDetails);
+ }
+ }
+ }else {
+ log.errorId(experimentId, "Unable to mark experiment as Cancelled, current state {} doesn't allow to cancel the experiment {}.",
+ experiment.getExperimentStatus().getExperimentState().toString(), experimentId);
+ throw new OrchestratorException("Unable to mark experiment as Cancelled, because current state is: "
+ + experiment.getExperimentStatus().getExperimentState().toString());
}
- // mark experiment cancelled
- status = new ExperimentStatus();
- status.setExperimentState(ExperimentState.CANCELED);
- status.setTimeOfStateChange(Calendar.getInstance()
- .getTimeInMillis());
- experiment.setExperimentStatus(status);
- registry.update(RegistryModelType.EXPERIMENT, experiment,
- experimentId);
}
log.info("Experiment: " + experimentId + " is cancelled !!!!!");
-
} catch (Exception e) {
throw new TException(e);
}
return true;
}
+ private boolean isCancelValid(ExperimentState state){
+ switch (state) {
+ case LAUNCHED:
+ case EXECUTING:
+ case CANCELING:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ private boolean isCancelAllowed(ExperimentState state){
+ switch (state) {
+ case CREATED:
+ case VALIDATED:
+ case SCHEDULED:
+ return true;
+ default:
+ return false;
+ }
+ }
+
private void launchWorkflowExperiment(String experimentId, String airavataCredStoreToken) throws TException {
try {
WorkflowEnactmentService.getInstance().