You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/04/30 17:03:10 UTC

incubator-ariatosca git commit: wip2 [Forced Update!]

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/stub_task_branch 222bef595 -> 9bdc64537 (forced update)


wip2


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/9bdc6453
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/9bdc6453
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/9bdc6453

Branch: refs/heads/stub_task_branch
Commit: 9bdc64537cd44dd30f4e69d9090718f58e55e88f
Parents: 282fcbf
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun Apr 30 19:54:12 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Sun Apr 30 20:03:05 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/events.py                     |  8 ----
 aria/orchestrator/workflows/api/task.py         | 34 +++++++------
 aria/orchestrator/workflows/builtin/utils.py    | 38 +++++++--------
 aria/orchestrator/workflows/core/engine.py      |  7 +--
 .../workflows/core/events_handler.py            |  3 ++
 aria/orchestrator/workflows/core/task.py        | 50 ++++++++++++++------
 aria/orchestrator/workflows/core/translation.py | 13 +++--
 aria/orchestrator/workflows/executor/dry.py     | 31 +++++-------
 tests/end2end/test_hello_world.py               |  5 +-
 9 files changed, 98 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9bdc6453/aria/orchestrator/events.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/events.py b/aria/orchestrator/events.py
index 812040b..a1c4922 100644
--- a/aria/orchestrator/events.py
+++ b/aria/orchestrator/events.py
@@ -28,14 +28,6 @@ start_task_signal = signal('start_task_signal')
 on_success_task_signal = signal('success_task_signal')
 on_failure_task_signal = signal('failure_task_signal')
 
-# node state signals:
-# Note that each signal corresponds with a task. The basic start_task_signal also changes the state
-# of the node on which it runs. (so does the on_success_task_signal and the on_failure_task_signal)
-start_node_signal = signal('start_task_signal')
-on_success_node_signal = signal('success_task_signal')
-on_failure_node_signal = signal('failure_task_signal')
-
-
 # workflow engine workflow signals:
 start_workflow_signal = signal('start_workflow_signal')
 on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal')

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9bdc6453/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index 82c40c3..15397c3 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -66,7 +66,8 @@ class OperationTask(BaseTask):
                  inputs=None,
                  max_attempts=None,
                  retry_interval=None,
-                 ignore_failure=None):
+                 ignore_failure=None,
+                 is_stub=False):
         """
         Do not call this constructor directly. Instead, use :meth:`for_node` or
         :meth:`for_relationship`.
@@ -87,15 +88,18 @@ class OperationTask(BaseTask):
                                if ignore_failure is None else ignore_failure)
         self.interface_name = interface_name
         self.operation_name = operation_name
+        self.name = OperationTask.NAME_FORMAT.format(type=actor_type,
+                                                     name=actor.name,
+                                                     interface=self.interface_name,
+                                                     operation=self.operation_name)
+        self.is_stub = is_stub
+        if self.is_stub:
+            return
 
         operation = self.actor.interfaces[self.interface_name].operations[self.operation_name]
         self.plugin = operation.plugin
         self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs)
         self.implementation = operation.implementation
-        self.name = OperationTask.NAME_FORMAT.format(type=actor_type,
-                                                     name=actor.name,
-                                                     interface=self.interface_name,
-                                                     operation=self.operation_name)
 
     def __repr__(self):
         return self.name
@@ -108,7 +112,8 @@ class OperationTask(BaseTask):
                  max_attempts=None,
                  retry_interval=None,
                  ignore_failure=None,
-                 inputs=None):
+                 inputs=None,
+                 is_stub=False):
         """
         Creates an operation on a node.
 
@@ -132,7 +137,9 @@ class OperationTask(BaseTask):
             max_attempts=max_attempts,
             retry_interval=retry_interval,
             ignore_failure=ignore_failure,
