You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/15 08:03:19 UTC

incubator-ariatosca git commit: shited stuff around

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-278-Remove-core-tasks d1cfd261d -> 5fce85b12


shited stuff around


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/5fce85b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/5fce85b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/5fce85b1

Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: 5fce85b12ddd7ac0feebecbb456e7240ff2512ca
Parents: d1cfd26
Author: max-orlov <ma...@gigaspaces.com>
Authored: Thu Jun 15 11:03:15 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Thu Jun 15 11:03:15 2017 +0300

----------------------------------------------------------------------
 aria/modeling/orchestration.py                 |  3 +-
 aria/orchestrator/context/operation.py         |  8 ++++
 aria/orchestrator/workflow_runner.py           | 24 +++++-----
 aria/orchestrator/workflows/executor/base.py   | 48 +++++++++----------
 aria/orchestrator/workflows/executor/dry.py    | 51 ++++++++++-----------
 tests/orchestrator/workflows/core/test_task.py |  5 +-
 6 files changed, 70 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5fce85b1/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 6f69483..11a4684 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -452,7 +452,8 @@ class TaskBase(mixins.ModelMixin):
                 'api_id': api_task.id,
                 '_context_cls': context_cls,
                 '_executor': executor,
-        })
+            }
+        )
 
         instantiation_kwargs.update(**kwargs)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5fce85b1/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index 7477912..7591d70 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -18,6 +18,7 @@ Workflow and operation contexts
 """
 
 import threading
+from contextlib import contextmanager
 
 import aria
 from aria.utils import file
@@ -109,6 +110,13 @@ class BaseOperationContext(common.BaseContext):
             self.model.log._session.remove()
             self.model.log._engine.dispose()
 
+    @property
+    @contextmanager
+    def track_task(self):
+        self.model.task.update(self.task)
+        yield
+        self.model.task.update(self.task)
+
 
 class NodeOperationContext(BaseOperationContext):
     """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5fce85b1/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index f0a48ad..b24e474 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -29,7 +29,7 @@ from .workflows import builtin
 from .workflows.core.engine import Engine
 from .workflows.executor.process import ProcessExecutor
 from .workflows.executor.base import StubTaskExecutor
-from .workflows.api import task
+from .workflows.api import task as api_task
 from ..modeling import models
 from ..modeling import utils as modeling_utils
 from ..utils.imports import import_fullname
@@ -180,9 +180,9 @@ class WorkflowRunner(object):
 
 def get_execution_graph(execution):
     graph = DiGraph()
-    for t in execution.tasks:
-        for dependency in t.dependencies:
-            graph.add_edge(dependency, t)
+    for task in execution.tasks:
+        for dependency in task.dependencies:
+            graph.add_edge(dependency, task)
 
     return graph
 
@@ -210,16 +210,16 @@ def construct_execution_tasks(execution,
                              stub_type=start_stub_type,
                              dependencies=depends_on)
 
-    for api_task in task_graph.topological_order(reverse=True):
+    for task in task_graph.topological_order(reverse=True):
         operation_dependencies = _get_tasks_from_dependencies(
-            execution, task_graph.get_dependencies(api_task), [start_task])
+            execution, task_graph.get_dependencies(task), [start_task])
 
