You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/04/10 19:13:47 UTC
[airavata] branch develop updated: Handling externally cancelled Jobs
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/develop by this push:
new e26b66c Handling externally cancelled Jobs
e26b66c is described below
commit e26b66c4b5fe0912c9992ef1baefa2f364469377
Author: dimuthu <di...@gmail.com>
AuthorDate: Tue Apr 10 15:13:40 2018 -0400
Handling externally cancelled Jobs
---
.../helix/impl/workflow/PostWorkflowManager.java | 12 ++++++---
.../helix/impl/workflow/PreWorkflowManager.java | 30 +++-------------------
.../airavata/helix/core/util/MonitoringUtil.java | 21 +++++++++++++++
3 files changed, 34 insertions(+), 29 deletions(-)
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 9a8a1f3..3f86db5 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -114,12 +114,14 @@ public class PostWorkflowManager extends WorkflowManager {
logger.info("Updating the job status for job id : " + jobStatusResult.getJobId() + " with process id "
+ processId + ", exp id " + experimentId + ", gateway " + gateway + " and status " + jobStatusResult.getState().name());
+
saveAndPublishJobStatus(jobStatusResult.getJobId(), task, processId, experimentId, gateway, jobStatusResult.getState());
// TODO get cluster lock before that
- if ("cancel".equals(processStatus)) {
+ if (MonitoringUtil.CANCEL.equals(processStatus)) {
logger.info("Cancelled post workflow for process " + processId + " in experiment " + experimentId);
// This will mark an cancelling Experiment into a cancelled status for a set of valid job statuses
+ // This is a safety check. Cancellation is originally handled in Job Cancellation Workflow
switch (jobStatusResult.getState()) {
case FAILED:
case SUSPENDED:
@@ -222,9 +224,13 @@ public class PostWorkflowManager extends WorkflowManager {
logger.error("Failed to save workflow " + workflowName + " of process " + processId + " in zookeeper registry. " +
"This will affect cancellation tasks", e);
}
+
} else if (jobStatusResult.getState() == JobState.CANCELED) {
- logger.info("Job " + jobStatusResult.getJobId() + " was externally cancelled");
- //
+ logger.info("Job " + jobStatusResult.getJobId() + " was externally cancelled but process is not marked as cancelled yet");
+ MonitoringUtil.registerCancelProcess(getCuratorClient(), processId);
+ publishProcessStatus(processId, experimentId,gateway, ProcessState.CANCELED);
+ logger.info("Marked process " + processId + " of experiment " + experimentId + " as cancelled as job is already being cancelled");
+
} else if (jobStatusResult.getState() == JobState.SUBMITTED) {
logger.info("Job " + jobStatusResult.getJobId() + " was submitted");
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index dce98c3..a0a2f67 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -26,6 +26,7 @@ import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.helix.core.AbstractTask;
import org.apache.airavata.helix.core.OutPort;
+import org.apache.airavata.helix.core.util.MonitoringUtil;
import org.apache.airavata.helix.impl.task.AiravataTask;
import org.apache.airavata.helix.impl.task.cancel.CancelCompletingTask;
import org.apache.airavata.helix.impl.task.cancel.RemoteJobCancellationTask;
@@ -71,29 +72,6 @@ public class PreWorkflowManager extends WorkflowManager {
this.subscriber = MessagingFactory.getSubscriber(new ProcessLaunchMessageHandler(), routingKeys, Type.PROCESS_LAUNCH);
}
- private void registerWorkflow(String processId, String workflowId) throws Exception {
- getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
- "/registry/" + processId + "/workflows/" + workflowId , new byte[0]);
- }
-
- private void registerCancelProcess(String processId) throws Exception {
- String path = "/registry/" + processId + "/status";
- if (getCuratorClient().checkExists().forPath(path) != null) {
- getCuratorClient().delete().forPath(path);
- }
- getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
- path , "cancel".getBytes());
- }
-
- private List<String> getWorkflowsOfProcess(String processId) throws Exception {
- String path = "/registry/" + processId + "/workflows";
- if (getCuratorClient().checkExists().forPath(path) != null) {
- return getCuratorClient().getChildren().forPath(path);
- } else {
- return null;
- }
- }
-
private String createAndLaunchPreWorkflow(String processId) throws Exception {
RegistryService.Client registryClient = getRegistryClientPool().getResource();
@@ -153,7 +131,7 @@ public class PreWorkflowManager extends WorkflowManager {
String workflowName = getWorkflowOperator().launchWorkflow(processId + "-PRE-" + UUID.randomUUID().toString(),
new ArrayList<>(allTasks), true, false);
try {
- registerWorkflow(processId, workflowName);
+ MonitoringUtil.registerWorkflow(getCuratorClient(), processId, workflowName);
} catch (Exception e) {
logger.error("Failed to save workflow " + workflowName + " of process " + processId + " in zookeeper registry. " +
"This will affect cancellation tasks", e);
@@ -179,8 +157,8 @@ public class PreWorkflowManager extends WorkflowManager {
String experimentId = processModel.getExperimentId();
- registerCancelProcess(processId);
- List<String> workflows = getWorkflowsOfProcess(processId);
+ MonitoringUtil.registerCancelProcess(getCuratorClient(), processId);
+ List<String> workflows = MonitoringUtil.getWorkflowsOfProcess(getCuratorClient(), processId);
final List<AbstractTask> allTasks = new ArrayList<>();
if (workflows != null && workflows.size() > 0) {
for (String wf : workflows) {
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java
index 1a77ad0..e427c16 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java
@@ -7,6 +7,8 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+
public class MonitoringUtil {
private final static Logger logger = LoggerFactory.getLogger(MonitoringUtil.class);
@@ -26,6 +28,7 @@ public class MonitoringUtil {
private static final String JOB_NAME = "/jobName";
private static final String WORKFLOWS = "/workflows";
+ public static final String CANCEL = "cancel";
public static String getExperimentIdByJobId(CuratorFramework curatorClient, String jobId) throws Exception {
String path = MONITORING + jobId + EXPERIMENT;
@@ -134,4 +137,22 @@ public class MonitoringUtil {
return null;
}
}
+
+ public static void registerCancelProcess(CuratorFramework curatorClient, String processId) throws Exception {
+ String path = REGISTRY + processId + STATUS;
+ if (curatorClient.checkExists().forPath(path) != null) {
+ curatorClient.delete().forPath(path);
+ }
+ curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+ path , CANCEL.getBytes());
+ }
+
+ public static List<String> getWorkflowsOfProcess(CuratorFramework curatorClient, String processId) throws Exception {
+ String path = REGISTRY + processId + WORKFLOWS;
+ if (curatorClient.checkExists().forPath(path) != null) {
+ return curatorClient.getChildren().forPath(path);
+ } else {
+ return null;
+ }
+ }
}
--
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.