You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/05/04 10:38:04 UTC
incubator-ariatosca git commit: reverted the 'safe' behavior back to
a function [Forced Update!]
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-163-Update-node-state-for-stub-tasks 1303f0925 -> 33c7736f7 (forced update)
reverted the 'safe' behavior back to a function
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/33c7736f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/33c7736f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/33c7736f
Branch: refs/heads/ARIA-163-Update-node-state-for-stub-tasks
Commit: 33c7736f7858b1821d639223e36f77f350095b91
Parents: c56ab97
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed May 3 20:04:59 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Thu May 4 13:37:59 2017 +0300
----------------------------------------------------------------------
aria/orchestrator/workflows/api/task.py | 102 ++++++++++---------
.../orchestrator/workflows/builtin/workflows.py | 10 +-
aria/orchestrator/workflows/core/translation.py | 26 +++--
aria/orchestrator/workflows/events_logging.py | 9 +-
.../orchestrator/workflows/core/test_engine.py | 41 +++-----
5 files changed, 93 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33c7736f/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index 889a86a..52f1d66 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -72,63 +72,43 @@ class OperationTask(BaseTask):
Do not call this constructor directly. Instead, use :meth:`for_node` or
:meth:`for_relationship`.
"""
-
- actor_type = type(actor).__name__.lower()
assert isinstance(actor, (models.Node, models.Relationship))
- assert actor_type in ('node', 'relationship')
- assert interface_name and operation_name
super(OperationTask, self).__init__()
-
self.actor = actor
- self.max_attempts = (self.workflow_context._task_max_attempts
- if max_attempts is None else max_attempts)
- self.retry_interval = (self.workflow_context._task_retry_interval
- if retry_interval is None else retry_interval)
- self.ignore_failure = (self.workflow_context._task_ignore_failure
- if ignore_failure is None else ignore_failure)
self.interface_name = interface_name
self.operation_name = operation_name
- self.name = OperationTask.NAME_FORMAT.format(type=actor_type,
+ self.max_attempts = max_attempts or self.workflow_context._task_max_attempts
+ self.retry_interval = retry_interval or self.workflow_context._task_retry_interval
+ self.ignore_failure = ignore_failure or self.workflow_context._task_ignore_failure
+ self.name = OperationTask.NAME_FORMAT.format(type=type(actor).__name__.lower(),
name=actor.name,
interface=self.interface_name,
operation=self.operation_name)
- if self.is_empty:
- self.plugin = None
- self.inputs = {}
- self.implementation = None
- else:
- operation = self.actor.interfaces[self.interface_name].operations[self.operation_name]
- self.plugin = operation.plugin
- self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs)
- self.implementation = operation.implementation
+ # Creating OperationTask directly should raise an error when there is no
+ # interface/operation.
+
+ if not has_operation(self.actor, self.interface_name, self.operation_name):
+ raise exceptions.OperationNotFoundException(
+ 'Could not find operation "{self.operation_name}" on interface '
+ '"{self.interface_name}" for {actor_type} "{actor.name}"'.format(
+ self=self,
+ actor_type=type(actor).__name__.lower(),
+ actor=actor)
+ )
+
+ operation = self.actor.interfaces[self.interface_name].operations[self.operation_name]
+ self.plugin = operation.plugin
+ self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs)
+ self.implementation = operation.implementation
def __repr__(self):
return self.name
- # def __new__(cls, actor, interface_name, operation_name, *args, **kwargs):
- # """
- # Returns a new operation task if the operation exists in the node, otherwise returns None.
- # """
- # try:
- # cls.is_empty(actor, interface_name, operation_name)
- # return super(OperationTask, cls).__new__(cls)
- # except exceptions.OperationNotFoundException:
- # return None
-
- @property
- def is_empty(self):
- interface = self.actor.interfaces.get(self.interface_name)
- if interface:
- operation = interface.operations.get(self.operation_name)
- if operation:
- return operation.implementation is None
- raise exceptions.OperationNotFoundException(
- 'Could not find operation "{0}" on interface "{1}" for {2} "{3}"'
- .format(self.operation_name,
- self.interface_name,
- type(self.actor).__name__.lower(),
- self.actor.name))
+class StubTask(BaseTask):
+ """
+ Enables creating empty tasks.
+ """
class WorkflowTask(BaseTask):
@@ -161,6 +141,27 @@ class WorkflowTask(BaseTask):
return super(WorkflowTask, self).__getattribute__(item)
+def create_task(actor, interface_name, operation_name, **kwargs):
+ """
+ This helper function enables safe creation of an OperationTask. If the supplied interface
+ and operation have no implementation, None is returned.
+ :param actor: the actor for this task
+ :param interface_name: the name of the interface
+ :param operation_name: the name of the operation
+ :param kwargs: any additional kwargs to be passed to OperationTask
+ :return: an OperationTask or None (if no interface/operation exists)
+ """
+ try:
+ return OperationTask(
+ actor,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ **kwargs
+ )
+ except exceptions.OperationNotFoundException:
+ return None
+
+
def create_relationships_tasks(
node, interface_name, source_operation_name=None, target_operation_name=None, **kwargs):
"""
@@ -197,7 +198,7 @@ def create_relationship_tasks(relationship, interface_name, source_operation_nam
operations = []
if source_operation_name:
operations.append(
- OperationTask(
+ create_task(
relationship,
interface_name=interface_name,
operation_name=source_operation_name,
@@ -206,7 +207,7 @@ def create_relationship_tasks(relationship, interface_name, source_operation_nam
)
if target_operation_name:
operations.append(
- OperationTask(
+ create_task(
relationship,
interface_name=interface_name,
operation_name=target_operation_name,
@@ -215,3 +216,12 @@ def create_relationship_tasks(relationship, interface_name, source_operation_nam
)
return operations
+
+
+def has_operation(actor, interface_name, operation_name):
+ interface = actor.interfaces.get(interface_name)
+ if interface:
+ operation = interface.operations.get(operation_name)
+ if operation and operation.implementation:
+ return True
+ return False
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33c7736f/aria/orchestrator/workflows/builtin/workflows.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/workflows.py b/aria/orchestrator/workflows/builtin/workflows.py
index 1fc9eed..b286e98 100644
--- a/aria/orchestrator/workflows/builtin/workflows.py
+++ b/aria/orchestrator/workflows/builtin/workflows.py
@@ -69,14 +69,14 @@ __all__ = (
@workflow(suffix_template='{node.name}')
def install_node(graph, node, **kwargs):
# Create
- sequence = [task.OperationTask(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CREATE)]
+ sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CREATE)]
# Configure
sequence += task.create_relationships_tasks(node,
NORMATIVE_CONFIGURE_INTERFACE,
NORMATIVE_PRE_CONFIGURE_SOURCE,
NORMATIVE_PRE_CONFIGURE_TARGET)
- sequence.append(task.OperationTask(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CONFIGURE))
+ sequence.append(task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CONFIGURE))
sequence += task.create_relationships_tasks(node,
NORMATIVE_CONFIGURE_INTERFACE,
NORMATIVE_POST_CONFIGURE_SOURCE,
@@ -93,7 +93,7 @@ def uninstall_node(graph, node, **kwargs):
sequence = _create_stop_tasks(node)
# Delete
- sequence.append(task.OperationTask(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_DELETE))
+ sequence.append(task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_DELETE))
graph.sequence(*sequence)
@@ -109,7 +109,7 @@ def stop_node(graph, node, **kwargs):
def _create_start_tasks(node):
- sequence = [task.OperationTask(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_START)]
+ sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_START)]
sequence += task.create_relationships_tasks(node,
NORMATIVE_CONFIGURE_INTERFACE,
NORMATIVE_ADD_SOURCE, NORMATIVE_ADD_TARGET)
@@ -117,7 +117,7 @@ def _create_start_tasks(node):
def _create_stop_tasks(node):
- sequence = [task.OperationTask(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_STOP)]
+ sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_STOP)]
sequence += task.create_relationships_tasks(node,
NORMATIVE_CONFIGURE_INTERFACE,
NORMATIVE_REMOVE_SOURCE, NORMATIVE_REMOVE_TARGET)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33c7736f/aria/orchestrator/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py
index d764024..b31ea8a 100644
--- a/aria/orchestrator/workflows/core/translation.py
+++ b/aria/orchestrator/workflows/core/translation.py
@@ -44,17 +44,14 @@ def build_execution_graph(
for api_task in task_graph.topological_order(reverse=True):
dependencies = task_graph.get_dependencies(api_task)
operation_dependencies = _get_tasks_from_dependencies(
- execution_graph,
- dependencies,
- default=[start_task])
+ execution_graph, dependencies, default=[start_task])
if isinstance(api_task, api.task.OperationTask):
- if api_task.is_empty:
- operation_task = core_task.OperationTask(api_task)
- else:
+ if api_task.implementation:
operation_task = core_task.OperationTask(api_task, executor=default_executor)
+ else:
+ operation_task = core_task.OperationTask(api_task)
_add_task_and_dependencies(execution_graph, operation_task, operation_dependencies)
-
elif isinstance(api_task, api.task.WorkflowTask):
# Build the graph recursively while adding start and end markers
build_execution_graph(
@@ -65,6 +62,9 @@ def build_execution_graph(
end_cls=core_task.EndSubWorkflowTask,
depends_on=operation_dependencies
)
+ elif isinstance(api_task, api.task.StubTask):
+ stub_task = core_task.StubTask(id=api_task.id)
+ _add_task_and_dependencies(execution_graph, stub_task, operation_dependencies)
else:
raise RuntimeError('Undefined state')
@@ -87,10 +87,14 @@ def _get_tasks_from_dependencies(execution_graph, dependencies, default=()):
"""
Returns task list from dependencies.
"""
- return [execution_graph.node[dependency.id
- if isinstance(dependency, api.task.OperationTask)
- else _end_graph_suffix(dependency.id)]['task']
- for dependency in dependencies] or default
+ tasks = []
+ for dependency in dependencies:
+ if isinstance(dependency, (api.task.OperationTask, api.task.StubTask)):
+ dependency_id = dependency.id
+ else:
+ dependency_id = _end_graph_suffix(dependency.id)
+ tasks.append(execution_graph.node[dependency_id]['task'])
+ return tasks or default
def _start_graph_suffix(id):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33c7736f/aria/orchestrator/workflows/events_logging.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py
index b031146..9913012 100644
--- a/aria/orchestrator/workflows/events_logging.py
+++ b/aria/orchestrator/workflows/events_logging.py
@@ -36,12 +36,9 @@ def _get_task_name(task):
@events.start_task_signal.connect
def _start_task_handler(task, **kwargs):
# If the task has not implementation this is an empty task.
- if task.implementation:
- task.context.logger.debug('{name} {task.interface_name}.{task.operation_name} has no '
- 'implementation'.format(name=_get_task_name(task), task=task))
- else:
- task.context.logger.info('{name} {task.interface_name}.{task.operation_name} started...'
- .format(name=_get_task_name(task), task=task))
+ suffix = 'started...' if task.implementation else 'has no implementation'
+ task.context.logger.debug('{name} {task.interface_name}.{task.operation_name} {suffix}'
+ .format(name=_get_task_name(task), task=task, suffix=suffix))
@events.on_success_task_signal.connect
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33c7736f/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index c9911dc..8c0705b 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -56,40 +56,27 @@ class BaseTest(object):
@staticmethod
def _op(ctx,
- func=None,
+ func,
inputs=None,
max_attempts=None,
retry_interval=None,
- ignore_failure=None,
- is_stub=False):
+ ignore_failure=None):
node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
-
- if not is_stub:
- operation_kwargs = dict(implementation='{name}.{func.__name__}'.format(
- name=__name__, func=func))
- if inputs:
- # the operation has to declare the inputs before those may be passed
- operation_kwargs['inputs'] = inputs
-
- interface = mock.models.create_interface(
- node.service,
- 'aria.interfaces.lifecycle',
- 'create',
- operation_kwargs=operation_kwargs
- )
- else:
- interface = mock.models.create_interface(
- node.service,
- 'aria.interfaces.lifecycle',
- 'create',
- )
-
+ interface_name = 'aria.interfaces.lifecycle'
+ operation_kwargs = dict(implementation='{name}.{func.__name__}'.format(
+ name=__name__, func=func))
+ if inputs:
+ # the operation has to declare the inputs before those may be passed
+ operation_kwargs['inputs'] = inputs
+ operation_name = 'create'
+ interface = mock.models.create_interface(node.service, interface_name, operation_name,
+ operation_kwargs=operation_kwargs)
node.interfaces[interface.name] = interface
return api.task.OperationTask(
node,
interface_name='aria.interfaces.lifecycle',
- operation_name='create',
+ operation_name=operation_name,
inputs=inputs or {},
max_attempts=max_attempts,
retry_interval=retry_interval,
@@ -218,7 +205,7 @@ class TestEngine(BaseTest):
@workflow
def sub_workflow(ctx, graph):
op1 = self._op(ctx, func=mock_ordered_task, inputs={'counter': 1})
- op2 = self._op(ctx, is_stub=True)
+ op2 = api.task.StubTask()
op3 = self._op(ctx, func=mock_ordered_task, inputs={'counter': 2})
graph.sequence(op1, op2, op3)
@@ -231,7 +218,7 @@ class TestEngine(BaseTest):
assert workflow_context.states == ['start', 'success']
assert workflow_context.exception is None
assert global_test_holder.get('invocations') == [1, 2]
- assert global_test_holder.get('sent_task_signal_calls') == 3
+ assert global_test_holder.get('sent_task_signal_calls') == 2
class TestCancel(BaseTest):