You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/20 12:59:17 UTC
incubator-ariatosca git commit: phase 1 fixes [Forced Update!]
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-278-Remove-core-tasks 88325109f -> 3281ad952 (forced update)
phase 1 fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/3281ad95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/3281ad95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/3281ad95
Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: 3281ad952dc8f9b41664f5e289a7cb39ecbc3fa9
Parents: 507796e
Author: max-orlov <ma...@gigaspaces.com>
Authored: Tue Jun 20 15:34:57 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Tue Jun 20 15:59:11 2017 +0300
----------------------------------------------------------------------
aria/modeling/orchestration.py | 14 +--
aria/orchestrator/context/operation.py | 2 +-
aria/orchestrator/context/workflow.py | 2 +-
aria/orchestrator/workflow_runner.py | 4 +-
aria/orchestrator/workflows/api/task.py | 6 +
aria/orchestrator/workflows/core/engine.py | 122 +++----------------
.../workflows/core/events_handler.py | 19 ++-
aria/orchestrator/workflows/core/task.py | 110 +++++++++++++++++
aria/orchestrator/workflows/events_logging.py | 56 ++++-----
aria/orchestrator/workflows/executor/base.py | 4 +-
aria/orchestrator/workflows/executor/dry.py | 2 +-
tests/orchestrator/context/__init__.py | 5 +-
tests/orchestrator/context/test_serialize.py | 4 +-
.../orchestrator/execution_plugin/test_local.py | 5 +-
tests/orchestrator/execution_plugin/test_ssh.py | 4 +-
.../orchestrator/workflows/core/test_engine.py | 8 +-
.../orchestrator/workflows/core/test_events.py | 6 +-
.../test_task_graph_into_execution_graph.py | 15 +--
.../orchestrator/workflows/executor/__init__.py | 2 +-
.../executor/test_process_executor_extension.py | 4 +-
.../test_process_executor_tracked_changes.py | 4 +-
21 files changed, 203 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3281ad95/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 007eefa..17d2476 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -304,10 +304,11 @@ class TaskBase(mixins.ModelMixin):
started_at = Column(DateTime, default=None)
ended_at = Column(DateTime, default=None)
attempts_count = Column(Integer, default=1)
- api_id = Column(String)
+ _api_id = Column(String)
_executor = Column(PickleType)
_context_cls = Column(PickleType)
+ _stub_type = Column(Enum(*STUB_TYPES))
@declared_attr
def logs(cls):
@@ -336,8 +337,6 @@ class TaskBase(mixins.ModelMixin):
interface_name = Column(String)
operation_name = Column(String)
- stub_type = Column(Enum(*STUB_TYPES))
-
@property
def actor(self):
"""
@@ -410,21 +409,18 @@ class TaskBase(mixins.ModelMixin):
return self.status in (self.SUCCESS, self.FAILED)
def is_waiting(self):
- if self.stub_type:
+ if self._stub_type:
return not self.has_ended()
else:
return self.status in (self.PENDING, self.RETRYING)
@classmethod
def from_api_task(cls, api_task, executor, **kwargs):
- from aria.orchestrator import context
instantiation_kwargs = {}
if hasattr(api_task.actor, 'outbound_relationships'):
- context_cls = context.operation.NodeOperationContext
instantiation_kwargs['node'] = api_task.actor
elif hasattr(api_task.actor, 'source_node'):
- context_cls = context.operation.RelationshipOperationContext
instantiation_kwargs['relationship'] = api_task.actor
else:
raise RuntimeError('No operation context could be created for {actor.model_cls}'
@@ -445,8 +441,8 @@ class TaskBase(mixins.ModelMixin):
'plugin': api_task.plugin,
'function': api_task.function,
'arguments': api_task.arguments,
- 'api_id': api_task.id,
- '_context_cls': context_cls,
+ '_api_id': api_task.id,
+ '_context_cls': api_task._context_cls,
'_executor': executor,
}
)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3281ad95/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index 07bf297..d43b847 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -109,7 +109,7 @@ class BaseOperationContext(common.BaseContext):
@property
@contextmanager
- def track_changes(self):
+ def persist_changes(self):
yield
self.model.task.update(self.task)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3281ad95/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index 4b7573f..aa5a786 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -109,7 +109,7 @@ class WorkflowContext(BaseContext):
@property
@contextmanager
- def track_changes(self):
+ def persist_changes(self):
yield
self._model.execution.update(self.execution)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3281ad95/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index c30ec4b..649e08f 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -24,7 +24,7 @@ from datetime import datetime
from . import exceptions
from .context.workflow import WorkflowContext
from .workflows import builtin
-from .workflows.core import engine
+from .workflows.core import engine, task
from .workflows.executor.process import ProcessExecutor
from ..modeling import models
from ..modeling import utils as modeling_utils
@@ -92,7 +92,7 @@ class WorkflowRunner(object):
# Update the state
self._model_storage.execution.update(execution)
- self._engine = engine.Engine(default_executor=executor)
+ self._engine = engine.Engine(executors={executor.__class__: executor})
@property
def execution_id(self):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3281ad95/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index ca125a8..0e80e8a 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -140,6 +140,12 @@ class OperationTask(BaseTask):
self.arguments = modeling_utils.merge_parameter_values(arguments,
operation.arguments,
model_cls=models.Argument)
+ if isinstance(self.actor, models.Node):
+ self._context_cls = context.operation.NodeOperationContext
+ elif isinstance(self.actor, models.Relationship):
+ self._context_cls = context.operation.RelationshipOperationContext
+ else:
+ self._context_cls = context.operation.BaseOperationContext
def __repr__(self):
return self.name
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3281ad95/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index cec561f..7d542d0 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -35,25 +35,25 @@ class Engine(logger.LoggerMixin):
The workflow engine. Executes workflows
"""
- def __init__(self, default_executor, **kwargs):
+ def __init__(self, executors, **kwargs):
super(Engine, self).__init__(**kwargs)
- self._executors = {default_executor.__class__: default_executor}
- self._executing_tasks = []
+ self._executors = executors.copy
def execute(self, ctx):
"""
execute the workflow
"""
+ executing_tasks = []
try:
events.start_workflow_signal.send(ctx)
while True:
cancel = self._is_cancel(ctx)
if cancel:
break
- for task in self._ended_tasks(ctx):
- self._handle_ended_tasks(ctx, task)
+ for task in self._ended_tasks(ctx, executing_tasks):
+ self._handle_ended_tasks(ctx, task, executing_tasks)
for task in self._executable_tasks(ctx):
- self._handle_executable_task(ctx, task)
+ self._handle_executable_task(ctx, task, executing_tasks)
if self._all_tasks_consumed(ctx):
break
else:
@@ -77,7 +77,7 @@ class Engine(logger.LoggerMixin):
@staticmethod
def _is_cancel(ctx):
- execution = ctx.model.execution.update(ctx.execution)
+ execution = ctx.model.execution.refresh(ctx.execution)
return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED)
def _executable_tasks(self, ctx):
@@ -88,8 +88,8 @@ class Engine(logger.LoggerMixin):
not self._task_has_dependencies(ctx, task)
)
- def _ended_tasks(self, ctx):
- for task in self._executing_tasks:
+ def _ended_tasks(self, ctx, executing_tasks):
+ for task in executing_tasks:
if task.has_ended() and task in ctx._graph:
yield task
@@ -106,12 +106,11 @@ class Engine(logger.LoggerMixin):
for task in ctx.execution.tasks:
yield ctx.model.task.refresh(task)
- def _handle_executable_task(self, ctx, task):
- if task._executor not in self._executors:
- self._executors[task._executor] = task._executor()
+ def _handle_executable_task(self, ctx, task, executing_tasks):
task_executor = self._executors[task._executor]
- context_cls = task._context_cls or operation.BaseOperationContext
+ # If the task is a stub, a default context is provided, else it should hold the context cls
+ context_cls = operation.BaseOperationContext if task._stub_type else task._context_cls
op_ctx = context_cls(
model_storage=ctx.model,
resource_storage=ctx.resource,
@@ -123,104 +122,15 @@ class Engine(logger.LoggerMixin):
name=task.name
)
- self._executing_tasks.append(task)
+ executing_tasks.append(task)
- if not task.stub_type:
+ if not task._stub_type:
events.sent_task_signal.send(op_ctx)
task_executor.execute(op_ctx)
- def _handle_ended_tasks(self, ctx, task):
- self._executing_tasks.remove(task)
+ def _handle_ended_tasks(self, ctx, task, executing_tasks):
+ executing_tasks.remove(task)
if task.status == models.Task.FAILED and not task.ignore_failure:
raise exceptions.ExecutorException('Workflow failed')
else:
ctx._graph.remove_node(task)
-
-
-def construct_execution_tasks(execution,
- task_graph,
- default_executor,
- stub_executor=executor.base.StubTaskExecutor,
- start_stub_type=models.Task.START_WORKFLOW,
- end_stub_type=models.Task.END_WORKFLOW,
- depends_on=()):
- """
- Translates the user graph to the execution graph
- :param task_graph: The user's graph
- :param start_stub_type: internal use
- :param end_stub_type: internal use
- :param depends_on: internal use
- """
- depends_on = list(depends_on)
-
- # Insert start marker
- start_task = models.Task(api_id=_start_graph_suffix(task_graph.id),
- _executor=stub_executor,
- execution=execution,
- stub_type=start_stub_type,
- dependencies=depends_on)
-
- for task in task_graph.topological_order(reverse=True):
- operation_dependencies = _get_tasks_from_dependencies(
- execution, task_graph.get_dependencies(task), [start_task])
-
- if isinstance(task, api.task.OperationTask):
- models.Task.from_api_task(api_task=task,
- executor=default_executor,
- dependencies=operation_dependencies)
-
- elif isinstance(task, api.task.WorkflowTask):
- # Build the graph recursively while adding start and end markers
- construct_execution_tasks(
- execution=execution,
- task_graph=task,
- default_executor=default_executor,
- stub_executor=stub_executor,
- start_stub_type=models.Task.START_SUBWROFKLOW,
- end_stub_type=models.Task.END_SUBWORKFLOW,
- depends_on=operation_dependencies
- )
- elif isinstance(task, api.task.StubTask):
- models.Task(api_id=task.id,
- _executor=stub_executor,
- execution=execution,
- stub_type=models.Task.STUB,
- dependencies=operation_dependencies)
- else:
- raise RuntimeError('Undefined state')
-
- # Insert end marker
- models.Task(api_id=_end_graph_suffix(task_graph.id),
- _executor=stub_executor,
- execution=execution,
- stub_type=end_stub_type,
- dependencies=_get_non_dependent_tasks(execution) or [start_task])
-
-
-def _start_graph_suffix(api_id):
- return '{0}-Start'.format(api_id)
-
-
-def _end_graph_suffix(api_id):
- return '{0}-End'.format(api_id)
-
-
-def _get_non_dependent_tasks(execution):
- dependency_tasks = set()
- for task in execution.tasks:
- dependency_tasks.update(task.dependencies)
- return list(set(execution.tasks) - set(dependency_tasks))
-
-
-def _get_tasks_from_dependencies(execution, dependencies, default=()):
- """
- Returns task list from dependencies.
- """
- tasks = []
- for dependency in dependencies:
- if getattr(dependency, 'actor', False):
- dependency_name = dependency.id
- else:
- dependency_name = _end_graph_suffix(dependency.id)
- tasks.extend(task for task in execution.tasks if task.api_id == dependency_name)
- return tasks or default
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3281ad95/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 3a780d5..2d71d2a 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -31,13 +31,13 @@ from ... import exceptions
@events.sent_task_signal.connect
def _task_sent(ctx, *args, **kwargs):
- with ctx.track_changes:
+ with ctx.persist_changes:
ctx.task.status = ctx.task.SENT
@events.start_task_signal.connect
def _task_started(ctx, *args, **kwargs):
- with ctx.track_changes:
+ with ctx.persist_changes:
ctx.task.started_at = datetime.utcnow()
ctx.task.status = ctx.task.STARTED
_update_node_state_if_necessary(ctx, is_transitional=True)
@@ -45,7 +45,7 @@ def _task_started(ctx, *args, **kwargs):
@events.on_failure_task_signal.connect
def _task_failed(ctx, exception, *args, **kwargs):
- with ctx.track_changes:
+ with ctx.persist_changes:
should_retry = all([
not isinstance(exception, exceptions.TaskAbortException),
ctx.task.attempts_count < ctx.task.max_attempts or
@@ -71,7 +71,7 @@ def _task_failed(ctx, exception, *args, **kwargs):
@events.on_success_task_signal.connect
def _task_succeeded(ctx, *args, **kwargs):
- with ctx.track_changes:
+ with ctx.persist_changes:
ctx.task.ended_at = datetime.utcnow()
ctx.task.status = ctx.task.SUCCESS
@@ -80,7 +80,7 @@ def _task_succeeded(ctx, *args, **kwargs):
@events.start_workflow_signal.connect
def _workflow_started(workflow_context, *args, **kwargs):
- with workflow_context.track_changes:
+ with workflow_context.persist_changes:
execution = workflow_context.execution
# the execution may already be in the process of cancelling
if execution.status in (execution.CANCELLING, execution.CANCELLED):
@@ -91,7 +91,7 @@ def _workflow_started(workflow_context, *args, **kwargs):
@events.on_failure_workflow_signal.connect
def _workflow_failed(workflow_context, exception, *args, **kwargs):
- with workflow_context.track_changes:
+ with workflow_context.persist_changes:
execution = workflow_context.execution
execution.error = str(exception)
execution.status = execution.FAILED
@@ -100,7 +100,7 @@ def _workflow_failed(workflow_context, exception, *args, **kwargs):
@events.on_success_workflow_signal.connect
def _workflow_succeeded(workflow_context, *args, **kwargs):
- with workflow_context.track_changes:
+ with workflow_context.persist_changes:
execution = workflow_context.execution
execution.status = execution.SUCCEEDED
execution.ended_at = datetime.utcnow()
@@ -108,7 +108,7 @@ def _workflow_succeeded(workflow_context, *args, **kwargs):
@events.on_cancelled_workflow_signal.connect
def _workflow_cancelled(workflow_context, *args, **kwargs):
- with workflow_context.track_changes:
+ with workflow_context.persist_changes:
execution = workflow_context.execution
# _workflow_cancelling function may have called this function already
if execution.status == execution.CANCELLED:
@@ -123,7 +123,7 @@ def _workflow_cancelled(workflow_context, *args, **kwargs):
@events.on_cancelling_workflow_signal.connect
def _workflow_cancelling(workflow_context, *args, **kwargs):
- with workflow_context.track_changes:
+ with workflow_context.persist_changes:
execution = workflow_context.execution
if execution.status == execution.PENDING:
return _workflow_cancelled(workflow_context=workflow_context)
@@ -132,7 +132,6 @@ def _workflow_cancelling(workflow_context, *args, **kwargs):
_log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
else:
execution.status = execution.CANCELLING
- workflow_context.execution = execution
def _update_node_state_if_necessary(ctx, is_transitional=False):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3281ad95/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
new file mode 100644
index 0000000..07d785c
--- /dev/null
+++ b/aria/orchestrator/workflows/core/task.py
@@ -0,0 +1,110 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+The workflow engine. Executes workflows
+"""
+
+from ....modeling import models
+from .. import executor, api
+
+
+def construct_execution_tasks(execution,
+ task_graph,
+ default_executor,
+ stub_executor=executor.base.StubTaskExecutor,
+ start_stub_type=models.Task.START_WORKFLOW,
+ end_stub_type=models.Task.END_WORKFLOW,
+ depends_on=()):
+ """
+ Translates the user graph to the execution graph
+ :param task_graph: The user's graph
+ :param start_stub_type: internal use
+ :param end_stub_type: internal use
+ :param depends_on: internal use
+ """
+ depends_on = list(depends_on)
+
+ # Insert start marker
+ start_task = models.Task(api_id=_start_graph_suffix(task_graph.id),
+ _executor=stub_executor,
+ execution=execution,
+ stub_type=start_stub_type,
+ dependencies=depends_on)
+
+ for task in task_graph.topological_order(reverse=True):
+ operation_dependencies = _get_tasks_from_dependencies(
+ execution, task_graph.get_dependencies(task), [start_task])
+
+ if isinstance(task, api.task.OperationTask):
+ models.Task.from_api_task(api_task=task,
+ executor=default_executor,
+ dependencies=operation_dependencies)
+
+ elif isinstance(task, api.task.WorkflowTask):
+ # Build the graph recursively while adding start and end markers
+ construct_execution_tasks(
+ execution=execution,
+ task_graph=task,
+ default_executor=default_executor,
+ stub_executor=stub_executor,
+ start_stub_type=models.Task.START_SUBWROFKLOW,
+ end_stub_type=models.Task.END_SUBWORKFLOW,
+ depends_on=operation_dependencies
+ )
+ elif isinstance(task, api.task.StubTask):
+ models.Task(api_id=task.id,
+ _executor=stub_executor,
+ execution=execution,
+ stub_type=models.Task.STUB,
+ dependencies=operation_dependencies)
+ else:
+ raise RuntimeError('Undefined state')
+
+ # Insert end marker
+ models.Task(api_id=_end_graph_suffix(task_graph.id),
+ _executor=stub_executor,
+ execution=execution,
+ stub_type=end_stub_type,
+ dependencies=_get_non_dependent_tasks(execution) or [start_task])
+
+
+def _start_graph_suffix(api_id):
+ return '{0}-Start'.format(api_id)
+
+
+def _end_graph_suffix(api_id):
+ return '{0}-End'.format(api_id)
+
+
+def _get_non_dependent_tasks(execution):
+ tasks_with_dependencies = set()
+ for task in execution.tasks:
+ tasks_with_dependencies.update(task.dependencies)
+ return list(set(execution.tasks) - set(tasks_with_dependencies))
+
+
+def _get_tasks_from_dependencies(execution, dependencies, default=()):
+ """
+ Returns task list from dependencies.
+ """
+ tasks = []
+ for dependency in dependencies:
+ if dependency._stub_type is None:
+ dependency_name = _end_graph_suffix(dependency.id)
+ else:
+ dependency_name = dependency.id
+ tasks.extend(task for task in execution.tasks if task._api_id == dependency_name)
+ return tasks or default
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3281ad95/aria/orchestrator/workflows/events_logging.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py
index 12aebab..4cee867 100644
--- a/aria/orchestrator/workflows/events_logging.py
+++ b/aria/orchestrator/workflows/events_logging.py
@@ -35,66 +35,54 @@ def _get_task_name(task):
@events.start_task_signal.connect
def _start_task_handler(ctx, **kwargs):
- with ctx.track_changes:
- # If the task has no function this is an empty task.
- if ctx.task.function:
- suffix = 'started...'
- logger = ctx.logger.info
- else:
- suffix = 'has no implementation'
- logger = ctx.logger.debug
+ # If the task has no function this is an empty task.
+ if ctx.task.function:
+ suffix = 'started...'
+ logger = ctx.logger.info
+ else:
+ suffix = 'has no implementation'
+ logger = ctx.logger.debug
- logger('{name} {task.interface_name}.{task.operation_name} {suffix}'.format(
- name=_get_task_name(ctx.task), task=ctx.task, suffix=suffix))
+ logger('{name} {task.interface_name}.{task.operation_name} {suffix}'.format(
+ name=_get_task_name(ctx.task), task=ctx.task, suffix=suffix))
@events.on_success_task_signal.connect
def _success_task_handler(ctx, **kwargs):
- with ctx.track_changes:
- if not ctx.task.function:
- return
- ctx.logger.info('{name} {task.interface_name}.{task.operation_name} successful'
- .format(name=_get_task_name(ctx.task), task=ctx.task))
+ if not ctx.task.function:
+ return
+ ctx.logger.info('{name} {task.interface_name}.{task.operation_name} successful'
+ .format(name=_get_task_name(ctx.task), task=ctx.task))
@events.on_failure_task_signal.connect
def _failure_operation_handler(ctx, traceback, **kwargs):
- with ctx.track_changes:
- ctx.logger.error(
- '{name} {task.interface_name}.{task.operation_name} failed'
- .format(name=_get_task_name(ctx.task), task=ctx.task), extra=dict(traceback=traceback)
- )
+ ctx.logger.error(
+ '{name} {task.interface_name}.{task.operation_name} failed'
+ .format(name=_get_task_name(ctx.task), task=ctx.task), extra=dict(traceback=traceback)
+ )
@events.start_workflow_signal.connect
def _start_workflow_handler(context, **kwargs):
- with context.track_changes:
- context.logger.info("Starting '{ctx.workflow_name}' workflow execution".format(ctx=context))
+ context.logger.info("Starting '{ctx.workflow_name}' workflow execution".format(ctx=context))
@events.on_failure_workflow_signal.connect
def _failure_workflow_handler(context, **kwargs):
- with context.track_changes:
- context.logger.info("'{ctx.workflow_name}' workflow execution failed".format(ctx=context))
+ context.logger.info("'{ctx.workflow_name}' workflow execution failed".format(ctx=context))
@events.on_success_workflow_signal.connect
def _success_workflow_handler(context, **kwargs):
- with context.track_changes:
- context.logger.info(
- "'{ctx.workflow_name}' workflow execution succeeded".format(ctx=context)
- )
+ context.logger.info("'{ctx.workflow_name}' workflow execution succeeded".format(ctx=context))
@events.on_cancelled_workflow_signal.connect
def _cancel_workflow_handler(context, **kwargs):
- with context.track_changes:
- context.logger.info("'{ctx.workflow_name}' workflow execution canceled".format(ctx=context))
+ context.logger.info("'{ctx.workflow_name}' workflow execution canceled".format(ctx=context))
@events.on_cancelling_workflow_signal.connect
def _cancelling_workflow_handler(context, **kwargs):
- with context.track_changes:
- context.logger.info(
- "Cancelling '{ctx.workflow_name}' workflow execution".format(ctx=context)
- )
+ context.logger.info("Cancelling '{ctx.workflow_name}' workflow execution".format(ctx=context))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3281ad95/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index 9e1ce7e..257d12c 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -33,7 +33,7 @@ class BaseExecutor(logger.LoggerMixin):
Execute a task
:param task: task to execute
"""
- with ctx.track_changes:
+ with ctx.persist_changes:
if ctx.task.function:
self._execute(ctx)
else:
@@ -64,5 +64,5 @@ class BaseExecutor(logger.LoggerMixin):
class StubTaskExecutor(BaseExecutor): # pylint: disable=abstract-method
def execute(self, ctx, *args, **kwargs):
- with ctx.track_changes:
+ with ctx.persist_changes:
ctx.task.status = ctx.task.SUCCESS
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3281ad95/aria/orchestrator/workflows/executor/dry.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py
index 88d2e12..9d86125 100644
--- a/aria/orchestrator/workflows/executor/dry.py
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -26,7 +26,7 @@ class DryExecutor(base.BaseExecutor):
Executor which dry runs tasks - prints task information without causing any side effects
"""
def execute(self, ctx):
- with ctx.track_changes:
+ with ctx.persist_changes:
# updating the task manually instead of calling self._task_started(task),
# to avoid any side effects raising that event might cause
ctx.task.started_at = datetime.utcnow()
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3281ad95/tests/orchestrator/context/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/__init__.py b/tests/orchestrator/context/__init__.py
index abe92b9..d6c5d26 100644
--- a/tests/orchestrator/context/__init__.py
+++ b/tests/orchestrator/context/__init__.py
@@ -15,8 +15,7 @@
import sys
-from aria.orchestrator.workflows.core import engine
-from aria.orchestrator import workflow_runner
+from aria.orchestrator.workflows.core import engine, task
def op_path(func, module_path=None):
@@ -27,7 +26,7 @@ def op_path(func, module_path=None):
def execute(workflow_func, workflow_context, executor):
graph = workflow_func(ctx=workflow_context)
- engine.construct_execution_tasks(workflow_context.execution, graph, executor.__class__)
+ task.construct_execution_tasks(workflow_context.execution, graph, executor.__class__)
workflow_context.execution = workflow_context.execution
eng = engine.Engine(executor)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3281ad95/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index 84d8952..ee02b41 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -16,7 +16,7 @@
import pytest
from aria.orchestrator.workflows import api
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, task
from aria.orchestrator.workflows.executor import process
from aria.orchestrator import workflow, operation
import tests
@@ -48,7 +48,7 @@ def test_serialize_operation_context(context, executor, tmpdir):
context.model.node.update(node)
graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
- engine.construct_execution_tasks(context.execution, graph, executor.__class__)
+ task.construct_execution_tasks(context.execution, graph, executor.__class__)
context.execution = context.execution
eng = engine.Engine(executor)
eng.execute(context)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3281ad95/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index cb1503b..8f48e4a 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -28,7 +28,7 @@ from aria.orchestrator.execution_plugin.exceptions import ProcessException
from aria.orchestrator.execution_plugin import local
from aria.orchestrator.execution_plugin import constants
from aria.orchestrator.workflows.executor import process
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, task
from tests import mock
from tests import storage
@@ -500,8 +500,7 @@ if __name__ == '__main__':
arguments=arguments))
return graph
tasks_graph = mock_workflow(ctx=workflow_context) # pylint: disable=no-value-for-parameter
- engine.construct_execution_tasks(
- workflow_context.execution, tasks_graph, executor.__class__)
+ task.construct_execution_tasks(workflow_context.execution, tasks_graph, executor.__class__)
workflow_context.execution = workflow_context.execution
eng = engine.Engine(executor)
eng.execute(workflow_context)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3281ad95/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index 2eb3c0b..bfc0b98 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -29,7 +29,7 @@ from aria.orchestrator import events
from aria.orchestrator import workflow
from aria.orchestrator.workflows import api
from aria.orchestrator.workflows.executor import process
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, task
from aria.orchestrator.workflows.exceptions import ExecutorException
from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException
from aria.orchestrator.execution_plugin import operations
@@ -254,7 +254,7 @@ class TestWithActualSSHServer(object):
graph.sequence(*ops)
return graph
tasks_graph = mock_workflow(ctx=self._workflow_context) # pylint: disable=no-value-for-parameter
- engine.construct_execution_tasks(
+ task.construct_execution_tasks(
self._workflow_context.execution, tasks_graph, self._executor.__class__)
self._workflow_context.execution = self._workflow_context.execution
eng = engine.Engine(self._executor)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3281ad95/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index 7bdac67..f6f638f 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -28,7 +28,7 @@ from aria.orchestrator.workflows import (
api,
exceptions,
)
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, task
from aria.orchestrator.workflows.executor import thread
from tests import mock, storage
@@ -51,10 +51,10 @@ class BaseTest(object):
def _engine(workflow_func, workflow_context, executor):
graph = workflow_func(ctx=workflow_context)
execution = workflow_context.execution
- engine.construct_execution_tasks(execution, graph, executor.__class__)
+ task.construct_execution_tasks(execution, graph, executor.__class__)
workflow_context.execution = execution
- return engine.Engine(default_executor=executor)
+ return engine.Engine(executors=executor)
@staticmethod
def _create_interface(ctx, func, arguments=None):
@@ -101,7 +101,7 @@ class BaseTest(object):
@pytest.fixture(autouse=True)
def signals_registration(self, ):
def sent_task_handler(ctx, *args, **kwargs):
- if ctx.task.stub_type is None:
+ if ctx.task._stub_type is None:
calls = global_test_holder.setdefault('sent_task_signal_calls', 0)
global_test_holder['sent_task_signal_calls'] = calls + 1
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3281ad95/tests/orchestrator/workflows/core/test_events.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py
index f0699df..c30b373 100644
--- a/tests/orchestrator/workflows/core/test_events.py
+++ b/tests/orchestrator/workflows/core/test_events.py
@@ -16,7 +16,7 @@
import pytest
from aria.orchestrator.decorators import operation, workflow
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, task
from aria.orchestrator.workflows.executor.thread import ThreadExecutor
from aria.orchestrator.workflows import api
from aria.modeling.service_instance import NodeBase
@@ -113,13 +113,13 @@ def run_operation_on_node(ctx, op_name, interface_name):
operation_name=op_name,
operation_kwargs=dict(function='{name}.{func.__name__}'.format(name=__name__, func=func)))
node.interfaces[interface.name] = interface
- engine.construct_execution_tasks(
+ task.construct_execution_tasks(
ctx.execution,
single_operation_workflow(ctx, node=node, interface_name=interface_name, op_name=op_name),
ThreadExecutor)
ctx.execution = ctx.execution
- eng = engine.Engine(default_executor=ThreadExecutor())
+ eng = engine.Engine(executors=ThreadExecutor())
eng.execute(ctx)
return node
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3281ad95/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index 569e8be..faac35f 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -21,6 +21,7 @@ from aria.orchestrator.workflows import (
api,
core
)
+from aria.orchestrator.workflows.core import task
from aria.orchestrator.workflows.executor import base
from tests import mock
from tests import storage
@@ -70,7 +71,7 @@ def test_task_graph_into_execution_graph(tmpdir):
# Direct check
execution = workflow_context.model.execution.list()[0]
- core.engine.construct_execution_tasks(execution, test_task_graph, base.StubTaskExecutor)
+ task.construct_execution_tasks(execution, test_task_graph, base.StubTaskExecutor)
workflow_context.execution = execution
execution_tasks = topological_sort(workflow_context._graph)
@@ -87,23 +88,23 @@ def test_task_graph_into_execution_graph(tmpdir):
'{0}-End'.format(test_task_graph.id)
]
- assert expected_tasks_names == [t.api_id for t in execution_tasks]
+ assert expected_tasks_names == [t._api_id for t in execution_tasks]
assert all(isinstance(task, models.Task) for task in execution_tasks)
execution_tasks = iter(execution_tasks)
- assert next(execution_tasks).stub_type == models.Task.START_WORKFLOW
+ assert next(execution_tasks)._stub_type == models.Task.START_WORKFLOW
_assert_execution_is_api_task(next(execution_tasks), simple_before_task)
- assert next(execution_tasks).stub_type == models.Task.START_SUBWROFKLOW
+ assert next(execution_tasks)._stub_type == models.Task.START_SUBWROFKLOW
_assert_execution_is_api_task(next(execution_tasks), inner_task)
- assert next(execution_tasks).stub_type == models.Task.END_SUBWORKFLOW
+ assert next(execution_tasks)._stub_type == models.Task.END_SUBWORKFLOW
_assert_execution_is_api_task(next(execution_tasks), simple_after_task)
- assert next(execution_tasks).stub_type == models.Task.END_WORKFLOW
+ assert next(execution_tasks)._stub_type == models.Task.END_WORKFLOW
storage.release_sqlite_storage(workflow_context.model)
def _assert_execution_is_api_task(execution_task, api_task):
- assert execution_task.api_id == api_task.id
+ assert execution_task._api_id == api_task.id
assert execution_task.name == api_task.name
assert execution_task.function == api_task.function
assert execution_task.actor == api_task.actor
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3281ad95/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index 97b24f3..83584a6 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -53,7 +53,7 @@ class MockContext(object):
@property
@contextmanager
- def track_changes(self):
+ def persist_changes(self):
yield
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3281ad95/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index 8ede925..4566ec3 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -17,7 +17,7 @@ import pytest
from aria import extension
from aria.orchestrator.workflows import api
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, task
from aria.orchestrator.workflows.executor import process
from aria.orchestrator import workflow, operation
@@ -57,7 +57,7 @@ def test_decorate_extension(context, executor):
graph.add_tasks(task)
return graph
graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
- engine.construct_execution_tasks(context.execution, graph, executor.__class__)
+ task.construct_execution_tasks(context.execution, graph, executor.__class__)
context.execution = context.execution
eng = engine.Engine(executor)
eng.execute(context)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3281ad95/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index 9eb8916..0c15c8a 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -18,7 +18,7 @@ import copy
import pytest
from aria.orchestrator.workflows import api
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, task
from aria.orchestrator.workflows.executor import process
from aria.orchestrator import workflow, operation
from aria.orchestrator.workflows import exceptions
@@ -107,7 +107,7 @@ def _run_workflow(context, executor, op_func, arguments=None):
graph.add_tasks(task)
return graph
graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
- engine.construct_execution_tasks(context.execution, graph, executor.__class__)
+ task.construct_execution_tasks(context.execution, graph, executor.__class__)
context.execution = context.execution
eng = engine.Engine(executor)
eng.execute(context)