You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/15 08:03:19 UTC
incubator-ariatosca git commit: shifted stuff around
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-278-Remove-core-tasks d1cfd261d -> 5fce85b12
shifted stuff around
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/5fce85b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/5fce85b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/5fce85b1
Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: 5fce85b12ddd7ac0feebecbb456e7240ff2512ca
Parents: d1cfd26
Author: max-orlov <ma...@gigaspaces.com>
Authored: Thu Jun 15 11:03:15 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Thu Jun 15 11:03:15 2017 +0300
----------------------------------------------------------------------
aria/modeling/orchestration.py | 3 +-
aria/orchestrator/context/operation.py | 8 ++++
aria/orchestrator/workflow_runner.py | 24 +++++-----
aria/orchestrator/workflows/executor/base.py | 48 +++++++++----------
aria/orchestrator/workflows/executor/dry.py | 51 ++++++++++-----------
tests/orchestrator/workflows/core/test_task.py | 5 +-
6 files changed, 70 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5fce85b1/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 6f69483..11a4684 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -452,7 +452,8 @@ class TaskBase(mixins.ModelMixin):
'api_id': api_task.id,
'_context_cls': context_cls,
'_executor': executor,
- })
+ }
+ )
instantiation_kwargs.update(**kwargs)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5fce85b1/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index 7477912..7591d70 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -18,6 +18,7 @@ Workflow and operation contexts
"""
import threading
+from contextlib import contextmanager
import aria
from aria.utils import file
@@ -109,6 +110,13 @@ class BaseOperationContext(common.BaseContext):
self.model.log._session.remove()
self.model.log._engine.dispose()
+ @property
+ @contextmanager
+ def track_task(self):
+ self.model.task.update(self.task)
+ yield
+ self.model.task.update(self.task)
+
class NodeOperationContext(BaseOperationContext):
"""
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5fce85b1/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index f0a48ad..b24e474 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -29,7 +29,7 @@ from .workflows import builtin
from .workflows.core.engine import Engine
from .workflows.executor.process import ProcessExecutor
from .workflows.executor.base import StubTaskExecutor
-from .workflows.api import task
+from .workflows.api import task as api_task
from ..modeling import models
from ..modeling import utils as modeling_utils
from ..utils.imports import import_fullname
@@ -180,9 +180,9 @@ class WorkflowRunner(object):
def get_execution_graph(execution):
graph = DiGraph()
- for t in execution.tasks:
- for dependency in t.dependencies:
- graph.add_edge(dependency, t)
+ for task in execution.tasks:
+ for dependency in task.dependencies:
+ graph.add_edge(dependency, task)
return graph
@@ -210,16 +210,16 @@ def construct_execution_tasks(execution,
stub_type=start_stub_type,
dependencies=depends_on)
- for api_task in task_graph.topological_order(reverse=True):
+ for task in task_graph.topological_order(reverse=True):
operation_dependencies = _get_tasks_from_dependencies(
- execution, task_graph.get_dependencies(api_task), [start_task])
+ execution, task_graph.get_dependencies(task), [start_task])
- if isinstance(api_task, task.OperationTask):
- models.Task.from_api_task(api_task=api_task,
+ if isinstance(task, api_task.OperationTask):
+ models.Task.from_api_task(api_task=task,
executor=default_executor,
dependencies=operation_dependencies)
- elif isinstance(api_task, task.WorkflowTask):
+ elif isinstance(task, api_task.WorkflowTask):
# Build the graph recursively while adding start and end markers
construct_execution_tasks(
execution=execution,
@@ -230,7 +230,7 @@ def construct_execution_tasks(execution,
end_stub_type=models.Task.END_SUBWORKFLOW,
depends_on=operation_dependencies
)
- elif isinstance(api_task, task.StubTask):
+ elif isinstance(task, api_task.StubTask):
models.Task(api_id=api_task.id,
_executor=stub_executor,
execution=execution,
@@ -257,8 +257,8 @@ def _end_graph_suffix(api_id):
def _get_non_dependent_tasks(execution):
dependency_tasks = set()
- for t in execution.tasks:
- dependency_tasks.update(t.dependencies)
+ for task in execution.tasks:
+ dependency_tasks.update(task.dependencies)
return list(set(execution.tasks) - set(dependency_tasks))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5fce85b1/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index a727e5c..a1cfe4b 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -21,15 +21,6 @@ from aria import logger
from aria.orchestrator import events
-def update_ctx(func):
- def _wrapper(self, ctx, *args, **kwargs):
- ctx.update_task()
- func(self, ctx, *args, **kwargs)
- ctx.update_task()
-
- return _wrapper
-
-
class BaseExecutor(logger.LoggerMixin):
"""
Base class for executors for running tasks
@@ -37,20 +28,20 @@ class BaseExecutor(logger.LoggerMixin):
def _execute(self, task):
raise NotImplementedError
- @update_ctx
def execute(self, ctx):
"""
Execute a task
:param task: task to execute
"""
- if ctx.task.function:
- self._execute(ctx)
- else:
- # In this case the task is missing a function. This task still gets to an
- # executor, but since there is nothing to run, we by default simply skip the execution
- # itself.
- self._task_started(ctx)
- self._task_succeeded(ctx)
+ with ctx.track_task:
+ if ctx.task.function:
+ self._execute(ctx)
+ else:
+ # In this case the task is missing a function. This task still gets to an
+ # executor, but since there is nothing to run, we by default simply skip the
+ # execution itself.
+ self._task_started(ctx)
+ self._task_succeeded(ctx)
def close(self):
"""
@@ -58,17 +49,20 @@ class BaseExecutor(logger.LoggerMixin):
"""
pass
- @update_ctx
- def _task_started(self, ctx):
- events.start_task_signal.send(ctx)
+ @staticmethod
+ def _task_started(ctx):
+ with ctx.track_task:
+ events.start_task_signal.send(ctx)
- @update_ctx
- def _task_failed(self, ctx, exception, traceback=None):
- events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback)
+ @staticmethod
+ def _task_failed(ctx, exception, traceback=None):
+ with ctx.track_task:
+ events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback)
- @update_ctx
- def _task_succeeded(self, ctx):
- events.on_success_task_signal.send(ctx)
+ @staticmethod
+ def _task_succeeded(ctx):
+ with ctx.track_task:
+ events.on_success_task_signal.send(ctx)
class StubTaskExecutor(BaseExecutor): # pylint: disable=abstract-method
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5fce85b1/aria/orchestrator/workflows/executor/dry.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py
index c12ba7c..a5f8507 100644
--- a/aria/orchestrator/workflows/executor/dry.py
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -25,30 +25,29 @@ class DryExecutor(base.BaseExecutor):
"""
Executor which dry runs tasks - prints task information without causing any side effects
"""
- @base.update_ctx
def execute(self, ctx):
- # updating the task manually instead of calling self._task_started(task),
- # to avoid any side effects raising that event might cause
- ctx.task.started_at = datetime.utcnow()
- ctx.task.status = ctx.task.STARTED
-
- dry_msg = '<dry> {name} {task.interface_name}.{task.operation_name} {suffix}'
- logger = ctx.logger.info if ctx.task.function else ctx.logger.debug
-
- if hasattr(ctx.task.actor, 'source_node'):
- name = '{source_node.name}->{target_node.name}'.format(
- source_node=ctx.task.actor.source_node, target_node=ctx.task.actor.target_node)
- else:
- name = ctx.task.actor.name
-
- if ctx.task.function:
- logger(dry_msg.format(name=name, task=ctx.task, suffix='started...'))
- logger(dry_msg.format(name=name, task=ctx.task, suffix='successful'))
- else:
- logger(dry_msg.format(name=name, task=ctx.task, suffix='has no implementation'))
-
- # updating the task manually instead of calling self._task_succeeded(task),
- # to avoid any side effects raising that event might cause
- ctx.task.ended_at = datetime.utcnow()
- ctx.task.status = ctx.task.SUCCESS
- ctx.update_task()
+ with ctx.track_task:
+ # updating the task manually instead of calling self._task_started(task),
+ # to avoid any side effects raising that event might cause
+ ctx.task.started_at = datetime.utcnow()
+ ctx.task.status = ctx.task.STARTED
+
+ dry_msg = '<dry> {name} {task.interface_name}.{task.operation_name} {suffix}'
+ logger = ctx.logger.info if ctx.task.function else ctx.logger.debug
+
+ if hasattr(ctx.task.actor, 'source_node'):
+ name = '{source_node.name}->{target_node.name}'.format(
+ source_node=ctx.task.actor.source_node, target_node=ctx.task.actor.target_node)
+ else:
+ name = ctx.task.actor.name
+
+ if ctx.task.function:
+ logger(dry_msg.format(name=name, task=ctx.task, suffix='started...'))
+ logger(dry_msg.format(name=name, task=ctx.task, suffix='successful'))
+ else:
+ logger(dry_msg.format(name=name, task=ctx.task, suffix='has no implementation'))
+
+ # updating the task manually instead of calling self._task_succeeded(task),
+ # to avoid any side effects raising that event might cause
+ ctx.task.ended_at = datetime.utcnow()
+ ctx.task.status = ctx.task.SUCCESS
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5fce85b1/tests/orchestrator/workflows/core/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py
index f0f3a3b..2b3f7d7 100644
--- a/tests/orchestrator/workflows/core/test_task.py
+++ b/tests/orchestrator/workflows/core/test_task.py
@@ -22,7 +22,6 @@ import pytest
from aria.orchestrator.context import workflow as workflow_context
from aria.orchestrator.workflows import (
api,
- core,
exceptions,
)
from aria.modeling import models
@@ -107,9 +106,9 @@ class TestOperationTask(object):
def test_relationship_operation_task_creation(self, ctx):
relationship = ctx.model.relationship.list()[0]
ctx.model.relationship.update(relationship)
- _, model_Task = self._create_relationship_operation_task(
+ _, model_task = self._create_relationship_operation_task(
ctx, relationship)
- assert model_Task.actor == relationship
+ assert model_task.actor == relationship
@pytest.mark.skip("Currently not supported for model tasks")
def test_operation_task_edit_locked_attribute(self, ctx):