You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ariatosca.apache.org by mx...@apache.org on 2017/11/29 14:38:58 UTC

incubator-ariatosca git commit: changed the update mechanism in all events to be more generic

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-414-Current-events-handler-mechanism-relies-on-sqlalcehmy [created] ef3694589


changed the update mechanism in all events to be more generic


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/ef369458
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/ef369458
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/ef369458

Branch: refs/heads/ARIA-414-Current-events-handler-mechanism-relies-on-sqlalcehmy
Commit: ef3694589813514ccdd933bc8e2ed3bdfae625af
Parents: e71ddc9
Author: max-orlov <ma...@gigaspaces.com>
Authored: Tue Nov 28 17:19:48 2017 +0200
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Wed Nov 29 16:38:47 2017 +0200

----------------------------------------------------------------------
 aria/orchestrator/context/operation.py          |   6 -
 aria/orchestrator/context/workflow.py           |   6 -
 .../workflows/core/events_handler.py            | 164 ++++++++++---------
 aria/orchestrator/workflows/executor/base.py    |   5 +-
 aria/orchestrator/workflows/executor/dry.py     |  51 +++---
 .../orchestrator/workflows/executor/__init__.py |   5 -
 6 files changed, 113 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ef369458/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index 8613ec3..bc4a58c 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -106,12 +106,6 @@ class BaseOperationContext(common.BaseContext):
             self.model.log._session.remove()
             self.model.log._engine.dispose()
 
-    @property
-    @contextmanager
-    def persist_changes(self):
-        yield
-        self.model.task.update(self.task)
-
 
 class NodeOperationContext(BaseOperationContext):
     """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ef369458/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index 738d2fd..ba66a78 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -93,12 +93,6 @@ class WorkflowContext(BaseContext):
             }
         )
 
-    @property
-    @contextmanager
-    def persist_changes(self):
-        yield
-        self._model.execution.update(self.execution)
-
 
 class _CurrentContext(threading.local):
     """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ef369458/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 473475e..2867216 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -28,126 +28,130 @@ from ... import exceptions
 
 @events.sent_task_signal.connect
 def _task_sent(ctx, *args, **kwargs):
-    with ctx.persist_changes:
-        ctx.task.status = ctx.task.SENT
+    task = ctx.task
+    task.status = ctx.task.SENT
+    ctx.model.task.update(task)
 
 
 @events.start_task_signal.connect
 def _task_started(ctx, *args, **kwargs):
-    with ctx.persist_changes:
-        ctx.task.started_at = datetime.utcnow()
-        ctx.task.status = ctx.task.STARTED
-        _update_node_state_if_necessary(ctx, is_transitional=True)
+    task = ctx.task
+    ctx.task.started_at = datetime.utcnow()
+    ctx.task.status = ctx.task.STARTED
+    _update_node_state_if_necessary(ctx, is_transitional=True)
+    ctx.model.task.update(task)
 
 
 @events.on_failure_task_signal.connect
 def _task_failed(ctx, exception, *args, **kwargs):
-    with ctx.persist_changes:
-        should_retry = all([
-            not isinstance(exception, exceptions.TaskAbortException),
-            ctx.task.attempts_count < ctx.task.max_attempts or
-            ctx.task.max_attempts == ctx.task.INFINITE_RETRIES,
-            # ignore_failure check here means the task will not be retried and it will be marked
-            # as failed. The engine will also look at ignore_failure so it won't fail the
-            # workflow.
-            not ctx.task.ignore_failure
-        ])
-        if should_retry:
-            retry_interval = None
-            if isinstance(exception, exceptions.TaskRetryException):
-                retry_interval = exception.retry_interval
-            if retry_interval is None:
-                retry_interval = ctx.task.retry_interval
-            ctx.task.status = ctx.task.RETRYING
-            ctx.task.attempts_count += 1
-            ctx.task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
-        else:
-            ctx.task.ended_at = datetime.utcnow()
-            ctx.task.status = ctx.task.FAILED
+    task = ctx.task
+    should_retry = all([
+        not isinstance(exception, exceptions.TaskAbortException),
+        task.attempts_count < task.max_attempts or
+        task.max_attempts == task.INFINITE_RETRIES,
+        # ignore_failure check here means the task will not be retried and it will be marked
+        # as failed. The engine will also look at ignore_failure so it won't fail the
+        # workflow.
+        not task.ignore_failure
+    ])
+    if should_retry:
+        retry_interval = None
+        if isinstance(exception, exceptions.TaskRetryException):
+            retry_interval = exception.retry_interval
+        if retry_interval is None:
+            retry_interval = ctx.task.retry_interval
+        task.status = task.RETRYING
+        task.attempts_count += 1
+        task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
+    else:
+        task.ended_at = datetime.utcnow()
+        task.status = task.FAILED
+    ctx.model.task.update(task)
 
 
 @events.on_success_task_signal.connect
 def _task_succeeded(ctx, *args, **kwargs):