-            inputs=inputs)
+            inputs=inputs,
+            is_stub=is_stub
+        )
 
     @classmethod
     def for_relationship(cls,
@@ -142,7 +149,8 @@ class OperationTask(BaseTask):
                          max_attempts=None,
                          retry_interval=None,
                          ignore_failure=None,
-                         inputs=None):
+                         inputs=None,
+                         is_stub=False):
         """
         Creates an operation on a relationship edge.
 
@@ -166,7 +174,9 @@ class OperationTask(BaseTask):
             max_attempts=max_attempts,
             retry_interval=retry_interval,
             ignore_failure=ignore_failure,
-            inputs=inputs)
+            inputs=inputs,
+            is_stub=is_stub
+        )
 
 
 class WorkflowTask(BaseTask):
@@ -197,9 +207,3 @@ class WorkflowTask(BaseTask):
             return getattr(self._graph, item)
         except AttributeError:
             return super(WorkflowTask, self).__getattribute__(item)
-
-
-class StubTask(BaseTask):
-    """
-    Enables creating empty tasks.
-    """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9bdc6453/aria/orchestrator/workflows/builtin/utils.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/utils.py b/aria/orchestrator/workflows/builtin/utils.py
index 2254d13..e649006 100644
--- a/aria/orchestrator/workflows/builtin/utils.py
+++ b/aria/orchestrator/workflows/builtin/utils.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ..api.task import OperationTask, StubTask
+from ..api.task import OperationTask
 from .. import exceptions
 
 
@@ -23,12 +23,10 @@ def create_node_task(node, interface_name, operation_name, **kwargs):
     """
 
     try:
-        if _is_empty_task(node, interface_name, operation_name):
-            return StubTask()
-
         return OperationTask.for_node(node=node,
                                       interface_name=interface_name,
                                       operation_name=operation_name,
+                                      is_stub=_is_empty_task(node, interface_name, operation_name),
                                       **kwargs)
     except exceptions.OperationNotFoundException:
         # We will skip nodes which do not have the operation