-        if isinstance(api_task, task.OperationTask):
-            models.Task.from_api_task(api_task=api_task,
+        if isinstance(task, api_task.OperationTask):
+            models.Task.from_api_task(api_task=task,
                                       executor=default_executor,
                                       dependencies=operation_dependencies)
 
-        elif isinstance(api_task, task.WorkflowTask):
+        elif isinstance(task, api_task.WorkflowTask):
             # Build the graph recursively while adding start and end markers
             construct_execution_tasks(
                 execution=execution,
@@ -230,7 +230,7 @@ def construct_execution_tasks(execution,
                 end_stub_type=models.Task.END_SUBWORKFLOW,
                 depends_on=operation_dependencies
             )
-        elif isinstance(api_task, task.StubTask):
+        elif isinstance(task, api_task.StubTask):
             models.Task(api_id=api_task.id,
                         _executor=stub_executor,
                         execution=execution,
@@ -257,8 +257,8 @@ def _end_graph_suffix(api_id):
 
 def _get_non_dependent_tasks(execution):
     dependency_tasks = set()
-    for t in execution.tasks:
-        dependency_tasks.update(t.dependencies)
+    for task in execution.tasks:
+        dependency_tasks.update(task.dependencies)
     return list(set(execution.tasks) - set(dependency_tasks))
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5fce85b1/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index a727e5c..a1cfe4b 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -21,15 +21,6 @@ from aria import logger
 from aria.orchestrator import events
 
 
-def update_ctx(func):
-    def _wrapper(self, ctx, *args, **kwargs):
-        ctx.update_task()
-        func(self, ctx, *args, **kwargs)
-        ctx.update_task()
-
-    return _wrapper
-
-
 class BaseExecutor(logger.LoggerMixin):
     """
     Base class for executors for running tasks
@@ -37,20 +28,20 @@ class BaseExecutor(logger.LoggerMixin):
     def _execute(self, task):
         raise NotImplementedError
 
-    @update_ctx
     def execute(self, ctx):
         """
         Execute a task
         :param task: task to execute
         """
-        if ctx.task.function:
-            self._execute(ctx)
-        else:
-            # In this case the task is missing a function. This task still gets to an
-            # executor, but since there is nothing to run, we by default simply skip the execution
-            # itself.
-            self._task_started(ctx)
-            self._task_succeeded(ctx)
+        with ctx.track_task:
+            if ctx.task.function:
+                self._execute(ctx)
+            else:
+                # In this case the task is missing a function. This task still gets to an
+                # executor, but since there is nothing to run, we by default simply skip the
+                # execution itself.
+                self._task_started(ctx)
+                self._task_succeeded(ctx)
 
     def close(self):
         """
@@ -58,17 +49,20 @@ class BaseExecutor(logger.LoggerMixin):
         """
         pass
 
-    @update_ctx
-    def _task_started(self, ctx):
-        events.start_task_signal.send(ctx)
+    @staticmethod
+    def _task_started(ctx):
+        with ctx.track_task:
+            events.start_task_signal.send(ctx)
 
-    @update_ctx
-    def _task_failed(self, ctx, exception, traceback=None):
-        events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback)
+    @staticmethod
+    def _task_failed(ctx, exception, traceback=None):
+        with ctx.track_task:
+            events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback)
 
-    @update_ctx
-    def _task_succeeded(self, ctx):
-        events.on_success_task_signal.send(ctx)
+    @staticmethod
+    def _task_succeeded(ctx):
+        with ctx.track_task:
+            events.on_success_task_signal.send(ctx)
 
 
 class StubTaskExecutor(BaseExecutor):                                                               # pylint: disable=abstract-method

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5fce85b1/aria/orchestrator/workflows/executor/dry.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py
index c12ba7c..a5f8507 100644
--- a/aria/orchestrator/workflows/executor/dry.py
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -25,30 +25,29 @@ class DryExecutor(base.BaseExecutor):
     """
     Executor which dry runs tasks - prints task information without causing any side effects
     """
-    @base.update_ctx
     def execute(self, ctx):
-        # updating the task manually instead of calling self._task_started(task),
-        # to avoid any side effects raising that event might cause
-        ctx.task.started_at = datetime.utcnow()
-        ctx.task.status = ctx.task.STARTED
-
-        dry_msg = '<dry> {name} {task.interface_name}.{task.operation_name} {suffix}'
-        logger = ctx.logger.info if ctx.task.function else ctx.logger.debug
-
-        if hasattr(ctx.task.actor, 'source_node'):
-            name = '{source_node.name}->{target_node.name}'.format(
-                source_node=ctx.task.actor.source_node, target_node=ctx.task.actor.target_node)
-        else:
-            name = ctx.task.actor.name
-
-        if ctx.task.function:
-            logger(dry_msg.format(name=name, task=ctx.task, suffix='started...'))
-            logger(dry_msg.format(name=name, task=ctx.task, suffix='successful'))
-        else:
-            logger(dry_msg.format(name=name, task=ctx.task, suffix='has no implementation'))
-
-        # updating the task manually instead of calling self._task_succeeded(task),
-        # to avoid any side effects raising that event might cause
-        ctx.task.ended_at = datetime.utcnow()
-        ctx.task.status = ctx.task.SUCCESS
-        ctx.update_task()
+        with ctx.track_task:
+            # updating the task manually instead of calling self._task_started(task),
+            # to avoid any side effects raising that event might cause
+            ctx.task.started_at = datetime.utcnow()
+            ctx.task.status = ctx.task.STARTED
+
+            dry_msg = '<dry> {name} {task.interface_name}.{task.operation_name} {suffix}'
+            logger = ctx.logger.info if ctx.task.function else ctx.logger.debug
+
+            if hasattr(ctx.task.actor, 'source_node'):
+                name = '{source_node.name}->{target_node.name}'.format(
+                    source_node=ctx.task.actor.source_node, target_node=ctx.task.actor.target_node)
+            else:
+                name = ctx.task.actor.name
+
+            if ctx.task.function:
+                logger(dry_msg.format(name=name, task=ctx.task, suffix='started...'))
+                logger(dry_msg.format(name=name, task=ctx.task, suffix='successful'))
+            else:
+                logger(dry_msg.format(name=name, task=ctx.task, suffix='has no implementation'))
+
+            # updating the task manually instead of calling self._task_succeeded(task),
+            # to avoid any side effects raising that event might cause
+            ctx.task.ended_at = datetime.utcnow()
+            ctx.task.status = ctx.task.SUCCESS

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5fce85b1/tests/orchestrator/workflows/core/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py
index f0f3a3b..2b3f7d7 100644
--- a/tests/orchestrator/workflows/core/test_task.py
+++ b/tests/orchestrator/workflows/core/test_task.py
@@ -22,7 +22,6 @@ import pytest
 from aria.orchestrator.context import workflow as workflow_context
 from aria.orchestrator.workflows import (
     api,
-    core,
     exceptions,
 )
 from aria.modeling import models
@@ -107,9 +106,9 @@ class TestOperationTask(object):
     def test_relationship_operation_task_creation(self, ctx):
         relationship = ctx.model.relationship.list()[0]
         ctx.model.relationship.update(relationship)
-        _, model_Task = self._create_relationship_operation_task(
+        _, model_task = self._create_relationship_operation_task(
             ctx, relationship)
-        assert model_Task.actor == relationship
+        assert model_task.actor == relationship
 
     @pytest.mark.skip("Currently not supported for model tasks")
     def test_operation_task_edit_locked_attribute(self, ctx):