-    with ctx.persist_changes:
-        ctx.task.ended_at = datetime.utcnow()
-        ctx.task.status = ctx.task.SUCCESS
-        ctx.task.attempts_count += 1
+    task = ctx.task
+    ctx.task.ended_at = datetime.utcnow()
+    ctx.task.status = ctx.task.SUCCESS
+    ctx.task.attempts_count += 1
 
-        _update_node_state_if_necessary(ctx)
+    _update_node_state_if_necessary(ctx)
+    ctx.model.task.update(task)
 
 
 @events.start_workflow_signal.connect
 def _workflow_started(workflow_context, *args, **kwargs):
-    with workflow_context.persist_changes:
-        execution = workflow_context.execution
-        # the execution may already be in the process of cancelling
-        if execution.status in (execution.CANCELLING, execution.CANCELLED):
-            return
-        execution.status = execution.STARTED
-        execution.started_at = datetime.utcnow()
+    execution = workflow_context.execution
+    # the execution may already be in the process of cancelling
+    if execution.status in (execution.CANCELLING, execution.CANCELLED):
+        return
+    execution.status = execution.STARTED
+    execution.started_at = datetime.utcnow()
+    workflow_context.model.execution.update(execution)
 
 
 @events.on_failure_workflow_signal.connect
 def _workflow_failed(workflow_context, exception, *args, **kwargs):
-    with workflow_context.persist_changes:
-        execution = workflow_context.execution
-        execution.error = str(exception)
-        execution.status = execution.FAILED
-        execution.ended_at = datetime.utcnow()
+    execution = workflow_context.execution
+    execution.error = str(exception)
+    execution.status = execution.FAILED
+    execution.ended_at = datetime.utcnow()
+    workflow_context.model.execution.update(execution)
 
 
 @events.on_success_workflow_signal.connect
 def _workflow_succeeded(workflow_context, *args, **kwargs):
-    with workflow_context.persist_changes:
-        execution = workflow_context.execution
-        execution.status = execution.SUCCEEDED
-        execution.ended_at = datetime.utcnow()
+    execution = workflow_context.execution
+    execution.status = execution.SUCCEEDED
+    execution.ended_at = datetime.utcnow()
+    workflow_context.model.execution.update(execution)
 
 
 @events.on_cancelled_workflow_signal.connect
 def _workflow_cancelled(workflow_context, *args, **kwargs):
-    with workflow_context.persist_changes:
-        execution = workflow_context.execution
-        # _workflow_cancelling function may have called this function already
-        if execution.status == execution.CANCELLED:
-            return
-        # the execution may have already been finished
-        elif execution.status in (execution.SUCCEEDED, execution.FAILED):
-            _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
-        else:
-            execution.status = execution.CANCELLED
-            execution.ended_at = datetime.utcnow()
+    execution = workflow_context.execution
+    # _workflow_cancelling function may have called this function already
+    if execution.status == execution.CANCELLED:
+        return
+    # the execution may have already been finished
+    elif execution.status in (execution.SUCCEEDED, execution.FAILED):
+        _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
+    else:
+        execution.status = execution.CANCELLED
+        execution.ended_at = datetime.utcnow()
+    workflow_context.model.execution.update(execution)
 
 
 @events.on_resume_workflow_signal.connect
 def _workflow_resume(workflow_context, retry_failed=False, *args, **kwargs):
-    with workflow_context.persist_changes:
-        execution = workflow_context.execution
-        execution.status = execution.PENDING
-        # Any non ended task would be put back to pending state
+    execution = workflow_context.execution
+    execution.status = execution.PENDING
+    # Any non ended task would be put back to pending state
+    for task in execution.tasks:
+        if not task.has_ended():
+            task.status = task.PENDING
+
+    if retry_failed:
         for task in execution.tasks:
-            if not task.has_ended():
+            if task.status == task.FAILED and not task.ignore_failure:
+                task.attempts_count = 0
                 task.status = task.PENDING
-
-        if retry_failed:
-            for task in execution.tasks:
-                if task.status == task.FAILED and not task.ignore_failure:
-                    task.attempts_count = 0
-                    task.status = task.PENDING
+    workflow_context.model.execution.update(execution)
 
 
 
 @events.on_cancelling_workflow_signal.connect
 def _workflow_cancelling(workflow_context, *args, **kwargs):
