You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2022/01/14 03:49:11 UTC
[airavata] branch ultrascan-mainteinence updated: Gracefully cleaning up the helix manager to avoid stale zookeeper connections
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch ultrascan-mainteinence
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/ultrascan-mainteinence by this push:
new 77fe909 Gracefully cleaning up the helix manager to avoid stale zookeeper connections
77fe909 is described below
commit 77fe9095a311120c793e26d57996178a789dacc3
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Thu Jan 13 22:49:04 2022 -0500
Gracefully cleaning up the helix manager to avoid stale zookeeper connections
---
.../impl/task/cancel/WorkflowCancellationTask.java | 72 +++++++++++++---------
1 file changed, 43 insertions(+), 29 deletions(-)
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java
index 4ee3949..a05bb63 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java
@@ -49,6 +49,17 @@ public class WorkflowCancellationTask extends AbstractTask {
} catch (Exception e) {
logger.error("Failed to build Helix Task driver in " + taskName, e);
throw new RuntimeException("Failed to build Helix Task driver in " + taskName, e);
+ } finally {
+
+ try {
+ if (helixManager != null) {
+ if (helixManager.isConnected()) {
+ helixManager.disconnect();
+ }
+ }
+ } catch (Exception e) {
+ logger.warn("Failed to disconnect helix manager", e);
+ }
}
}
@@ -56,43 +67,46 @@ public class WorkflowCancellationTask extends AbstractTask {
public TaskResult onRun(TaskHelper helper) {
logger.info("Cancelling workflow " + cancellingWorkflowName);
- if (taskDriver.getWorkflowConfig(cancellingWorkflowName) == null) {
- // Workflow could be already deleted by cleanup agents
- logger.warn("Can not find a workflow with name " + cancellingWorkflowName + " but continuing");
- return onSuccess("Can not find a workflow with name " + cancellingWorkflowName + " but continuing");
- }
-
try {
- WorkflowContext workflowContext = taskDriver.getWorkflowContext(cancellingWorkflowName);
-
- // if the workflow can not be found, ignore it
- if (workflowContext == null) {
- logger.warn("Can not find a workflow with id " + cancellingWorkflowName + ". So ignoring");
- return onSuccess("Can not find a workflow with id " + cancellingWorkflowName + ". So ignoring");
+ if (taskDriver.getWorkflowConfig(cancellingWorkflowName) == null) {
+ // Workflow could be already deleted by cleanup agents
+ logger.warn("Can not find a workflow with name " + cancellingWorkflowName + " but continuing");
+ return onSuccess("Can not find a workflow with name " + cancellingWorkflowName + " but continuing");
}
- TaskState workflowState = workflowContext.getWorkflowState();
- logger.info("Current state of workflow " + cancellingWorkflowName + " : " + workflowState.name());
+ try {
+ WorkflowContext workflowContext = taskDriver.getWorkflowContext(cancellingWorkflowName);
- taskDriver.stop(cancellingWorkflowName);
+ // if the workflow can not be found, ignore it
+ if (workflowContext == null) {
+ logger.warn("Can not find a workflow with id " + cancellingWorkflowName + ". So ignoring");
+ return onSuccess("Can not find a workflow with id " + cancellingWorkflowName + ". So ignoring");
+ }
- } catch (Exception e) {
- logger.error("Failed to stop workflow " + cancellingWorkflowName, e);
- // in case of an error, retry
- return onFail("Failed to stop workflow " + cancellingWorkflowName + ": " + e.getMessage(), false);
- }
+ TaskState workflowState = workflowContext.getWorkflowState();
+ logger.info("Current state of workflow " + cancellingWorkflowName + " : " + workflowState.name());
- try {
- logger.info("Waiting maximum " + waitTime +"s for workflow " + cancellingWorkflowName + " state to change");
- TaskState newWorkflowState = taskDriver.pollForWorkflowState(cancellingWorkflowName, waitTime * 1000,
- TaskState.COMPLETED, TaskState.FAILED, TaskState.STOPPED, TaskState.ABORTED, TaskState.NOT_STARTED);
+ taskDriver.stop(cancellingWorkflowName);
+
+ } catch (Exception e) {
+ logger.error("Failed to stop workflow " + cancellingWorkflowName, e);
+ // in case of an error, retry
+ return onFail("Failed to stop workflow " + cancellingWorkflowName + ": " + e.getMessage(), false);
+ }
+
+ try {
+ logger.info("Waiting maximum " + waitTime + "s for workflow " + cancellingWorkflowName + " state to change");
+ TaskState newWorkflowState = taskDriver.pollForWorkflowState(cancellingWorkflowName, waitTime * 1000,
+ TaskState.COMPLETED, TaskState.FAILED, TaskState.STOPPED, TaskState.ABORTED, TaskState.NOT_STARTED);
- logger.info("Workflow " + cancellingWorkflowName + " state changed to " + newWorkflowState.name());
- return onSuccess("Successfully cancelled workflow " + cancellingWorkflowName);
+ logger.info("Workflow " + cancellingWorkflowName + " state changed to " + newWorkflowState.name());
+ return onSuccess("Successfully cancelled workflow " + cancellingWorkflowName);
- } catch (Exception e) {
- logger.warn("Failed while watching workflow to stop " + cancellingWorkflowName, e);
- return onSuccess("Failed while watching workflow to stop " + cancellingWorkflowName +". But continuing");
+ } catch (Exception e) {
+ logger.warn("Failed while watching workflow to stop " + cancellingWorkflowName, e);
+ return onSuccess("Failed while watching workflow to stop " + cancellingWorkflowName + ". But continuing");
+
+ }
} finally {