You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/05/03 13:57:24 UTC

incubator-ariatosca git commit: review2

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-163-Update-node-state-for-stub-tasks 95255a790 -> 43da804c5


review2


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/43da804c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/43da804c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/43da804c

Branch: refs/heads/ARIA-163-Update-node-state-for-stub-tasks
Commit: 43da804c52fb0af2d1eb97d71059e3d08e676150
Parents: 95255a7
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed May 3 16:57:19 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Wed May 3 16:57:19 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflows/api/task.py         | 19 +++++++-----
 aria/orchestrator/workflows/core/engine.py      |  4 +--
 aria/orchestrator/workflows/core/task.py        | 31 ++++++++++----------
 aria/orchestrator/workflows/core/translation.py |  8 ++---
 aria/orchestrator/workflows/events_logging.py   | 20 ++++++-------
 aria/orchestrator/workflows/executor/base.py    |  5 ++--
 aria/orchestrator/workflows/executor/dry.py     | 14 +++++++--
 tests/end2end/test_hello_world.py               |  3 +-
 .../test_task_graph_into_execution_graph.py     |  2 +-
 9 files changed, 55 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index cb1618c..d376dd2 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -92,14 +92,17 @@ class OperationTask(BaseTask):
                                                      name=actor.name,
                                                      interface=self.interface_name,
                                                      operation=self.operation_name)
-        self.is_stub = self.is_empty(self.actor, self.interface_name, self.operation_name)
-        if self.is_stub:
-            return
-
-        operation = self.actor.interfaces[self.interface_name].operations[self.operation_name]
-        self.plugin = operation.plugin
-        self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs)
-        self.implementation = operation.implementation
+        self.is_empty = self.is_empty(self.actor, self.interface_name, self.operation_name)
+
+        if self.is_empty:
+            self.plugin = None
+            self.inputs = {}
+            self.implementation = None
+        else:
+            operation = self.actor.interfaces[self.interface_name].operations[self.operation_name]
+            self.plugin = operation.plugin
+            self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs)
+            self.implementation = operation.implementation
 
     def __repr__(self):
         return self.name

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index a4498aa..6a66eb1 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -45,7 +45,7 @@ class Engine(logger.LoggerMixin):
         self._executor = executor
         translation.build_execution_graph(task_graph=tasks_graph,
                                           execution_graph=self._execution_graph,
-                                          executor=self._executor)
+                                          default_executor=self._executor)
 
     def execute(self):
         """
@@ -112,8 +112,6 @@ class Engine(logger.LoggerMixin):
 
     @staticmethod
     def _handle_executable_task(task):
-        if not isinstance(task, engine_task.StubTask):
-            events.sent_task_signal.send(task)
         task.execute()
 
     def _handle_ended_tasks(self, task):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index 05e3365..2a2f010 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -25,10 +25,11 @@ from functools import (
 )
 
 
+from ... import events
 from ....modeling import models
 from ...context import operation as operation_context
-from .. import exceptions
 from ..executor import base
+from .. import exceptions
 
 
 def _locked(func=None):
@@ -54,6 +55,7 @@ class BaseTask(object):
         self._executor = executor
 
     def execute(self):
+        events.sent_task_signal.send(self)
         self._executor.execute(self)
 
     @property
@@ -69,22 +71,20 @@ class StubTask(BaseTask):
     Base stub task for marker user tasks that only mark the start/end of a workflow
     or sub-workflow
     """
+    STARTED = models.Task.STARTED
+    SUCCESS = models.Task.SUCCESS
+
     def __init__(self, *args, **kwargs):
-        super(StubTask, self).__init__(executor=base.StubExecutor(), *args, **kwargs)
+        super(StubTask, self).__init__(executor=base.StubTaskExecutor(), *args, **kwargs)
         self.status = models.Task.PENDING
         self.due_at = datetime.utcnow()
 
     def has_ended(self):
