You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/05/03 13:57:24 UTC
incubator-ariatosca git commit: review2
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-163-Update-node-state-for-stub-tasks 95255a790 -> 43da804c5
review2
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/43da804c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/43da804c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/43da804c
Branch: refs/heads/ARIA-163-Update-node-state-for-stub-tasks
Commit: 43da804c52fb0af2d1eb97d71059e3d08e676150
Parents: 95255a7
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed May 3 16:57:19 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Wed May 3 16:57:19 2017 +0300
----------------------------------------------------------------------
aria/orchestrator/workflows/api/task.py | 19 +++++++-----
aria/orchestrator/workflows/core/engine.py | 4 +--
aria/orchestrator/workflows/core/task.py | 31 ++++++++++----------
aria/orchestrator/workflows/core/translation.py | 8 ++---
aria/orchestrator/workflows/events_logging.py | 20 ++++++-------
aria/orchestrator/workflows/executor/base.py | 5 ++--
aria/orchestrator/workflows/executor/dry.py | 14 +++++++--
tests/end2end/test_hello_world.py | 3 +-
.../test_task_graph_into_execution_graph.py | 2 +-
9 files changed, 55 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index cb1618c..d376dd2 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -92,14 +92,17 @@ class OperationTask(BaseTask):
name=actor.name,
interface=self.interface_name,
operation=self.operation_name)
- self.is_stub = self.is_empty(self.actor, self.interface_name, self.operation_name)
- if self.is_stub:
- return
-
- operation = self.actor.interfaces[self.interface_name].operations[self.operation_name]
- self.plugin = operation.plugin
- self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs)
- self.implementation = operation.implementation
+ self.is_empty = self.is_empty(self.actor, self.interface_name, self.operation_name)
+
+ if self.is_empty:
+ self.plugin = None
+ self.inputs = {}
+ self.implementation = None
+ else:
+ operation = self.actor.interfaces[self.interface_name].operations[self.operation_name]
+ self.plugin = operation.plugin
+ self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs)
+ self.implementation = operation.implementation
def __repr__(self):
return self.name
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index a4498aa..6a66eb1 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -45,7 +45,7 @@ class Engine(logger.LoggerMixin):
self._executor = executor
translation.build_execution_graph(task_graph=tasks_graph,
execution_graph=self._execution_graph,
- executor=self._executor)
+ default_executor=self._executor)
def execute(self):
"""
@@ -112,8 +112,6 @@ class Engine(logger.LoggerMixin):
@staticmethod
def _handle_executable_task(task):
- if not isinstance(task, engine_task.StubTask):
- events.sent_task_signal.send(task)
task.execute()
def _handle_ended_tasks(self, task):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index 05e3365..2a2f010 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -25,10 +25,11 @@ from functools import (
)
+from ... import events
from ....modeling import models
from ...context import operation as operation_context
-from .. import exceptions
from ..executor import base
+from .. import exceptions
def _locked(func=None):
@@ -54,6 +55,7 @@ class BaseTask(object):
self._executor = executor
def execute(self):
+ events.sent_task_signal.send(self)
self._executor.execute(self)
@property
@@ -69,22 +71,20 @@ class StubTask(BaseTask):
Base stub task for marker user tasks that only mark the start/end of a workflow
or sub-workflow
"""
+ STARTED = models.Task.STARTED
+ SUCCESS = models.Task.SUCCESS
+
def __init__(self, *args, **kwargs):
- super(StubTask, self).__init__(executor=base.StubExecutor(), *args, **kwargs)
+ super(StubTask, self).__init__(executor=base.StubTaskExecutor(), *args, **kwargs)
self.status = models.Task.PENDING
self.due_at = datetime.utcnow()
def has_ended(self):
- return self.status in (models.Task.SUCCESS, models.Task.FAILED)
-
- def is_waiting(self):
- return self.status in (models.Task.PENDING, models.Task.RETRYING)
-
- def end(self):
- self.status = models.Task.SUCCESS
+ return self.status == self.SUCCESS
- def start(self):
- self.status = models.Task.STARTED
+ @staticmethod
+ def is_waiting():
+ return False
class StartWorkflowTask(StubTask):
@@ -120,7 +120,7 @@ class OperationTask(BaseTask):
Operation task
"""
def __init__(self, api_task, executor=None, *args, **kwargs):
- # If no executor is provided, we defer that this is a stub task which does not need to be
+ # If no executor is provided, we infer that this is an empty task which does not need to be
# executed.
super(OperationTask, self).__init__(
id=api_task.id, executor=executor or base.EmptyOperationExecutor(), *args, **kwargs)
@@ -130,7 +130,6 @@ class OperationTask(BaseTask):
self.operation_name = api_task.operation_name
model_storage = api_task._workflow_context.model
- # This currently signal that this is a stub task
base_task_model = model_storage.task.model_cls
if isinstance(api_task.actor, models.Node):
context_cls = operation_context.NodeOperationContext
@@ -152,9 +151,9 @@ class OperationTask(BaseTask):
execution=self._workflow_context.execution,
# Only non-stub tasks have these fields
- plugin=getattr(api_task, 'plugin', None),
- implementation=getattr(api_task, 'implementation', None),
- inputs=getattr(api_task, 'inputs', {}),
+ plugin=api_task.plugin,
+ implementation=api_task.implementation,
+ inputs=api_task.inputs
)
self._workflow_context.model.task.put(task_model)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/aria/orchestrator/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py
index 487d44d..d764024 100644
--- a/aria/orchestrator/workflows/core/translation.py
+++ b/aria/orchestrator/workflows/core/translation.py
@@ -24,7 +24,7 @@ from . import task as core_task
def build_execution_graph(
task_graph,
execution_graph,
- executor,
+ default_executor,
start_cls=core_task.StartWorkflowTask,
end_cls=core_task.EndWorkflowTask,
depends_on=()):
@@ -49,10 +49,10 @@ def build_execution_graph(
default=[start_task])
if isinstance(api_task, api.task.OperationTask):
- if api_task.is_stub:
+ if api_task.is_empty:
operation_task = core_task.OperationTask(api_task)
else:
- operation_task = core_task.OperationTask(api_task, executor=executor)
+ operation_task = core_task.OperationTask(api_task, executor=default_executor)
_add_task_and_dependencies(execution_graph, operation_task, operation_dependencies)
elif isinstance(api_task, api.task.WorkflowTask):
@@ -60,7 +60,7 @@ def build_execution_graph(
build_execution_graph(
task_graph=api_task,
execution_graph=execution_graph,
- executor=executor,
+ default_executor=default_executor,
start_cls=core_task.StartSubWorkflowTask,
end_cls=core_task.EndSubWorkflowTask,
depends_on=operation_dependencies
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/aria/orchestrator/workflows/events_logging.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py
index fa993d0..69b5b7a 100644
--- a/aria/orchestrator/workflows/events_logging.py
+++ b/aria/orchestrator/workflows/events_logging.py
@@ -34,25 +34,23 @@ def _get_task_name(task):
@events.start_task_signal.connect
-def _start_task_handler(task, skip_logging=False, **kwargs):
- if skip_logging:
- return
- task.context.logger.info('{name} {task.interface_name}.{task.operation_name} started...'
- .format(name=_get_task_name(task), task=task))
+def _start_task_handler(task, **kwargs):
+ if task.is_empty:
+ task.context.logger.debug('{name} {task.interface_name}.{task.operation_name} has no '
+ 'implementation'.format(name=_get_task_name(task), task=task))
+ else:
+ task.context.logger.info('{name} {task.interface_name}.{task.operation_name} started...'
+ .format(name=_get_task_name(task), task=task))
@events.on_success_task_signal.connect
-def _success_task_handler(task, skip_logging=False, **kwargs):
- if skip_logging:
- return
+def _success_task_handler(task, **kwargs):
task.context.logger.info('{name} {task.interface_name}.{task.operation_name} successful'
.format(name=_get_task_name(task), task=task))
@events.on_failure_task_signal.connect
-def _failure_operation_handler(task, traceback, skip_logging=False, **kwargs):
- if skip_logging:
- return
+def _failure_operation_handler(task, traceback, **kwargs):
task.context.logger.error(
'{name} {task.interface_name}.{task.operation_name} failed'
.format(name=_get_task_name(task), task=task), extra=dict(traceback=traceback)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index 3a3a1fe..0194ee7 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -56,10 +56,9 @@ class BaseExecutor(logger.LoggerMixin):
events.on_success_task_signal.send(task)
-class StubExecutor(BaseExecutor):
+class StubTaskExecutor(BaseExecutor):
def execute(self, task):
- task.start()
- task.end()
+ task.status = task.SUCCESS
class EmptyOperationExecutor(BaseExecutor):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/aria/orchestrator/workflows/executor/dry.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py
index 7da48f3..eb70a41 100644
--- a/aria/orchestrator/workflows/executor/dry.py
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -16,8 +16,8 @@
"""
Dry executor
"""
+from datetime import datetime
-from aria.orchestrator import events
from .base import BaseExecutor
@@ -27,7 +27,11 @@ class DryExecutor(BaseExecutor):
"""
def execute(self, task):
- events.start_task_signal.send(task, skip_logging=True)
+ # updating the task manually instead of calling self._task_started(task),
+ # to avoid any side effects raising that event might cause
+ with task._update():
+ task.started_at = datetime.utcnow()
+ task.status = task.STARTED
if hasattr(task.actor, 'source_node'):
name = '{source_node.name}->{target_node.name}'.format(
@@ -43,4 +47,8 @@ class DryExecutor(BaseExecutor):
'<dry> {name} {task.interface_name}.{task.operation_name} successful'
.format(name=name, task=task))
- events.on_success_task_signal.send(task, skip_logging=True)
+ # updating the task manually instead of calling self._task_succeeded(task),
+ # to avoid any side effects raising that event might cause
+ with task._update():
+ task.ended_at = datetime.utcnow()
+ task.status = task.SUCCESS
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/tests/end2end/test_hello_world.py
----------------------------------------------------------------------
diff --git a/tests/end2end/test_hello_world.py b/tests/end2end/test_hello_world.py
index dc8a2a3..fc5f631 100644
--- a/tests/end2end/test_hello_world.py
+++ b/tests/end2end/test_hello_world.py
@@ -29,8 +29,7 @@ def test_hello_world(testenv):
finally:
# Even if some assertions failed, attempt to execute uninstall so the
# webserver process doesn't stay up once the test is finished
- # TODO: remove force_service_delete=True
- testenv.uninstall_service(force_service_delete=True)
+ testenv.uninstall_service()
_verify_webserver_down('http://localhost:9090')
testenv.verify_clean_storage()
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index 5dc1206..2a96d01 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -68,7 +68,7 @@ def test_task_graph_into_execution_graph(tmpdir):
execution_graph = DiGraph()
core.translation.build_execution_graph(task_graph=test_task_graph,
execution_graph=execution_graph,
- executor=base.StubExecutor())
+ default_executor=base.StubTaskExecutor())
execution_tasks = topological_sort(execution_graph)
assert len(execution_tasks) == 7