-    with workflow_context.persist_changes:
-        execution = workflow_context.execution
-        if execution.status == execution.PENDING:
-            return _workflow_cancelled(workflow_context=workflow_context)
-        # the execution may have already been finished
-        elif execution.status in (execution.SUCCEEDED, execution.FAILED):
-            _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
-        else:
-            execution.status = execution.CANCELLING
+    execution = workflow_context.execution
+    if execution.status == execution.PENDING:
+        return _workflow_cancelled(workflow_context=workflow_context)
+    # the execution may have already been finished
+    elif execution.status in (execution.SUCCEEDED, execution.FAILED):
+        _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
+    else:
+        execution.status = execution.CANCELLING
+    workflow_context.model.execution.update(execution)
 
 
 def _update_node_state_if_necessary(ctx, is_transitional=False):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ef369458/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index e7d03ea..d550b53 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -71,5 +71,6 @@ class BaseExecutor(logger.LoggerMixin):
 
 class StubTaskExecutor(BaseExecutor):                                                               # pylint: disable=abstract-method
     def execute(self, ctx, *args, **kwargs):
-        with ctx.persist_changes:
-            ctx.task.status = ctx.task.SUCCESS
+        task = ctx.task
+        task.status = ctx.task.SUCCESS
+        ctx.model.task.update(task)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ef369458/aria/orchestrator/workflows/executor/dry.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py
index 9314e5d..628d9cb 100644
--- a/aria/orchestrator/workflows/executor/dry.py
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -27,28 +27,29 @@ class DryExecutor(base.BaseExecutor):
     Dry task executor: prints task information without causing any side effects.
     """
     def execute(self, ctx):
-        with ctx.persist_changes:
-            # updating the task manually instead of calling self._task_started(task),
-            # to avoid any side effects raising that event might cause
-            ctx.task.started_at = datetime.utcnow()
-            ctx.task.status = ctx.task.STARTED
-
-            dry_msg = '<dry> {name} {task.interface_name}.{task.operation_name} {suffix}'
-            logger = ctx.logger.info if ctx.task.function else ctx.logger.debug
-
-            if hasattr(ctx.task.actor, 'source_node'):
-                name = '{source_node.name}->{target_node.name}'.format(
-                    source_node=ctx.task.actor.source_node, target_node=ctx.task.actor.target_node)
-            else:
-                name = ctx.task.actor.name
-
-            if ctx.task.function:
-                logger(dry_msg.format(name=name, task=ctx.task, suffix='started...'))
-                logger(dry_msg.format(name=name, task=ctx.task, suffix='successful'))
-            else:
-                logger(dry_msg.format(name=name, task=ctx.task, suffix='has no implementation'))
-
-            # updating the task manually instead of calling self._task_succeeded(task),
-            # to avoid any side effects raising that event might cause
-            ctx.task.ended_at = datetime.utcnow()
-            ctx.task.status = ctx.task.SUCCESS
+        task = ctx.task
+        # updating the task manually instead of calling self._task_started(task),
+        # to avoid any side effects raising that event might cause
+        task.started_at = datetime.utcnow()
+        task.status = task.STARTED
+
+        dry_msg = '<dry> {name} {task.interface_name}.{task.operation_name} {suffix}'
+        logger = ctx.logger.info if task.function else ctx.logger.debug
+
+        if hasattr(task.actor, 'source_node'):
+            name = '{source_node.name}->{target_node.name}'.format(
+                source_node=task.actor.source_node, target_node=task.actor.target_node)
+        else:
+            name = task.actor.name
+
+        if task.function:
+            logger(dry_msg.format(name=name, task=ctx.task, suffix='started...'))
+            logger(dry_msg.format(name=name, task=ctx.task, suffix='successful'))
+        else:
+            logger(dry_msg.format(name=name, task=ctx.task, suffix='has no implementation'))
+
+        # updating the task manually instead of calling self._task_succeeded(task),
+        # to avoid any side effects raising that event might cause
+        task.ended_at = datetime.utcnow()
+        task.status = task.SUCCESS
+        ctx.model.task.update(task)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ef369458/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index 99d0b39..fa71a2a 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -58,11 +58,6 @@ class MockContext(object):
         return cls(storage=aria.application_model_storage(**(storage_kwargs or {})),
                    task_kwargs=(task_kwargs or {}))
 
-    @property
-    @contextmanager
-    def persist_changes(self):
-        yield
-
 
 class MockActor(object):
     def __init__(self):