-        return self.status in (models.Task.SUCCESS, models.Task.FAILED)
-
-    def is_waiting(self):
-        return self.status in (models.Task.PENDING, models.Task.RETRYING)
-
-    def end(self):
-        self.status = models.Task.SUCCESS
+        return self.status == self.SUCCESS
 
-    def start(self):
-        self.status = models.Task.STARTED
+    @staticmethod
+    def is_waiting():
+        return False
 
 
 class StartWorkflowTask(StubTask):
@@ -120,7 +120,7 @@ class OperationTask(BaseTask):
     Operation task
     """
     def __init__(self, api_task, executor=None, *args, **kwargs):
-        # If no executor is provided, we defer that this is a stub task which does not need to be
+        # If no executor is provided, we infer that this is an empty task which does not need to be
         # executed.
         super(OperationTask, self).__init__(
             id=api_task.id, executor=executor or base.EmptyOperationExecutor(), *args, **kwargs)
@@ -130,7 +130,6 @@ class OperationTask(BaseTask):
         self.operation_name = api_task.operation_name
         model_storage = api_task._workflow_context.model
 
-        # This currently signal that this is a stub task
         base_task_model = model_storage.task.model_cls
         if isinstance(api_task.actor, models.Node):
             context_cls = operation_context.NodeOperationContext
@@ -152,9 +151,9 @@ class OperationTask(BaseTask):
             execution=self._workflow_context.execution,
 
             # Only non-stub tasks have these fields
-            plugin=getattr(api_task, 'plugin', None),
-            implementation=getattr(api_task, 'implementation', None),
-            inputs=getattr(api_task, 'inputs', {}),
+            plugin=api_task.plugin,
+            implementation=api_task.implementation,
+            inputs=api_task.inputs
 
         )
         self._workflow_context.model.task.put(task_model)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/aria/orchestrator/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py
index 487d44d..d764024 100644
--- a/aria/orchestrator/workflows/core/translation.py
+++ b/aria/orchestrator/workflows/core/translation.py
@@ -24,7 +24,7 @@ from . import task as core_task
 def build_execution_graph(
         task_graph,
         execution_graph,
-        executor,
+        default_executor,
         start_cls=core_task.StartWorkflowTask,
         end_cls=core_task.EndWorkflowTask,
         depends_on=()):
@@ -49,10 +49,10 @@ def build_execution_graph(
             default=[start_task])
 
         if isinstance(api_task, api.task.OperationTask):
-            if api_task.is_stub:
+            if api_task.is_empty:
                 operation_task = core_task.OperationTask(api_task)
             else:
-                operation_task = core_task.OperationTask(api_task, executor=executor)
+                operation_task = core_task.OperationTask(api_task, executor=default_executor)
             _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies)
 
         elif isinstance(api_task, api.task.WorkflowTask):
@@ -60,7 +60,7 @@ def build_execution_graph(
             build_execution_graph(
                 task_graph=api_task,
                 execution_graph=execution_graph,
-                executor=executor,
+                default_executor=default_executor,
                 start_cls=core_task.StartSubWorkflowTask,
                 end_cls=core_task.EndSubWorkflowTask,
                 depends_on=operation_dependencies

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/aria/orchestrator/workflows/events_logging.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py
index fa993d0..69b5b7a 100644
--- a/aria/orchestrator/workflows/events_logging.py
+++ b/aria/orchestrator/workflows/events_logging.py
@@ -34,25 +34,23 @@ def _get_task_name(task):
 
 
 @events.start_task_signal.connect
-def _start_task_handler(task, skip_logging=False, **kwargs):
-    if skip_logging:
-        return
-    task.context.logger.info('{name} {task.interface_name}.{task.operation_name} started...'
-                             .format(name=_get_task_name(task), task=task))
+def _start_task_handler(task, **kwargs):
+    if task.is_empty:
+        task.context.logger.debug('{name} {task.interface_name}.{task.operation_name} has no '
+                                  'implementation'.format(name=_get_task_name(task), task=task))
+    else:
+        task.context.logger.info('{name} {task.interface_name}.{task.operation_name} started...'
+                                 .format(name=_get_task_name(task), task=task))
 
 
 @events.on_success_task_signal.connect
-def _success_task_handler(task, skip_logging=False, **kwargs):
-    if skip_logging:
-        return
+def _success_task_handler(task, **kwargs):
     task.context.logger.info('{name} {task.interface_name}.{task.operation_name} successful'
                              .format(name=_get_task_name(task), task=task))
 
 
 @events.on_failure_task_signal.connect
-def _failure_operation_handler(task, traceback, skip_logging=False, **kwargs):
-    if skip_logging:
-        return
+def _failure_operation_handler(task, traceback, **kwargs):
     task.context.logger.error(
         '{name} {task.interface_name}.{task.operation_name} failed'
         .format(name=_get_task_name(task), task=task), extra=dict(traceback=traceback)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index 3a3a1fe..0194ee7 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -56,10 +56,9 @@ class BaseExecutor(logger.LoggerMixin):
         events.on_success_task_signal.send(task)
 
 
-class StubExecutor(BaseExecutor):
+class StubTaskExecutor(BaseExecutor):
     def execute(self, task):
-        task.start()
-        task.end()
+        task.status = task.SUCCESS
 
 
 class EmptyOperationExecutor(BaseExecutor):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/aria/orchestrator/workflows/executor/dry.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py
index 7da48f3..eb70a41 100644
--- a/aria/orchestrator/workflows/executor/dry.py
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -16,8 +16,8 @@
 """
 Dry executor
 """
+from datetime import datetime
 
-from aria.orchestrator import events
 from .base import BaseExecutor
 
 
@@ -27,7 +27,11 @@ class DryExecutor(BaseExecutor):
     """
 
     def execute(self, task):
