You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/05/04 10:38:04 UTC

incubator-ariatosca git commit: reverted the 'safe' behavior back to a function [Forced Update!]

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-163-Update-node-state-for-stub-tasks 1303f0925 -> 33c7736f7 (forced update)


reverted the 'safe' behavior back to a function


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/33c7736f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/33c7736f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/33c7736f

Branch: refs/heads/ARIA-163-Update-node-state-for-stub-tasks
Commit: 33c7736f7858b1821d639223e36f77f350095b91
Parents: c56ab97
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed May 3 20:04:59 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Thu May 4 13:37:59 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflows/api/task.py         | 102 ++++++++++---------
 .../orchestrator/workflows/builtin/workflows.py |  10 +-
 aria/orchestrator/workflows/core/translation.py |  26 +++--
 aria/orchestrator/workflows/events_logging.py   |   9 +-
 .../orchestrator/workflows/core/test_engine.py  |  41 +++-----
 5 files changed, 93 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33c7736f/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index 889a86a..52f1d66 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -72,63 +72,43 @@ class OperationTask(BaseTask):
         Do not call this constructor directly. Instead, use :meth:`for_node` or
         :meth:`for_relationship`.
         """
-
-        actor_type = type(actor).__name__.lower()
         assert isinstance(actor, (models.Node, models.Relationship))
-        assert actor_type in ('node', 'relationship')
-        assert interface_name and operation_name
         super(OperationTask, self).__init__()
-
         self.actor = actor
-        self.max_attempts = (self.workflow_context._task_max_attempts
-                             if max_attempts is None else max_attempts)
-        self.retry_interval = (self.workflow_context._task_retry_interval
-                               if retry_interval is None else retry_interval)
-        self.ignore_failure = (self.workflow_context._task_ignore_failure
-                               if ignore_failure is None else ignore_failure)
         self.interface_name = interface_name
         self.operation_name = operation_name
-        self.name = OperationTask.NAME_FORMAT.format(type=actor_type,
+        self.max_attempts = max_attempts or self.workflow_context._task_max_attempts
+        self.retry_interval = retry_interval or self.workflow_context._task_retry_interval
+        self.ignore_failure = ignore_failure or self.workflow_context._task_ignore_failure
+        self.name = OperationTask.NAME_FORMAT.format(type=type(actor).__name__.lower(),
                                                      name=actor.name,
                                                      interface=self.interface_name,
                                                      operation=self.operation_name)
-        if self.is_empty:
-            self.plugin = None
-            self.inputs = {}
-            self.implementation = None
-        else:
-            operation = self.actor.interfaces[self.interface_name].operations[self.operation_name]
-            self.plugin = operation.plugin
-            self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs)
-            self.implementation = operation.implementation
+        # Creating OperationTask directly should raise an error when there is no
+        # interface/operation.
+
+        if not has_operation(self.actor, self.interface_name, self.operation_name):
+            raise exceptions.OperationNotFoundException(
+                'Could not find operation "{self.operation_name}" on interface '
+                '"{self.interface_name}" for {actor_type} "{actor.name}"'.format(
+                    self=self,
+                    actor_type=type(actor).__name__.lower(),
+                    actor=actor)
+            )
+
+        operation = self.actor.interfaces[self.interface_name].operations[self.operation_name]
+        self.plugin = operation.plugin
+        self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs)
+        self.implementation = operation.implementation
 
     def __repr__(self):
         return self.name
 
-    # def __new__(cls, actor, interface_name, operation_name, *args, **kwargs):
-    #     """
-    #     Returns a new operation task if the operation exists in the node, otherwise returns None.
-    #     """
-    #     try:
-    #         cls.is_empty(actor, interface_name, operation_name)
-    #         return super(OperationTask, cls).__new__(cls)
-    #     except exceptions.OperationNotFoundException:
-    #         return None
-
-    @property
-    def is_empty(self):
-        interface = self.actor.interfaces.get(self.interface_name)
-        if interface:
-            operation = interface.operations.get(self.operation_name)
-            if operation:
-                return operation.implementation is None
 
-        raise exceptions.OperationNotFoundException(
-            'Could not find operation "{0}" on interface "{1}" for {2} "{3}"'
-            .format(self.operation_name,
-                    self.interface_name,
-                    type(self.actor).__name__.lower(),
-                    self.actor.name))
+class StubTask(BaseTask):
+    """
+    Enables creating empty tasks.
+    """
 
 
 class WorkflowTask(BaseTask):
@@ -161,6 +141,27 @@ class WorkflowTask(BaseTask):
             return super(WorkflowTask, self).__getattribute__(item)
 
 
