You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text rendering.
Posted to commits@airavata.apache.org by di...@apache.org on 2022/01/14 03:49:11 UTC

[airavata] branch ultrascan-mainteinence updated: Gracefully cleaning up the helix manager to avoid stale zookeeper connections

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch ultrascan-mainteinence
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/ultrascan-mainteinence by this push:
     new 77fe909  Gracefully cleaning up the helix manager to avoid stale zookeeper connections
77fe909 is described below

commit 77fe9095a311120c793e26d57996178a789dacc3
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Thu Jan 13 22:49:04 2022 -0500

    Gracefully cleaning up the helix manager to avoid stale zookeeper connections
---
 .../impl/task/cancel/WorkflowCancellationTask.java | 72 +++++++++++++---------
 1 file changed, 43 insertions(+), 29 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java
index 4ee3949..a05bb63 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java
@@ -49,6 +49,17 @@ public class WorkflowCancellationTask extends AbstractTask {
         } catch (Exception e) {
             logger.error("Failed to build Helix Task driver in " + taskName, e);
             throw new RuntimeException("Failed to build Helix Task driver in " + taskName, e);
+        } finally {
+
+            try {
+                if (helixManager != null) {
+                    if (helixManager.isConnected()) {
+                        helixManager.disconnect();
+                    }
+                }
+            } catch (Exception e) {
+                logger.warn("Failed to disconnect helix manager", e);
+            }
         }
     }
 
@@ -56,43 +67,46 @@ public class WorkflowCancellationTask extends AbstractTask {
     public TaskResult onRun(TaskHelper helper) {
         logger.info("Cancelling workflow " + cancellingWorkflowName);
 
-        if (taskDriver.getWorkflowConfig(cancellingWorkflowName) == null) {
-            // Workflow could be already deleted by cleanup agents
-            logger.warn("Can not find a workflow with name " + cancellingWorkflowName + " but continuing");
-            return onSuccess("Can not find a workflow with name " + cancellingWorkflowName + " but continuing");
-        }
-
         try {
-            WorkflowContext workflowContext = taskDriver.getWorkflowContext(cancellingWorkflowName);
-
-            // if the workflow can not be found, ignore it
-            if (workflowContext == null) {
-                logger.warn("Can not find a workflow with id " + cancellingWorkflowName + ". So ignoring");
-                return onSuccess("Can not find a workflow with id " + cancellingWorkflowName + ". So ignoring");
+            if (taskDriver.getWorkflowConfig(cancellingWorkflowName) == null) {
+                // Workflow could be already deleted by cleanup agents
+                logger.warn("Can not find a workflow with name " + cancellingWorkflowName + " but continuing");
+                return onSuccess("Can not find a workflow with name " + cancellingWorkflowName + " but continuing");
             }
 
-            TaskState workflowState = workflowContext.getWorkflowState();
-            logger.info("Current state of workflow " + cancellingWorkflowName + " : " + workflowState.name());
+            try {
+                WorkflowContext workflowContext = taskDriver.getWorkflowContext(cancellingWorkflowName);
 
-            taskDriver.stop(cancellingWorkflowName);
+                // if the workflow can not be found, ignore it
+                if (workflowContext == null) {
+                    logger.warn("Can not find a workflow with id " + cancellingWorkflowName + ". So ignoring");
+                    return onSuccess("Can not find a workflow with id " + cancellingWorkflowName + ". So ignoring");
+                }
 
-        } catch (Exception e) {
-            logger.error("Failed to stop workflow " + cancellingWorkflowName, e);
-            // in case of an error, retry
-            return onFail("Failed to stop workflow " + cancellingWorkflowName + ": " + e.getMessage(), false);
-        }
+                TaskState workflowState = workflowContext.getWorkflowState();
+                logger.info("Current state of workflow " + cancellingWorkflowName + " : " + workflowState.name());
 
-        try {
-            logger.info("Waiting maximum " + waitTime +"s for workflow " + cancellingWorkflowName + " state to change");
-            TaskState newWorkflowState = taskDriver.pollForWorkflowState(cancellingWorkflowName, waitTime * 1000,
-                    TaskState.COMPLETED, TaskState.FAILED, TaskState.STOPPED, TaskState.ABORTED, TaskState.NOT_STARTED);
+                taskDriver.stop(cancellingWorkflowName);
+
+            } catch (Exception e) {
+                logger.error("Failed to stop workflow " + cancellingWorkflowName, e);
+                // in case of an error, retry
+                return onFail("Failed to stop workflow " + cancellingWorkflowName + ": " + e.getMessage(), false);
+            }
+
+            try {
+                logger.info("Waiting maximum " + waitTime + "s for workflow " + cancellingWorkflowName + " state to change");
+                TaskState newWorkflowState = taskDriver.pollForWorkflowState(cancellingWorkflowName, waitTime * 1000,
+                        TaskState.COMPLETED, TaskState.FAILED, TaskState.STOPPED, TaskState.ABORTED, TaskState.NOT_STARTED);
 
-            logger.info("Workflow " + cancellingWorkflowName + " state changed to " + newWorkflowState.name());
-            return onSuccess("Successfully cancelled workflow " + cancellingWorkflowName);
+                logger.info("Workflow " + cancellingWorkflowName + " state changed to " + newWorkflowState.name());
+                return onSuccess("Successfully cancelled workflow " + cancellingWorkflowName);
 
-        } catch (Exception e) {
-            logger.warn("Failed while watching workflow to stop " + cancellingWorkflowName, e);
-            return onSuccess("Failed while watching workflow to stop " + cancellingWorkflowName +". But continuing");
+            } catch (Exception e) {
+                logger.warn("Failed while watching workflow to stop " + cancellingWorkflowName, e);
+                return onSuccess("Failed while watching workflow to stop " + cancellingWorkflowName + ". But continuing");
+
+            }
 
         } finally {