You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ariatosca.apache.org by mx...@apache.org on 2017/11/29 14:38:58 UTC
incubator-ariatosca git commit: changed the update mechanism in all events to be more generic
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-414-Current-events-handler-mechanism-relies-on-sqlalcehmy [created] ef3694589
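Changed the update mechanism in all event handlers to be more generic: the handlers and executors now call the model's update() explicitly (ctx.model.task.update / workflow_context.model.execution.update) instead of relying on the removed persist_changes context manager.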
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/ef369458
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/ef369458
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/ef369458
Branch: refs/heads/ARIA-414-Current-events-handler-mechanism-relies-on-sqlalcehmy
Commit: ef3694589813514ccdd933bc8e2ed3bdfae625af
Parents: e71ddc9
Author: max-orlov <ma...@gigaspaces.com>
Authored: Tue Nov 28 17:19:48 2017 +0200
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Wed Nov 29 16:38:47 2017 +0200
----------------------------------------------------------------------
aria/orchestrator/context/operation.py | 6 -
aria/orchestrator/context/workflow.py | 6 -
.../workflows/core/events_handler.py | 164 ++++++++++---------
aria/orchestrator/workflows/executor/base.py | 5 +-
aria/orchestrator/workflows/executor/dry.py | 51 +++---
.../orchestrator/workflows/executor/__init__.py | 5 -
6 files changed, 113 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ef369458/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index 8613ec3..bc4a58c 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -106,12 +106,6 @@ class BaseOperationContext(common.BaseContext):
self.model.log._session.remove()
self.model.log._engine.dispose()
- @property
- @contextmanager
- def persist_changes(self):
- yield
- self.model.task.update(self.task)
-
class NodeOperationContext(BaseOperationContext):
"""
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ef369458/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index 738d2fd..ba66a78 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -93,12 +93,6 @@ class WorkflowContext(BaseContext):
}
)
- @property
- @contextmanager
- def persist_changes(self):
- yield
- self._model.execution.update(self.execution)
-
class _CurrentContext(threading.local):
"""
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ef369458/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 473475e..2867216 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -28,126 +28,130 @@ from ... import exceptions
@events.sent_task_signal.connect
def _task_sent(ctx, *args, **kwargs):
- with ctx.persist_changes:
- ctx.task.status = ctx.task.SENT
+ task = ctx.task
+ task.status = ctx.task.SENT
+ ctx.model.task.update(task)
@events.start_task_signal.connect
def _task_started(ctx, *args, **kwargs):
- with ctx.persist_changes:
- ctx.task.started_at = datetime.utcnow()
- ctx.task.status = ctx.task.STARTED
- _update_node_state_if_necessary(ctx, is_transitional=True)
+ task = ctx.task
+ ctx.task.started_at = datetime.utcnow()
+ ctx.task.status = ctx.task.STARTED
+ _update_node_state_if_necessary(ctx, is_transitional=True)
+ ctx.model.task.update(task)
@events.on_failure_task_signal.connect
def _task_failed(ctx, exception, *args, **kwargs):
- with ctx.persist_changes:
- should_retry = all([
- not isinstance(exception, exceptions.TaskAbortException),
- ctx.task.attempts_count < ctx.task.max_attempts or
- ctx.task.max_attempts == ctx.task.INFINITE_RETRIES,
- # ignore_failure check here means the task will not be retried and it will be marked
- # as failed. The engine will also look at ignore_failure so it won't fail the
- # workflow.
- not ctx.task.ignore_failure
- ])
- if should_retry:
- retry_interval = None
- if isinstance(exception, exceptions.TaskRetryException):
- retry_interval = exception.retry_interval
- if retry_interval is None:
- retry_interval = ctx.task.retry_interval
- ctx.task.status = ctx.task.RETRYING
- ctx.task.attempts_count += 1
- ctx.task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
- else:
- ctx.task.ended_at = datetime.utcnow()
- ctx.task.status = ctx.task.FAILED
+ task = ctx.task
+ should_retry = all([
+ not isinstance(exception, exceptions.TaskAbortException),
+ task.attempts_count < task.max_attempts or
+ task.max_attempts == task.INFINITE_RETRIES,
+ # ignore_failure check here means the task will not be retried and it will be marked
+ # as failed. The engine will also look at ignore_failure so it won't fail the
+ # workflow.
+ not task.ignore_failure
+ ])
+ if should_retry:
+ retry_interval = None
+ if isinstance(exception, exceptions.TaskRetryException):
+ retry_interval = exception.retry_interval
+ if retry_interval is None:
+ retry_interval = ctx.task.retry_interval
+ task.status = task.RETRYING
+ task.attempts_count += 1
+ task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
+ else:
+ task.ended_at = datetime.utcnow()
+ task.status = task.FAILED
+ ctx.model.task.update(task)
@events.on_success_task_signal.connect
def _task_succeeded(ctx, *args, **kwargs):
- with ctx.persist_changes:
- ctx.task.ended_at = datetime.utcnow()
- ctx.task.status = ctx.task.SUCCESS
- ctx.task.attempts_count += 1
+ task = ctx.task
+ ctx.task.ended_at = datetime.utcnow()
+ ctx.task.status = ctx.task.SUCCESS
+ ctx.task.attempts_count += 1
- _update_node_state_if_necessary(ctx)
+ _update_node_state_if_necessary(ctx)
+ ctx.model.task.update(task)
@events.start_workflow_signal.connect
def _workflow_started(workflow_context, *args, **kwargs):
- with workflow_context.persist_changes:
- execution = workflow_context.execution
- # the execution may already be in the process of cancelling
- if execution.status in (execution.CANCELLING, execution.CANCELLED):
- return
- execution.status = execution.STARTED
- execution.started_at = datetime.utcnow()
+ execution = workflow_context.execution
+ # the execution may already be in the process of cancelling
+ if execution.status in (execution.CANCELLING, execution.CANCELLED):
+ return
+ execution.status = execution.STARTED
+ execution.started_at = datetime.utcnow()
+ workflow_context.model.execution.update(execution)
@events.on_failure_workflow_signal.connect
def _workflow_failed(workflow_context, exception, *args, **kwargs):
- with workflow_context.persist_changes:
- execution = workflow_context.execution
- execution.error = str(exception)
- execution.status = execution.FAILED
- execution.ended_at = datetime.utcnow()
+ execution = workflow_context.execution
+ execution.error = str(exception)
+ execution.status = execution.FAILED
+ execution.ended_at = datetime.utcnow()
+ workflow_context.model.execution.update(execution)
@events.on_success_workflow_signal.connect
def _workflow_succeeded(workflow_context, *args, **kwargs):
- with workflow_context.persist_changes:
- execution = workflow_context.execution
- execution.status = execution.SUCCEEDED
- execution.ended_at = datetime.utcnow()
+ execution = workflow_context.execution
+ execution.status = execution.SUCCEEDED
+ execution.ended_at = datetime.utcnow()
+ workflow_context.model.execution.update(execution)
@events.on_cancelled_workflow_signal.connect
def _workflow_cancelled(workflow_context, *args, **kwargs):
- with workflow_context.persist_changes:
- execution = workflow_context.execution
- # _workflow_cancelling function may have called this function already
- if execution.status == execution.CANCELLED:
- return
- # the execution may have already been finished
- elif execution.status in (execution.SUCCEEDED, execution.FAILED):
- _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
- else:
- execution.status = execution.CANCELLED
- execution.ended_at = datetime.utcnow()
+ execution = workflow_context.execution
+ # _workflow_cancelling function may have called this function already
+ if execution.status == execution.CANCELLED:
+ return
+ # the execution may have already been finished
+ elif execution.status in (execution.SUCCEEDED, execution.FAILED):
+ _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
+ else:
+ execution.status = execution.CANCELLED
+ execution.ended_at = datetime.utcnow()
+ workflow_context.model.execution.update(execution)
@events.on_resume_workflow_signal.connect
def _workflow_resume(workflow_context, retry_failed=False, *args, **kwargs):
- with workflow_context.persist_changes:
- execution = workflow_context.execution
- execution.status = execution.PENDING
- # Any non ended task would be put back to pending state
+ execution = workflow_context.execution
+ execution.status = execution.PENDING
+ # Any non ended task would be put back to pending state
+ for task in execution.tasks:
+ if not task.has_ended():
+ task.status = task.PENDING
+
+ if retry_failed:
for task in execution.tasks:
- if not task.has_ended():
+ if task.status == task.FAILED and not task.ignore_failure:
+ task.attempts_count = 0
task.status = task.PENDING
-
- if retry_failed:
- for task in execution.tasks:
- if task.status == task.FAILED and not task.ignore_failure:
- task.attempts_count = 0
- task.status = task.PENDING
+ workflow_context.model.execution.update(execution)
@events.on_cancelling_workflow_signal.connect
def _workflow_cancelling(workflow_context, *args, **kwargs):
- with workflow_context.persist_changes:
- execution = workflow_context.execution
- if execution.status == execution.PENDING:
- return _workflow_cancelled(workflow_context=workflow_context)
- # the execution may have already been finished
- elif execution.status in (execution.SUCCEEDED, execution.FAILED):
- _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
- else:
- execution.status = execution.CANCELLING
+ execution = workflow_context.execution
+ if execution.status == execution.PENDING:
+ return _workflow_cancelled(workflow_context=workflow_context)
+ # the execution may have already been finished
+ elif execution.status in (execution.SUCCEEDED, execution.FAILED):
+ _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
+ else:
+ execution.status = execution.CANCELLING
+ workflow_context.model.execution.update(execution)
def _update_node_state_if_necessary(ctx, is_transitional=False):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ef369458/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index e7d03ea..d550b53 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -71,5 +71,6 @@ class BaseExecutor(logger.LoggerMixin):
class StubTaskExecutor(BaseExecutor): # pylint: disable=abstract-method
def execute(self, ctx, *args, **kwargs):
- with ctx.persist_changes:
- ctx.task.status = ctx.task.SUCCESS
+ task = ctx.task
+ task.status = ctx.task.SUCCESS
+ ctx.model.task.update(task)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ef369458/aria/orchestrator/workflows/executor/dry.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py
index 9314e5d..628d9cb 100644
--- a/aria/orchestrator/workflows/executor/dry.py
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -27,28 +27,29 @@ class DryExecutor(base.BaseExecutor):
Dry task executor: prints task information without causing any side effects.
"""
def execute(self, ctx):
- with ctx.persist_changes:
- # updating the task manually instead of calling self._task_started(task),
- # to avoid any side effects raising that event might cause
- ctx.task.started_at = datetime.utcnow()
- ctx.task.status = ctx.task.STARTED
-
- dry_msg = '<dry> {name} {task.interface_name}.{task.operation_name} {suffix}'
- logger = ctx.logger.info if ctx.task.function else ctx.logger.debug
-
- if hasattr(ctx.task.actor, 'source_node'):
- name = '{source_node.name}->{target_node.name}'.format(
- source_node=ctx.task.actor.source_node, target_node=ctx.task.actor.target_node)
- else:
- name = ctx.task.actor.name
-
- if ctx.task.function:
- logger(dry_msg.format(name=name, task=ctx.task, suffix='started...'))
- logger(dry_msg.format(name=name, task=ctx.task, suffix='successful'))
- else:
- logger(dry_msg.format(name=name, task=ctx.task, suffix='has no implementation'))
-
- # updating the task manually instead of calling self._task_succeeded(task),
- # to avoid any side effects raising that event might cause
- ctx.task.ended_at = datetime.utcnow()
- ctx.task.status = ctx.task.SUCCESS
+ task = ctx.task
+ # updating the task manually instead of calling self._task_started(task),
+ # to avoid any side effects raising that event might cause
+ task.started_at = datetime.utcnow()
+ task.status = task.STARTED
+
+ dry_msg = '<dry> {name} {task.interface_name}.{task.operation_name} {suffix}'
+ logger = ctx.logger.info if task.function else ctx.logger.debug
+
+ if hasattr(task.actor, 'source_node'):
+ name = '{source_node.name}->{target_node.name}'.format(
+ source_node=task.actor.source_node, target_node=task.actor.target_node)
+ else:
+ name = task.actor.name
+
+ if task.function:
+ logger(dry_msg.format(name=name, task=ctx.task, suffix='started...'))
+ logger(dry_msg.format(name=name, task=ctx.task, suffix='successful'))
+ else:
+ logger(dry_msg.format(name=name, task=ctx.task, suffix='has no implementation'))
+
+ # updating the task manually instead of calling self._task_succeeded(task),
+ # to avoid any side effects raising that event might cause
+ task.ended_at = datetime.utcnow()
+ task.status = task.SUCCESS
+ ctx.model.task.update(task)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ef369458/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index 99d0b39..fa71a2a 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -58,11 +58,6 @@ class MockContext(object):
return cls(storage=aria.application_model_storage(**(storage_kwargs or {})),
task_kwargs=(task_kwargs or {}))
- @property
- @contextmanager
- def persist_changes(self):
- yield
-
class MockActor(object):
def __init__(self):