You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by av...@apache.org on 2017/04/25 12:54:56 UTC

[3/3] incubator-ariatosca git commit: Improve execution cancelling and implement force-cancelling

Improve execution cancelling and implement force-cancelling

Unhandled execution status transitions resulting from cancelling an
execution via the CLI, which we identified and tried to address:

1. SUCCEEDED (formerly TERMINATED) -> CANCELLING
You cancel the execution, but by the time we try to set the status to
CANCELLING, the execution thread has already finished, so the execution is
already in SUCCEEDED status.

2. FAILED -> CANCELLING
You cancel the execution, but by the time we try to set the status to
CANCELLING, the execution thread has already encountered an error, so the
execution is already in FAILED status.

3. SUCCEEDED (formerly TERMINATED) -> CANCELLED
Similar to #1, but with CANCELLED instead of CANCELLING.

4. FAILED -> CANCELLED
Similar to #2, but with CANCELLED instead of CANCELLING.

5.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/8d68c6b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/8d68c6b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/8d68c6b3

Branch: refs/heads/ARIA-143-improve-cancelling-of-workflow-execution
Commit: 8d68c6b32d3b422fb89fef24852e533e82946374
Parents: 1e0a597
Author: Avia Efrat <av...@gigaspaces.com>
Authored: Tue Apr 25 15:00:03 2017 +0300
Committer: Avia Efrat <av...@gigaspaces.com>
Committed: Tue Apr 25 15:22:46 2017 +0300

----------------------------------------------------------------------
 aria/cli/commands/executions.py                 |  2 +-
 aria/modeling/orchestration.py                  | 13 +++---
 aria/orchestrator/events.py                     |  1 +
 aria/orchestrator/workflow_runner.py            |  3 ++
 aria/orchestrator/workflows/core/engine.py      |  6 ++-
 .../workflows/core/events_handler.py            | 48 ++++++++++++++++----
 tests/modeling/test_models.py                   | 22 ++++-----
 tests/orchestrator/test_workflow_runner.py      |  2 +-
 .../orchestrator/workflows/core/test_engine.py  |  2 +-
 9 files changed, 68 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d68c6b3/aria/cli/commands/executions.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py
index 6a1f02a..343ce6f 100644
--- a/aria/cli/commands/executions.py
+++ b/aria/cli/commands/executions.py
@@ -179,4 +179,4 @@ def _cancel_execution(workflow_runner, execution_thread, logger, log_iterator):
             execution_thread.join(1)
     except KeyboardInterrupt:
         logger.info('Force-cancelling execution')
-        # TODO handle execution (update status etc.) and exit process
+        workflow_runner.force_cancel()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d68c6b3/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index b9a75e9..f49591d 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -56,21 +56,22 @@ class ExecutionBase(ModelMixin):
     __private_fields__ = ['service_fk',
                           'service_template']
 
-    TERMINATED = 'terminated'
+    SUCCEEDED = 'succeeded'
     FAILED = 'failed'
     CANCELLED = 'cancelled'
     PENDING = 'pending'
     STARTED = 'started'
     CANCELLING = 'cancelling'
-    FORCE_CANCELLING = 'force_cancelling'
+    FORCE_CANCELLED = 'force_cancelled'
 
-    STATES = (TERMINATED, FAILED, CANCELLED, PENDING, STARTED, CANCELLING, FORCE_CANCELLING)
-    END_STATES = (TERMINATED, FAILED, CANCELLED)
+    STATES = (SUCCEEDED, FAILED, CANCELLED, PENDING, STARTED, CANCELLING, FORCE_CANCELLED)
+    END_STATES = (SUCCEEDED, FAILED, CANCELLED, FORCE_CANCELLED)
+    CANCEL_STATES = (CANCELLING, CANCELLED, FORCE_CANCELLED)
 
     VALID_TRANSITIONS = {
-        PENDING: (STARTED, CANCELLED),
+        PENDING: (STARTED, CANCELLED, FORCE_CANCELLED),
         STARTED: END_STATES + (CANCELLING,),
-        CANCELLING: END_STATES + (FORCE_CANCELLING,)
+        CANCELLING: END_STATES
     }
 
     @orm.validates('status')

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d68c6b3/aria/orchestrator/events.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/events.py b/aria/orchestrator/events.py
index a1c4922..bcc0627 100644
--- a/aria/orchestrator/events.py
+++ b/aria/orchestrator/events.py
@@ -32,5 +32,6 @@ on_failure_task_signal = signal('failure_task_signal')
 start_workflow_signal = signal('start_workflow_signal')
 on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal')
 on_cancelled_workflow_signal = signal('on_cancelled_workflow_signal')
