You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/04/30 17:03:10 UTC
incubator-ariatosca git commit: wip2 [Forced Update!]
Repository: incubator-ariatosca
Updated Branches:
refs/heads/stub_task_branch 222bef595 -> 9bdc64537 (forced update)
wip2
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/9bdc6453
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/9bdc6453
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/9bdc6453
Branch: refs/heads/stub_task_branch
Commit: 9bdc64537cd44dd30f4e69d9090718f58e55e88f
Parents: 282fcbf
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun Apr 30 19:54:12 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Sun Apr 30 20:03:05 2017 +0300
----------------------------------------------------------------------
aria/orchestrator/events.py | 8 ----
aria/orchestrator/workflows/api/task.py | 34 +++++++------
aria/orchestrator/workflows/builtin/utils.py | 38 +++++++--------
aria/orchestrator/workflows/core/engine.py | 7 +--
.../workflows/core/events_handler.py | 3 ++
aria/orchestrator/workflows/core/task.py | 50 ++++++++++++++------
aria/orchestrator/workflows/core/translation.py | 13 +++--
aria/orchestrator/workflows/executor/dry.py | 31 +++++-------
tests/end2end/test_hello_world.py | 5 +-
9 files changed, 98 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9bdc6453/aria/orchestrator/events.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/events.py b/aria/orchestrator/events.py
index 812040b..a1c4922 100644
--- a/aria/orchestrator/events.py
+++ b/aria/orchestrator/events.py
@@ -28,14 +28,6 @@ start_task_signal = signal('start_task_signal')
on_success_task_signal = signal('success_task_signal')
on_failure_task_signal = signal('failure_task_signal')
-# node state signals:
-# Note that each signal corresponds with a task. The basic start_task_signal also changes the state
-# of the node on which it runs. (so does the on_success_task_signal and the on_failure_task_signal)
-start_node_signal = signal('start_task_signal')
-on_success_node_signal = signal('success_task_signal')
-on_failure_node_signal = signal('failure_task_signal')
-
-
# workflow engine workflow signals:
start_workflow_signal = signal('start_workflow_signal')
on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal')
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9bdc6453/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index 82c40c3..15397c3 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -66,7 +66,8 @@ class OperationTask(BaseTask):
inputs=None,
max_attempts=None,
retry_interval=None,
- ignore_failure=None):
+ ignore_failure=None,
+ is_stub=False):
"""
Do not call this constructor directly. Instead, use :meth:`for_node` or
:meth:`for_relationship`.
@@ -87,15 +88,18 @@ class OperationTask(BaseTask):
if ignore_failure is None else ignore_failure)
self.interface_name = interface_name
self.operation_name = operation_name
+ self.name = OperationTask.NAME_FORMAT.format(type=actor_type,
+ name=actor.name,
+ interface=self.interface_name,
+ operation=self.operation_name)
+ self.is_stub = is_stub
+ if self.is_stub:
+ return
operation = self.actor.interfaces[self.interface_name].operations[self.operation_name]
self.plugin = operation.plugin
self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs)
self.implementation = operation.implementation
- self.name = OperationTask.NAME_FORMAT.format(type=actor_type,
- name=actor.name,
- interface=self.interface_name,
- operation=self.operation_name)
def __repr__(self):
return self.name
@@ -108,7 +112,8 @@ class OperationTask(BaseTask):
max_attempts=None,
retry_interval=None,
ignore_failure=None,
- inputs=None):
+ inputs=None,
+ is_stub=False):
"""
Creates an operation on a node.
@@ -132,7 +137,9 @@ class OperationTask(BaseTask):
max_attempts=max_attempts,
retry_interval=retry_interval,
ignore_failure=ignore_failure,
- inputs=inputs)
+ inputs=inputs,
+ is_stub=is_stub
+ )
@classmethod
def for_relationship(cls,
@@ -142,7 +149,8 @@ class OperationTask(BaseTask):
max_attempts=None,
retry_interval=None,
ignore_failure=None,
- inputs=None):
+ inputs=None,
+ is_stub=False):
"""
Creates an operation on a relationship edge.
@@ -166,7 +174,9 @@ class OperationTask(BaseTask):
max_attempts=max_attempts,
retry_interval=retry_interval,
ignore_failure=ignore_failure,
- inputs=inputs)
+ inputs=inputs,
+ is_stub=is_stub
+ )
class WorkflowTask(BaseTask):
@@ -197,9 +207,3 @@ class WorkflowTask(BaseTask):
return getattr(self._graph, item)
except AttributeError:
return super(WorkflowTask, self).__getattribute__(item)
-
-
-class StubTask(BaseTask):
- """
- Enables creating empty tasks.
- """
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9bdc6453/aria/orchestrator/workflows/builtin/utils.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/utils.py b/aria/orchestrator/workflows/builtin/utils.py
index 2254d13..e649006 100644
--- a/aria/orchestrator/workflows/builtin/utils.py
+++ b/aria/orchestrator/workflows/builtin/utils.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ..api.task import OperationTask, StubTask
+from ..api.task import OperationTask
from .. import exceptions
@@ -23,12 +23,10 @@ def create_node_task(node, interface_name, operation_name, **kwargs):
"""
try:
- if _is_empty_task(node, interface_name, operation_name):
- return StubTask()
-
return OperationTask.for_node(node=node,
interface_name=interface_name,
operation_name=operation_name,
+ is_stub=_is_empty_task(node, interface_name, operation_name),
**kwargs)
except exceptions.OperationNotFoundException:
# We will skip nodes which do not have the operation
@@ -71,29 +69,29 @@ def relationship_tasks(relationship, interface_name, source_operation_name=None,
operations = []
if source_operation_name:
try:
- if _is_empty_task(relationship, interface_name, source_operation_name):
- operations.append(StubTask())
- else:
- operations.append(
- OperationTask.for_relationship(relationship=relationship,
- interface_name=interface_name,
- operation_name=source_operation_name,
- **kwargs)
+ operations.append(
+ OperationTask.for_relationship(
+ relationship=relationship,
+ interface_name=interface_name,
+ operation_name=source_operation_name,
+ is_stub=_is_empty_task(relationship, interface_name, source_operation_name),
+ **kwargs
)
+ )
except exceptions.OperationNotFoundException:
# We will skip relationships which do not have the operation
pass
if target_operation_name:
try:
- if _is_empty_task(relationship, interface_name, target_operation_name):
- operations.append(StubTask())
- else:
- operations.append(
- OperationTask.for_relationship(relationship=relationship,
- interface_name=interface_name,
- operation_name=target_operation_name,
- **kwargs)
+ operations.append(
+ OperationTask.for_relationship(
+ relationship=relationship,
+ interface_name=interface_name,
+ operation_name=target_operation_name,
+ is_stub=_is_empty_task(relationship, interface_name, target_operation_name),
+ **kwargs
)
+ )
except exceptions.OperationNotFoundException:
# We will skip relationships which do not have the operation
pass
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9bdc6453/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 97c4999..353d9db 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -111,12 +111,9 @@ class Engine(logger.LoggerMixin):
yield task
def _handle_executable_task(self, task):
- # import pydevd; pydevd.settrace('localhost', suspend=False)
- if isinstance(task, engine_task.StubTask):
- task.status = models.Task.SUCCESS
- else:
+ if not isinstance(task, engine_task.MarkerTaskBase):
events.sent_task_signal.send(task)
- task.execute()
+ task.execute()
def _handle_ended_tasks(self, task):
if task.status == models.Task.FAILED and not task.ignore_failure:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9bdc6453/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index b43082b..83b79d5 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -41,6 +41,9 @@ def _task_started(task, *args, **kwargs):
task.started_at = datetime.utcnow()
task.status = task.STARTED
+@events.start_task_signal.connect
+def _node_task_started(task, *args, **kwargs):
+ with task._update():
_update_node_state_if_necessary(task, is_transitional=True)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9bdc6453/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index 548bf47..39dd2c7 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -64,15 +64,21 @@ class BaseTask(object):
return self._id
-class StubTask(BaseTask):
+class MarkerTaskBase(BaseTask):
"""
- Base stub task for all tasks that don't actually run anything
+ Base stub task for marker user tasks that only mark the start/end of a workflow
+ or sub-workflow
"""
-
def __init__(self, *args, **kwargs):
- super(StubTask, self).__init__(executor=dry.StubExecutor(), *args, **kwargs)
+ super(MarkerTaskBase, self).__init__(executor=dry.MarkerExecutor(), *args, **kwargs)
self.status = models.Task.PENDING
self.due_at = datetime.utcnow()
+ self.started_at = None
+ self.ended_at = None
+
+ @contextmanager
+ def _update(self):
+ yield
def has_ended(self):
return self.status in (models.Task.SUCCESS, models.Task.FAILED)
@@ -80,29 +86,37 @@ class StubTask(BaseTask):
def is_waiting(self):
return self.status in (models.Task.PENDING, models.Task.RETRYING)
+ def end(self):
+ self.ended_at = datetime.utcnow()
+ self.status = models.Task.SUCCESS
-class StartWorkflowTask(StubTask):
+ def start(self):
+ self.started_at = datetime.utcnow()
+ self.status = models.Task.STARTED
+
+
+class StartWorkflowTask(MarkerTaskBase):
"""
Task marking a workflow start
"""
pass
-class EndWorkflowTask(StubTask):
+class EndWorkflowTask(MarkerTaskBase):
"""
Task marking a workflow end
"""
pass
-class StartSubWorkflowTask(StubTask):
+class StartSubWorkflowTask(MarkerTaskBase):
"""
Task marking a subworkflow start
"""
pass
-class EndSubWorkflowTask(StubTask):
+class EndSubWorkflowTask(MarkerTaskBase):
"""
Task marking a subworkflow end
"""
@@ -113,15 +127,18 @@ class OperationTask(BaseTask):
"""
Operation task
"""
+ def __init__(self, api_task, executor=None, *args, **kwargs):
+ # If no executor is provided, we defer that this is a stub task which does not need to be
+ # executed.
+ super(OperationTask, self).__init__(
+ id=api_task.id, executor=executor or dry.StubExecutor(), *args, **kwargs)
- def __init__(self, api_task, *args, **kwargs):
- super(OperationTask, self).__init__(id=api_task.id, **kwargs)
self._workflow_context = api_task._workflow_context
self.interface_name = api_task.interface_name
self.operation_name = api_task.operation_name
model_storage = api_task._workflow_context.model
- plugin = api_task.plugin
+ # This currently signal that this is a stub task
base_task_model = model_storage.task.model_cls
if isinstance(api_task.actor, models.Node):
context_cls = operation_context.NodeOperationContext
@@ -141,15 +158,18 @@ class OperationTask(BaseTask):
task_model = create_task_model(
name=api_task.name,
- implementation=api_task.implementation,
actor=api_task.actor,
- inputs=api_task.inputs,
status=base_task_model.PENDING,
max_attempts=api_task.max_attempts,
retry_interval=api_task.retry_interval,
ignore_failure=api_task.ignore_failure,
- plugin=plugin,
- execution=self._workflow_context.execution
+ execution=self._workflow_context.execution,
+
+ # Only non-stub tasks have these fields
+ plugin=getattr(api_task, 'plugin', None),
+ implementation=getattr(api_task, 'implementation', None),
+ inputs=getattr(api_task, 'inputs', {}),
+
)
self._workflow_context.model.task.put(task_model)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9bdc6453/aria/orchestrator/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py
index 1ae59a3..487d44d 100644
--- a/aria/orchestrator/workflows/core/translation.py
+++ b/aria/orchestrator/workflows/core/translation.py
@@ -49,9 +49,12 @@ def build_execution_graph(
default=[start_task])
if isinstance(api_task, api.task.OperationTask):
- # Add the task an the dependencies
- operation_task = core_task.OperationTask(api_task, executor=executor)
+ if api_task.is_stub:
+ operation_task = core_task.OperationTask(api_task)
+ else:
+ operation_task = core_task.OperationTask(api_task, executor=executor)
_add_task_and_dependencies(execution_graph, operation_task, operation_dependencies)
+
elif isinstance(api_task, api.task.WorkflowTask):
# Build the graph recursively while adding start and end markers
build_execution_graph(
@@ -62,9 +65,6 @@ def build_execution_graph(
end_cls=core_task.EndSubWorkflowTask,
depends_on=operation_dependencies
)
- elif isinstance(api_task, api.task.StubTask):
- stub_task = core_task.StubTask(id=api_task.id)
- _add_task_and_dependencies(execution_graph, stub_task, operation_dependencies)
else:
raise RuntimeError('Undefined state')
@@ -88,8 +88,7 @@ def _get_tasks_from_dependencies(execution_graph, dependencies, default=()):
Returns task list from dependencies.
"""
return [execution_graph.node[dependency.id
- if isinstance(dependency, (api.task.OperationTask,
- api.task.StubTask))
+ if isinstance(dependency, api.task.OperationTask)
else _end_graph_suffix(dependency.id)]['task']
for dependency in dependencies] or default
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9bdc6453/aria/orchestrator/workflows/executor/dry.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py
index 5be8015..7f3df4e 100644
--- a/aria/orchestrator/workflows/executor/dry.py
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -20,39 +20,32 @@ Dry executor
from datetime import datetime
from aria.orchestrator import events
+from aria.orchestrator.workflows.core.events_handler import _update_node_state_if_necessary
from .base import BaseExecutor
# TODO: the name of this module should definitely change
-class StubExecutor(BaseExecutor):
-
- @staticmethod
- def _task_sent(*args, **kwargs):
- pass
-
- @staticmethod
- def _task_started(task):
- events.start_task_signal.send(task, skip_logging=True)
-
- @staticmethod
- def _task_succeeded(task):
- events.on_success_task_signal.send(task, skip_logging=True)
+class MarkerExecutor(BaseExecutor):
+ def execute(self, task):
+ task.start()
+ task.end()
- @staticmethod
- def _task_failed(*args, **kwargs):
- pass
+class StubExecutor(BaseExecutor):
def execute(self, task):
- pass
+ events.start_task_signal.send(task)
+ events.on_success_task_signal.send(task)
-class DryExecutor(StubExecutor):
+class DryExecutor(BaseExecutor):
"""
Executor which dry runs tasks - prints task information without causing any side effects
"""
def execute(self, task):
+ events.start_task_signal.send(task, skip_logging=True)
+
if hasattr(task.actor, 'source_node'):
name = '{source_node.name}->{target_node.name}'.format(
source_node=task.actor.source_node, target_node=task.actor.target_node)
@@ -66,3 +59,5 @@ class DryExecutor(StubExecutor):
task.context.logger.info(
'<dry> {name} {task.interface_name}.{task.operation_name} successful'
.format(name=name, task=task))
+
+ events.on_success_task_signal.send(task, skip_logging=True)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9bdc6453/tests/end2end/test_hello_world.py
----------------------------------------------------------------------
diff --git a/tests/end2end/test_hello_world.py b/tests/end2end/test_hello_world.py
index 8895593..6714ce8 100644
--- a/tests/end2end/test_hello_world.py
+++ b/tests/end2end/test_hello_world.py
@@ -30,8 +30,7 @@ def test_hello_world(testenv):
# Even if some assertions failed, attempt to execute uninstall so the
# webserver process doesn't stay up once the test is finished
# TODO: remove force_service_delete=True
- pass
- # testenv.uninstall_service(force_service_delete=True)
+ testenv.uninstall_service(force_service_delete=True)
_verify_webserver_down('http://localhost:9090')
testenv.verify_clean_storage()
@@ -58,5 +57,5 @@ def _verify_deployed_service_in_storage(service_name, model_storage):
assert service.name == service_name
assert len(service.executions) == 1
assert len(service.nodes) == 2
- # TODO: validate node states
+ assert all(node.state == node.STARTED for node in service.nodes)
assert len(service.executions[0].logs) > 0