+def create_task(actor, interface_name, operation_name, **kwargs):
+    """
+    This helper function enables safe creation of an OperationTask: if the supplied interface or
+    operation does not exist or has no implementation, None is returned.
+    :param actor: the actor for this task
+    :param interface_name: the name of the interface
+    :param operation_name: the name of the operation
+    :param kwargs: any additional kwargs to be passed to the task OperationTask
+    :return: an OperationTask, or None if the interface/operation is missing or unimplemented
+    """
+    try:
+        return OperationTask(
+            actor,
+            interface_name=interface_name,
+            operation_name=operation_name,
+            **kwargs
+        )
+    except exceptions.OperationNotFoundException:
+        return None
+
+
 def create_relationships_tasks(
         node, interface_name, source_operation_name=None, target_operation_name=None, **kwargs):
     """
@@ -197,7 +198,7 @@ def create_relationship_tasks(relationship, interface_name, source_operation_nam
     operations = []
     if source_operation_name:
         operations.append(
-            OperationTask(
+            create_task(
                 relationship,
                 interface_name=interface_name,
                 operation_name=source_operation_name,
@@ -206,7 +207,7 @@ def create_relationship_tasks(relationship, interface_name, source_operation_nam
         )
     if target_operation_name:
         operations.append(
-            OperationTask(
+            create_task(
                 relationship,
                 interface_name=interface_name,
                 operation_name=target_operation_name,
@@ -215,3 +216,12 @@ def create_relationship_tasks(relationship, interface_name, source_operation_nam
         )
 
     return operations
+
+
+def has_operation(actor, interface_name, operation_name):
+    interface = actor.interfaces.get(interface_name)
+    if interface:
+        operation = interface.operations.get(operation_name)
+        if operation and operation.implementation:
+            return True
+    return False

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33c7736f/aria/orchestrator/workflows/builtin/workflows.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/workflows.py b/aria/orchestrator/workflows/builtin/workflows.py
index 1fc9eed..b286e98 100644
--- a/aria/orchestrator/workflows/builtin/workflows.py
+++ b/aria/orchestrator/workflows/builtin/workflows.py
@@ -69,14 +69,14 @@ __all__ = (
 @workflow(suffix_template='{node.name}')
 def install_node(graph, node, **kwargs):
     # Create
-    sequence = [task.OperationTask(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CREATE)]
+    sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CREATE)]
 
     # Configure
     sequence += task.create_relationships_tasks(node,
                                                 NORMATIVE_CONFIGURE_INTERFACE,
                                                 NORMATIVE_PRE_CONFIGURE_SOURCE,
                                                 NORMATIVE_PRE_CONFIGURE_TARGET)
-    sequence.append(task.OperationTask(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CONFIGURE))
+    sequence.append(task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CONFIGURE))
     sequence += task.create_relationships_tasks(node,
                                                 NORMATIVE_CONFIGURE_INTERFACE,
                                                 NORMATIVE_POST_CONFIGURE_SOURCE,
@@ -93,7 +93,7 @@ def uninstall_node(graph, node, **kwargs):
     sequence = _create_stop_tasks(node)
 
     # Delete
-    sequence.append(task.OperationTask(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_DELETE))
+    sequence.append(task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_DELETE))
 
     graph.sequence(*sequence)
 
@@ -109,7 +109,7 @@ def stop_node(graph, node, **kwargs):
 
 
 def _create_start_tasks(node):
-    sequence = [task.OperationTask(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_START)]
+    sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_START)]
     sequence += task.create_relationships_tasks(node,
                                                 NORMATIVE_CONFIGURE_INTERFACE,
                                                 NORMATIVE_ADD_SOURCE, NORMATIVE_ADD_TARGET)
@@ -117,7 +117,7 @@ def _create_start_tasks(node):
 
 
 def _create_stop_tasks(node):
-    sequence = [task.OperationTask(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_STOP)]
+    sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_STOP)]
     sequence += task.create_relationships_tasks(node,
                                                 NORMATIVE_CONFIGURE_INTERFACE,
                                                 NORMATIVE_REMOVE_SOURCE, NORMATIVE_REMOVE_TARGET)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33c7736f/aria/orchestrator/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py
index d764024..b31ea8a 100644
--- a/aria/orchestrator/workflows/core/translation.py
+++ b/aria/orchestrator/workflows/core/translation.py
@@ -44,17 +44,14 @@ def build_execution_graph(
     for api_task in task_graph.topological_order(reverse=True):
         dependencies = task_graph.get_dependencies(api_task)
         operation_dependencies = _get_tasks_from_dependencies(
-            execution_graph,
-            dependencies,
-            default=[start_task])
+            execution_graph, dependencies, default=[start_task])
 
         if isinstance(api_task, api.task.OperationTask):
-            if api_task.is_empty:
-                operation_task = core_task.OperationTask(api_task)
-            else:
+            if api_task.implementation:
                 operation_task = core_task.OperationTask(api_task, executor=default_executor)
+            else:
+                operation_task = core_task.OperationTask(api_task)
             _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies)
-
         elif isinstance(api_task, api.task.WorkflowTask):
             # Build the graph recursively while adding start and end markers
             build_execution_graph(
@@ -65,6 +62,9 @@ def build_execution_graph(
                 end_cls=core_task.EndSubWorkflowTask,
                 depends_on=operation_dependencies
             )
+        elif isinstance(api_task, api.task.StubTask):
+            stub_task = core_task.StubTask(id=api_task.id)
+            _add_task_and_dependencies(execution_graph, stub_task, operation_dependencies)
         else:
             raise RuntimeError('Undefined state')
 
@@ -87,10 +87,14 @@ def _get_tasks_from_dependencies(execution_graph, dependencies, default=()):
     """
     Returns task list from dependencies.
     """
-    return [execution_graph.node[dependency.id
-                                 if isinstance(dependency, api.task.OperationTask)
-                                 else _end_graph_suffix(dependency.id)]['task']
-            for dependency in dependencies] or default
+    tasks = []
+    for dependency in dependencies:
+        if isinstance(dependency, (api.task.OperationTask, api.task.StubTask)):
+            dependency_id = dependency.id
+        else:
+            dependency_id = _end_graph_suffix(dependency.id)
+        tasks.append(execution_graph.node[dependency_id]['task'])
+    return tasks or default
 
 
 def _start_graph_suffix(id):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33c7736f/aria/orchestrator/workflows/events_logging.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py
index b031146..9913012 100644
--- a/aria/orchestrator/workflows/events_logging.py
+++ b/aria/orchestrator/workflows/events_logging.py
@@ -36,12 +36,9 @@ def _get_task_name(task):
 @events.start_task_signal.connect
 def _start_task_handler(task, **kwargs):
     # If the task has not implementation this is an empty task.
-    if task.implementation:
-        task.context.logger.debug('{name} {task.interface_name}.{task.operation_name} has no '
-                                  'implementation'.format(name=_get_task_name(task), task=task))
-    else:
-        task.context.logger.info('{name} {task.interface_name}.{task.operation_name} started...'
-                                 .format(name=_get_task_name(task), task=task))
+    suffix = 'started...' if task.implementation else 'has no implementation'
+    task.context.logger.debug('{name} {task.interface_name}.{task.operation_name} {suffix}'
+                              .format(name=_get_task_name(task), task=task, suffix=suffix))
 
 
 @events.on_success_task_signal.connect

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33c7736f/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index c9911dc..8c0705b 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -56,40 +56,27 @@ class BaseTest(object):
 
     @staticmethod
     def _op(ctx,
-            func=None,
+            func,
             inputs=None,
             max_attempts=None,
             retry_interval=None,
-            ignore_failure=None,
-            is_stub=False):
+            ignore_failure=None):
         node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
-
-        if not is_stub:
-            operation_kwargs = dict(implementation='{name}.{func.__name__}'.format(
-                name=__name__, func=func))
-            if inputs:
-                # the operation has to declare the inputs before those may be passed
-                operation_kwargs['inputs'] = inputs
-
-            interface = mock.models.create_interface(
-                node.service,
-                'aria.interfaces.lifecycle',
-                'create',
-                operation_kwargs=operation_kwargs
-            )
-        else:
-            interface = mock.models.create_interface(
-                node.service,
-                'aria.interfaces.lifecycle',
-                'create',
-            )
-
+        interface_name = 'aria.interfaces.lifecycle'
+        operation_kwargs = dict(implementation='{name}.{func.__name__}'.format(
+            name=__name__, func=func))
+        if inputs:
+            # the operation has to declare the inputs before those may be passed
+            operation_kwargs['inputs'] = inputs
+        operation_name = 'create'
+        interface = mock.models.create_interface(node.service, interface_name, operation_name,
+                                                 operation_kwargs=operation_kwargs)
         node.interfaces[interface.name] = interface
 
         return api.task.OperationTask(
             node,
             interface_name='aria.interfaces.lifecycle',
-            operation_name='create',
+            operation_name=operation_name,
             inputs=inputs or {},
             max_attempts=max_attempts,
             retry_interval=retry_interval,
@@ -218,7 +205,7 @@ class TestEngine(BaseTest):
         @workflow
         def sub_workflow(ctx, graph):
             op1 = self._op(ctx, func=mock_ordered_task, inputs={'counter': 1})
-            op2 = self._op(ctx, is_stub=True)
+            op2 = api.task.StubTask()
             op3 = self._op(ctx, func=mock_ordered_task, inputs={'counter': 2})
             graph.sequence(op1, op2, op3)
 
@@ -231,7 +218,7 @@ class TestEngine(BaseTest):
         assert workflow_context.states == ['start', 'success']
         assert workflow_context.exception is None
         assert global_test_holder.get('invocations') == [1, 2]
-        assert global_test_holder.get('sent_task_signal_calls') == 3
+        assert global_test_holder.get('sent_task_signal_calls') == 2
 
 
 class TestCancel(BaseTest):