+on_force_cancelled_workflow_signal = signal('on_force_cancelled_workflow_signal')
 on_success_workflow_signal = signal('on_success_workflow_signal')
 on_failure_workflow_signal = signal('on_failure_workflow_signal')

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d68c6b3/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 8f25cce..32cf02d 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -107,6 +107,9 @@ class WorkflowRunner(object):
     def cancel(self):
         self._engine.cancel_execution()
 
+    def force_cancel(self):
+        self._engine.force_cancel_execution()
+
     def _create_execution_model(self, inputs):
         execution = models.Execution(
             created_at=datetime.utcnow(),

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d68c6b3/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 155d0ee..bd61b16 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -81,9 +81,11 @@ class Engine(logger.LoggerMixin):
         """
         events.on_cancelling_workflow_signal.send(self._workflow_context)
 
+    def force_cancel_execution(self):
+        events.on_force_cancelled_workflow_signal.send(self._workflow_context)
+
     def _is_cancel(self):
-        return self._workflow_context.execution.status in (models.Execution.CANCELLING,
-                                                           models.Execution.CANCELLED)
+        return self._workflow_context.execution.status in models.Execution.CANCEL_STATES
 
     def _executable_tasks(self):
         now = datetime.utcnow()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d68c6b3/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 7f61bfa..b53efbb 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -81,6 +81,9 @@ def _task_succeeded(task, *args, **kwargs):
 @events.start_workflow_signal.connect
 def _workflow_started(workflow_context, *args, **kwargs):
     execution = workflow_context.execution
+    # the execution may already be in a cancelling process
+    if execution.status in execution.CANCEL_STATES:
+        return
     execution.status = execution.STARTED
     execution.started_at = datetime.utcnow()
     workflow_context.execution = execution
@@ -98,7 +101,7 @@ def _workflow_failed(workflow_context, exception, *args, **kwargs):
 @events.on_success_workflow_signal.connect
 def _workflow_succeeded(workflow_context, *args, **kwargs):
     execution = workflow_context.execution
-    execution.status = execution.TERMINATED
+    execution.status = execution.SUCCEEDED
     execution.ended_at = datetime.utcnow()
     workflow_context.execution = execution
 
@@ -106,22 +109,43 @@ def _workflow_succeeded(workflow_context, *args, **kwargs):
 @events.on_cancelled_workflow_signal.connect
 def _workflow_cancelled(workflow_context, *args, **kwargs):
     execution = workflow_context.execution
-    # _workflow_cancelling function may have called this function
-    # already
-    if execution.status == execution.CANCELLED:
+    status = execution.status
+    # _workflow_cancelling function may have called this function already
+    if status == execution.CANCELLED:
         return
-    execution.status = execution.CANCELLED
-    execution.ended_at = datetime.utcnow()
+    # the execution may have already been finished
+    elif status == execution.SUCCEEDED or status == execution.FAILED:
+        _log_execution_already_ended(workflow_context, status)
+    else:
+        execution.status = execution.CANCELLED
+        execution.ended_at = datetime.utcnow()
+        workflow_context.execution = execution
+
+
+@events.on_force_cancelled_workflow_signal.connect
+def _workflow_force_cancelled(workflow_context, *args, **kwargs):
+    execution = workflow_context.execution
+    status = execution.status
+    if status in execution.END_STATES:
+        if status == execution.SUCCEEDED or status == execution.FAILED:
+            _log_execution_already_ended(workflow_context, status)
+        return
+    execution.status = execution.FORCE_CANCELLED
     workflow_context.execution = execution
 
 
 @events.on_cancelling_workflow_signal.connect
 def _workflow_cancelling(workflow_context, *args, **kwargs):
     execution = workflow_context.execution
-    if execution.status == execution.PENDING:
+    status = execution.status
+    if status == execution.PENDING:
         return _workflow_cancelled(workflow_context=workflow_context)
-    execution.status = execution.CANCELLING
-    workflow_context.execution = execution
+    # the execution may have already been finished
+    elif status == execution.SUCCEEDED or status == execution.FAILED:
+        _log_execution_already_ended(workflow_context, status)
+    else:
+        execution.status = execution.CANCELLING
+        workflow_context.execution = execution
 
 
 def _update_node_state_if_necessary(task, is_transitional=False):
@@ -135,3 +159,9 @@ def _update_node_state_if_necessary(task, is_transitional=False):
         if state:
             node.state = state
             task.context.model.node.update(node)
+
+
+def _log_execution_already_ended(workflow_context, status):
+    workflow_context.logger.info(
+        "'{workflow_name}' workflow execution {status} before the cancel request"
+        "was fully processed '".format(workflow_name=workflow_context.workflow_name, status=status))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d68c6b3/tests/modeling/test_models.py
----------------------------------------------------------------------
diff --git a/tests/modeling/test_models.py b/tests/modeling/test_models.py
index d64cdba..2da2154 100644
--- a/tests/modeling/test_models.py
+++ b/tests/modeling/test_models.py
@@ -310,40 +310,40 @@ class TestExecution(object):
                                 Execution.CANCELLED,
                                 Execution.PENDING],
             Execution.STARTED: [Execution.FAILED,
-                                Execution.TERMINATED,
+                                Execution.SUCCEEDED,
                                 Execution.CANCELLED,
                                 Execution.CANCELLING,
                                 Execution.STARTED],
             Execution.CANCELLING: [Execution.FAILED,
-                                   Execution.TERMINATED,
+                                   Execution.SUCCEEDED,
                                    Execution.CANCELLED,
                                    Execution.CANCELLING],
             Execution.FAILED: [Execution.FAILED],
-            Execution.TERMINATED: [Execution.TERMINATED],
+            Execution.SUCCEEDED: [Execution.SUCCEEDED],
             Execution.CANCELLED: [Execution.CANCELLED]
         }
 
         invalid_transitions = {
             Execution.PENDING: [Execution.FAILED,
-                                Execution.TERMINATED,
+                                Execution.SUCCEEDED,
                                 Execution.CANCELLING],
             Execution.STARTED: [Execution.PENDING],
             Execution.CANCELLING: [Execution.PENDING,
                                    Execution.STARTED],
             Execution.FAILED: [Execution.PENDING,
                                Execution.STARTED,
-                               Execution.TERMINATED,
+                               Execution.SUCCEEDED,
                                Execution.CANCELLED,
                                Execution.CANCELLING],
-            Execution.TERMINATED: [Execution.PENDING,
-                                   Execution.STARTED,
-                                   Execution.FAILED,
-                                   Execution.CANCELLED,
-                                   Execution.CANCELLING],
+            Execution.SUCCEEDED: [Execution.PENDING,
+                                  Execution.STARTED,
+                                  Execution.FAILED,
+                                  Execution.CANCELLED,
+                                  Execution.CANCELLING],
             Execution.CANCELLED: [Execution.PENDING,
                                   Execution.STARTED,
                                   Execution.FAILED,
-                                  Execution.TERMINATED,
+                                  Execution.SUCCEEDED,
                                   Execution.CANCELLING],
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d68c6b3/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index 54e940f..7374e50 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -86,7 +86,7 @@ def test_existing_active_executions(request, service, model):
 def test_existing_executions_but_no_active_ones(request, service, model):
     existing_terminated_execution = models.Execution(
         service=service,
-        status=models.Execution.TERMINATED,
+        status=models.Execution.SUCCEEDED,
         workflow_name='uninstall')
     model.execution.put(existing_terminated_execution)
     # no active executions exist, so no error should be raised

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d68c6b3/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index 1a88f13..af9af17 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -157,7 +157,7 @@ class TestEngine(BaseTest):
         execution = workflow_context.execution
         assert execution.started_at <= execution.ended_at <= datetime.utcnow()
         assert execution.error is None
-        assert execution.status == models.Execution.TERMINATED
+        assert execution.status == models.Execution.SUCCEEDED
 
     def test_single_task_successful_execution(self, workflow_context, executor):
         @workflow