-        events.start_task_signal.send(task, skip_logging=True)
+        # updating the task manually instead of calling self._task_started(task),
+        # to avoid any side effects raising that event might cause
+        with task._update():
+            task.started_at = datetime.utcnow()
+            task.status = task.STARTED
 
         if hasattr(task.actor, 'source_node'):
             name = '{source_node.name}->{target_node.name}'.format(
@@ -43,4 +47,8 @@ class DryExecutor(BaseExecutor):
             '<dry> {name} {task.interface_name}.{task.operation_name} successful'
             .format(name=name, task=task))
 
-        events.on_success_task_signal.send(task, skip_logging=True)
+        # updating the task manually instead of calling self._task_succeeded(task),
+        # to avoid any side effects raising that event might cause
+        with task._update():
+            task.ended_at = datetime.utcnow()
+            task.status = task.SUCCESS

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/tests/end2end/test_hello_world.py
----------------------------------------------------------------------
diff --git a/tests/end2end/test_hello_world.py b/tests/end2end/test_hello_world.py
index dc8a2a3..fc5f631 100644
--- a/tests/end2end/test_hello_world.py
+++ b/tests/end2end/test_hello_world.py
@@ -29,8 +29,7 @@ def test_hello_world(testenv):
     finally:
         # Even if some assertions failed, attempt to execute uninstall so the
         # webserver process doesn't stay up once the test is finished
-        # TODO: remove force_service_delete=True
-        testenv.uninstall_service(force_service_delete=True)
+        testenv.uninstall_service()
 
     _verify_webserver_down('http://localhost:9090')
     testenv.verify_clean_storage()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index 5dc1206..2a96d01 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -68,7 +68,7 @@ def test_task_graph_into_execution_graph(tmpdir):
     execution_graph = DiGraph()
     core.translation.build_execution_graph(task_graph=test_task_graph,
                                            execution_graph=execution_graph,
-                                           executor=base.StubExecutor())
+                                           default_executor=base.StubTaskExecutor())
     execution_tasks = topological_sort(execution_graph)
 
     assert len(execution_tasks) == 7