@@ -71,29 +69,29 @@ def relationship_tasks(relationship, interface_name, source_operation_name=None,
     operations = []
     if source_operation_name:
         try:
-            if _is_empty_task(relationship, interface_name, source_operation_name):
-                operations.append(StubTask())
-            else:
-                operations.append(
-                    OperationTask.for_relationship(relationship=relationship,
-                                                   interface_name=interface_name,
-                                                   operation_name=source_operation_name,
-                                                   **kwargs)
+            operations.append(
+                OperationTask.for_relationship(
+                    relationship=relationship,
+                    interface_name=interface_name,
+                    operation_name=source_operation_name,
+                    is_stub=_is_empty_task(relationship, interface_name, source_operation_name),
+                    **kwargs
                 )
+            )
         except exceptions.OperationNotFoundException:
             # We will skip relationships which do not have the operation
             pass
     if target_operation_name:
         try:
-            if _is_empty_task(relationship, interface_name, target_operation_name):
-                operations.append(StubTask())
-            else:
-                operations.append(
-                    OperationTask.for_relationship(relationship=relationship,
-                                                   interface_name=interface_name,
-                                                   operation_name=target_operation_name,
-                                                   **kwargs)
+            operations.append(
+                OperationTask.for_relationship(
+                    relationship=relationship,
+                    interface_name=interface_name,
+                    operation_name=target_operation_name,
+                    is_stub=_is_empty_task(relationship, interface_name, target_operation_name),
+                    **kwargs
                 )
+            )
         except exceptions.OperationNotFoundException:
             # We will skip relationships which do not have the operation
             pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9bdc6453/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 97c4999..353d9db 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -111,12 +111,9 @@ class Engine(logger.LoggerMixin):
             yield task
 
     def _handle_executable_task(self, task):
-        # import pydevd; pydevd.settrace('localhost', suspend=False)
-        if isinstance(task, engine_task.StubTask):
-            task.status = models.Task.SUCCESS
-        else:
+        if not isinstance(task, engine_task.MarkerTaskBase):
             events.sent_task_signal.send(task)
-            task.execute()
+        task.execute()
 
     def _handle_ended_tasks(self, task):
         if task.status == models.Task.FAILED and not task.ignore_failure:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9bdc6453/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index b43082b..83b79d5 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -41,6 +41,9 @@ def _task_started(task, *args, **kwargs):
         task.started_at = datetime.utcnow()
         task.status = task.STARTED
 
+@events.start_task_signal.connect
+def _node_task_started(task, *args, **kwargs):
+    with task._update():
         _update_node_state_if_necessary(task, is_transitional=True)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9bdc6453/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index 548bf47..39dd2c7 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -64,15 +64,21 @@ class BaseTask(object):
         return self._id
 
 
-class StubTask(BaseTask):
+class MarkerTaskBase(BaseTask):
     """
-    Base stub task for all tasks that don't actually run anything
+    Base stub task for marker user tasks that only mark the start/end of a workflow 
+    or sub-workflow
     """
-
     def __init__(self, *args, **kwargs):
-        super(StubTask, self).__init__(executor=dry.StubExecutor(), *args, **kwargs)
+        super(MarkerTaskBase, self).__init__(executor=dry.MarkerExecutor(), *args, **kwargs)
         self.status = models.Task.PENDING
         self.due_at = datetime.utcnow()
+        self.started_at = None
+        self.ended_at = None
+
+    @contextmanager
+    def _update(self):
+        yield
 
     def has_ended(self):
         return self.status in (models.Task.SUCCESS, models.Task.FAILED)
@@ -80,29 +86,37 @@ class StubTask(BaseTask):
     def is_waiting(self):
         return self.status in (models.Task.PENDING, models.Task.RETRYING)
 
+    def end(self):
+        self.ended_at = datetime.utcnow()
+        self.status = models.Task.SUCCESS
 
-class StartWorkflowTask(StubTask):
+    def start(self):
+        self.started_at = datetime.utcnow()
+        self.status = models.Task.STARTED
+
+
+class StartWorkflowTask(MarkerTaskBase):
     """
     Task marking a workflow start
     """
     pass
 
 
-class EndWorkflowTask(StubTask):
+class EndWorkflowTask(MarkerTaskBase):
     """
     Task marking a workflow end
     """
     pass
 
 
-class StartSubWorkflowTask(StubTask):
+class StartSubWorkflowTask(MarkerTaskBase):
     """
     Task marking a subworkflow start
     """
     pass
 
 
-class EndSubWorkflowTask(StubTask):
+class EndSubWorkflowTask(MarkerTaskBase):
     """
     Task marking a subworkflow end
     """
@@ -113,15 +127,18 @@ class OperationTask(BaseTask):
     """
     Operation task
     """
+    def __init__(self, api_task, executor=None, *args, **kwargs):
+        # If no executor is provided, we defer that this is a stub task which does not need to be
+        # executed.
+        super(OperationTask, self).__init__(
+            id=api_task.id, executor=executor or dry.StubExecutor(), *args, **kwargs)
 
-    def __init__(self, api_task, *args, **kwargs):
-        super(OperationTask, self).__init__(id=api_task.id, **kwargs)
         self._workflow_context = api_task._workflow_context
         self.interface_name = api_task.interface_name
         self.operation_name = api_task.operation_name
         model_storage = api_task._workflow_context.model
-        plugin = api_task.plugin
 
+        # This currently signal that this is a stub task
         base_task_model = model_storage.task.model_cls
         if isinstance(api_task.actor, models.Node):
             context_cls = operation_context.NodeOperationContext
@@ -141,15 +158,18 @@ class OperationTask(BaseTask):
 
         task_model = create_task_model(
             name=api_task.name,
-            implementation=api_task.implementation,
             actor=api_task.actor,
-            inputs=api_task.inputs,
             status=base_task_model.PENDING,
             max_attempts=api_task.max_attempts,
             retry_interval=api_task.retry_interval,
             ignore_failure=api_task.ignore_failure,
-            plugin=plugin,
-            execution=self._workflow_context.execution
+            execution=self._workflow_context.execution,
+
+            # Only non-stub tasks have these fields
+            plugin=getattr(api_task, 'plugin', None),
+            implementation=getattr(api_task, 'implementation', None),
+            inputs=getattr(api_task, 'inputs', {}),
+
         )
         self._workflow_context.model.task.put(task_model)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9bdc6453/aria/orchestrator/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py
index 1ae59a3..487d44d 100644
--- a/aria/orchestrator/workflows/core/translation.py
+++ b/aria/orchestrator/workflows/core/translation.py
@@ -49,9 +49,12 @@ def build_execution_graph(
             default=[start_task])
 
         if isinstance(api_task, api.task.OperationTask):
-            # Add the task an the dependencies
-            operation_task = core_task.OperationTask(api_task, executor=executor)
+            if api_task.is_stub:
+                operation_task = core_task.OperationTask(api_task)
+            else:
+                operation_task = core_task.OperationTask(api_task, executor=executor)
             _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies)
+
         elif isinstance(api_task, api.task.WorkflowTask):
             # Build the graph recursively while adding start and end markers
             build_execution_graph(
@@ -62,9 +65,6 @@ def build_execution_graph(
                 end_cls=core_task.EndSubWorkflowTask,
                 depends_on=operation_dependencies
             )
-        elif isinstance(api_task, api.task.StubTask):
-            stub_task = core_task.StubTask(id=api_task.id)
-            _add_task_and_dependencies(execution_graph, stub_task, operation_dependencies)
         else:
             raise RuntimeError('Undefined state')
 
@@ -88,8 +88,7 @@ def _get_tasks_from_dependencies(execution_graph, dependencies, default=()):
     Returns task list from dependencies.
     """
     return [execution_graph.node[dependency.id
-                                 if isinstance(dependency, (api.task.OperationTask,
-                                                            api.task.StubTask))
+                                 if isinstance(dependency, api.task.OperationTask)
                                  else _end_graph_suffix(dependency.id)]['task']
             for dependency in dependencies] or default
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9bdc6453/aria/orchestrator/workflows/executor/dry.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py
index 5be8015..7f3df4e 100644
--- a/aria/orchestrator/workflows/executor/dry.py
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -20,39 +20,32 @@ Dry executor
 from datetime import datetime
 
 from aria.orchestrator import events
+from aria.orchestrator.workflows.core.events_handler import _update_node_state_if_necessary
 from .base import BaseExecutor
 
 
 # TODO: the name of this module should definitely change
 
-class StubExecutor(BaseExecutor):
-
-    @staticmethod
-    def _task_sent(*args, **kwargs):
-        pass
-
-    @staticmethod
-    def _task_started(task):
-        events.start_task_signal.send(task, skip_logging=True)
-
-    @staticmethod
-    def _task_succeeded(task):
-        events.on_success_task_signal.send(task, skip_logging=True)
+class MarkerExecutor(BaseExecutor):
+    def execute(self, task):
+        task.start()
+        task.end()
 
-    @staticmethod
-    def _task_failed(*args, **kwargs):
-        pass
 
+class StubExecutor(BaseExecutor):
     def execute(self, task):
-        pass
+        events.start_task_signal.send(task)
+        events.on_success_task_signal.send(task)
 
 
-class DryExecutor(StubExecutor):
+class DryExecutor(BaseExecutor):
     """
     Executor which dry runs tasks - prints task information without causing any side effects
     """
 
     def execute(self, task):
+        events.start_task_signal.send(task, skip_logging=True)
+
         if hasattr(task.actor, 'source_node'):
             name = '{source_node.name}->{target_node.name}'.format(
                 source_node=task.actor.source_node, target_node=task.actor.target_node)
@@ -66,3 +59,5 @@ class DryExecutor(StubExecutor):
         task.context.logger.info(
             '<dry> {name} {task.interface_name}.{task.operation_name} successful'
             .format(name=name, task=task))
+
+        events.on_success_task_signal.send(task, skip_logging=True)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9bdc6453/tests/end2end/test_hello_world.py
----------------------------------------------------------------------
diff --git a/tests/end2end/test_hello_world.py b/tests/end2end/test_hello_world.py
index 8895593..6714ce8 100644
--- a/tests/end2end/test_hello_world.py
+++ b/tests/end2end/test_hello_world.py
@@ -30,8 +30,7 @@ def test_hello_world(testenv):
         # Even if some assertions failed, attempt to execute uninstall so the
         # webserver process doesn't stay up once the test is finished
         # TODO: remove force_service_delete=True
-        pass
-        # testenv.uninstall_service(force_service_delete=True)
+        testenv.uninstall_service(force_service_delete=True)
 
     _verify_webserver_down('http://localhost:9090')
     testenv.verify_clean_storage()
@@ -58,5 +57,5 @@ def _verify_deployed_service_in_storage(service_name, model_storage):
     assert service.name == service_name
     assert len(service.executions) == 1
     assert len(service.nodes) == 2
-    # TODO: validate node states
+    assert all(node.state == node.STARTED for node in service.nodes)
     assert len(service.executions[0].logs) > 0