You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/04/10 19:13:47 UTC

[airavata] branch develop updated: Handling externally cancelled Jobs

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/develop by this push:
     new e26b66c  Handling externally cancelled Jobs
e26b66c is described below

commit e26b66c4b5fe0912c9992ef1baefa2f364469377
Author: dimuthu <di...@gmail.com>
AuthorDate: Tue Apr 10 15:13:40 2018 -0400

    Handling externally cancelled Jobs
---
 .../helix/impl/workflow/PostWorkflowManager.java   | 12 ++++++---
 .../helix/impl/workflow/PreWorkflowManager.java    | 30 +++-------------------
 .../airavata/helix/core/util/MonitoringUtil.java   | 21 +++++++++++++++
 3 files changed, 34 insertions(+), 29 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 9a8a1f3..3f86db5 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -114,12 +114,14 @@ public class PostWorkflowManager extends WorkflowManager {
 
                 logger.info("Updating the job status for job id : " + jobStatusResult.getJobId() + " with process id "
                         + processId + ", exp id " + experimentId + ", gateway " + gateway + " and status " + jobStatusResult.getState().name());
+
                 saveAndPublishJobStatus(jobStatusResult.getJobId(), task, processId, experimentId, gateway, jobStatusResult.getState());
 
                 // TODO get cluster lock before that
-                if ("cancel".equals(processStatus)) {
+                if (MonitoringUtil.CANCEL.equals(processStatus)) {
                     logger.info("Cancelled post workflow for process " + processId + " in experiment " + experimentId);
                     // This will mark an cancelling Experiment into a cancelled status for a set of valid job statuses
+                    // This is only a safety check; cancellation is primarily handled by the Job Cancellation Workflow
                     switch (jobStatusResult.getState()) {
                         case FAILED:
                         case SUSPENDED:
@@ -222,9 +224,13 @@ public class PostWorkflowManager extends WorkflowManager {
                             logger.error("Failed to save workflow " + workflowName + " of process " + processId + " in zookeeper registry. " +
                                     "This will affect cancellation tasks", e);
                         }
+
                     } else if (jobStatusResult.getState() == JobState.CANCELED) {
-                        logger.info("Job " + jobStatusResult.getJobId() + " was externally cancelled");
-                        //
+                        logger.info("Job " + jobStatusResult.getJobId() + " was externally cancelled but process is not marked as cancelled yet");
+                        MonitoringUtil.registerCancelProcess(getCuratorClient(), processId);
+                        publishProcessStatus(processId, experimentId,gateway, ProcessState.CANCELED);
+                        logger.info("Marked process " + processId + " of experiment " + experimentId + " as cancelled as job is already being cancelled");
+
                     } else if (jobStatusResult.getState() == JobState.SUBMITTED) {
                         logger.info("Job " + jobStatusResult.getJobId() + " was submitted");
 
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index dce98c3..a0a2f67 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -26,6 +26,7 @@ import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.helix.core.AbstractTask;
 import org.apache.airavata.helix.core.OutPort;
+import org.apache.airavata.helix.core.util.MonitoringUtil;
 import org.apache.airavata.helix.impl.task.AiravataTask;
 import org.apache.airavata.helix.impl.task.cancel.CancelCompletingTask;
 import org.apache.airavata.helix.impl.task.cancel.RemoteJobCancellationTask;
@@ -71,29 +72,6 @@ public class PreWorkflowManager extends WorkflowManager {
         this.subscriber = MessagingFactory.getSubscriber(new ProcessLaunchMessageHandler(), routingKeys, Type.PROCESS_LAUNCH);
     }
 
-    private void registerWorkflow(String processId, String workflowId) throws Exception {
-        getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
-                "/registry/" + processId + "/workflows/" + workflowId , new byte[0]);
-    }
-
-    private void registerCancelProcess(String processId) throws Exception {
-        String path = "/registry/" + processId + "/status";
-        if (getCuratorClient().checkExists().forPath(path) != null) {
-            getCuratorClient().delete().forPath(path);
-        }
-        getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
-                path , "cancel".getBytes());
-    }
-
-    private List<String> getWorkflowsOfProcess(String processId) throws Exception {
-        String path = "/registry/" + processId + "/workflows";
-        if (getCuratorClient().checkExists().forPath(path) != null) {
-            return getCuratorClient().getChildren().forPath(path);
-        } else {
-            return null;
-        }
-    }
-
     private String createAndLaunchPreWorkflow(String processId) throws Exception {
 
         RegistryService.Client registryClient = getRegistryClientPool().getResource();
@@ -153,7 +131,7 @@ public class PreWorkflowManager extends WorkflowManager {
         String workflowName = getWorkflowOperator().launchWorkflow(processId + "-PRE-" + UUID.randomUUID().toString(),
                 new ArrayList<>(allTasks), true, false);
         try {
-            registerWorkflow(processId, workflowName);
+            MonitoringUtil.registerWorkflow(getCuratorClient(), processId, workflowName);
         } catch (Exception e) {
             logger.error("Failed to save workflow " + workflowName + " of process " + processId + " in zookeeper registry. " +
                     "This will affect cancellation tasks", e);
@@ -179,8 +157,8 @@ public class PreWorkflowManager extends WorkflowManager {
 
         String experimentId = processModel.getExperimentId();
 
-        registerCancelProcess(processId);
-        List<String> workflows = getWorkflowsOfProcess(processId);
+        MonitoringUtil.registerCancelProcess(getCuratorClient(), processId);
+        List<String> workflows = MonitoringUtil.getWorkflowsOfProcess(getCuratorClient(), processId);
         final List<AbstractTask> allTasks = new ArrayList<>();
         if (workflows != null && workflows.size() > 0) {
             for (String wf : workflows) {
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java
index 1a77ad0..e427c16 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/MonitoringUtil.java
@@ -7,6 +7,8 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
 public class MonitoringUtil {
 
     private final static Logger logger = LoggerFactory.getLogger(MonitoringUtil.class);
@@ -26,6 +28,7 @@ public class MonitoringUtil {
     private static final String JOB_NAME = "/jobName";
     private static final String WORKFLOWS = "/workflows";
 
+    public static final String CANCEL = "cancel";
 
     public static String getExperimentIdByJobId(CuratorFramework curatorClient, String jobId) throws Exception {
         String path = MONITORING + jobId + EXPERIMENT;
@@ -134,4 +137,22 @@ public class MonitoringUtil {
             return null;
         }
     }
+
+    public static void registerCancelProcess(CuratorFramework curatorClient, String processId) throws Exception {
+        String path = REGISTRY + processId + STATUS;
+        if (curatorClient.checkExists().forPath(path) != null) {
+            curatorClient.delete().forPath(path);
+        }
+        curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(
+                path , CANCEL.getBytes());
+    }
+
+    public static List<String> getWorkflowsOfProcess(CuratorFramework curatorClient, String processId) throws Exception {
+        String path = REGISTRY + processId + WORKFLOWS;
+        if (curatorClient.checkExists().forPath(path) != null) {
+            return curatorClient.getChildren().forPath(path);
+        } else {
+            return null;
+        }
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.