You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by em...@apache.org on 2017/05/08 18:35:15 UTC
[09/15] incubator-ariatosca git commit: ARIA-163 Update node state
for empty tasks
ARIA-163 Update node state for empty tasks
Additional changes:
* removed `for_node` and `for_relationship` from the api OperationTask.
* api based OperationTask could also have an empty implementation.
* each core task wields its own executor.
* Reordered some of the helper functions for creating tasks.
* introduced 2 new executors: StubTaskExecutor (for stub tasks) and EmptyOperationExecutor (for empty tasks)
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/8ca3ff29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/8ca3ff29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/8ca3ff29
Branch: refs/heads/ARIA-148-extra-cli-commands
Commit: 8ca3ff297b71e270eccdd9a2e6b8bf468ccdab5d
Parents: 0878526
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun Apr 30 16:05:27 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Thu May 4 17:35:56 2017 +0300
----------------------------------------------------------------------
aria/logger.py | 2 +
aria/orchestrator/workflows/api/task.py | 185 ++++++++++---------
.../workflows/builtin/execute_operation.py | 28 +--
aria/orchestrator/workflows/builtin/heal.py | 4 +-
aria/orchestrator/workflows/builtin/install.py | 9 +-
aria/orchestrator/workflows/builtin/start.py | 4 +-
aria/orchestrator/workflows/builtin/stop.py | 4 +-
.../orchestrator/workflows/builtin/uninstall.py | 11 +-
aria/orchestrator/workflows/builtin/utils.py | 138 --------------
.../orchestrator/workflows/builtin/workflows.py | 71 ++++---
aria/orchestrator/workflows/core/engine.py | 12 +-
.../workflows/core/events_handler.py | 1 -
aria/orchestrator/workflows/core/task.py | 35 ++--
aria/orchestrator/workflows/core/translation.py | 33 ++--
aria/orchestrator/workflows/events_logging.py | 13 +-
aria/orchestrator/workflows/executor/base.py | 11 ++
aria/orchestrator/workflows/executor/dry.py | 1 -
.../profiles/tosca-simple-1.0/interfaces.yaml | 5 +
tests/end2end/test_hello_world.py | 5 +-
tests/end2end/testenv.py | 2 +-
tests/orchestrator/context/test_operation.py | 32 ++--
tests/orchestrator/context/test_serialize.py | 2 +-
tests/orchestrator/context/test_toolbelt.py | 8 +-
.../orchestrator/execution_plugin/test_local.py | 4 +-
tests/orchestrator/execution_plugin/test_ssh.py | 4 +-
tests/orchestrator/workflows/api/test_task.py | 16 +-
.../orchestrator/workflows/core/test_engine.py | 60 +++---
.../orchestrator/workflows/core/test_events.py | 4 +-
tests/orchestrator/workflows/core/test_task.py | 15 +-
.../test_task_graph_into_execution_graph.py | 25 ++-
...process_executor_concurrent_modifications.py | 18 +-
.../executor/test_process_executor_extension.py | 9 +-
.../test_process_executor_tracked_changes.py | 9 +-
.../tosca-simple-1.0/node-cellar/workflows.py | 10 +-
34 files changed, 361 insertions(+), 429 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/logger.py
----------------------------------------------------------------------
diff --git a/aria/logger.py b/aria/logger.py
index 8e15f5b..97d3878 100644
--- a/aria/logger.py
+++ b/aria/logger.py
@@ -195,6 +195,8 @@ class _SQLAlchemyHandler(logging.Handler):
except BaseException:
self._session.rollback()
raise
+ finally:
+ self._session.close()
_default_file_formatter = logging.Formatter(
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index 82c40c3..cb79eb3 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -21,6 +21,7 @@ from ... import context
from ....modeling import models
from ....modeling import utils as modeling_utils
from ....utils.uuid import generate_uuid
+from .. import exceptions
class BaseTask(object):
@@ -71,102 +72,44 @@ class OperationTask(BaseTask):
Do not call this constructor directly. Instead, use :meth:`for_node` or
:meth:`for_relationship`.
"""
-
- actor_type = type(actor).__name__.lower()
assert isinstance(actor, (models.Node, models.Relationship))
- assert actor_type in ('node', 'relationship')
- assert interface_name and operation_name
super(OperationTask, self).__init__()
-
self.actor = actor
- self.max_attempts = (self.workflow_context._task_max_attempts
- if max_attempts is None else max_attempts)
- self.retry_interval = (self.workflow_context._task_retry_interval
- if retry_interval is None else retry_interval)
- self.ignore_failure = (self.workflow_context._task_ignore_failure
- if ignore_failure is None else ignore_failure)
self.interface_name = interface_name
self.operation_name = operation_name
+ self.max_attempts = max_attempts or self.workflow_context._task_max_attempts
+ self.retry_interval = retry_interval or self.workflow_context._task_retry_interval
+ self.ignore_failure = \
+ self.workflow_context._task_ignore_failure if ignore_failure is None else ignore_failure
+ self.name = OperationTask.NAME_FORMAT.format(type=type(actor).__name__.lower(),
+ name=actor.name,
+ interface=self.interface_name,
+ operation=self.operation_name)
+ # Creating OperationTask directly should raise an error when there is no
+ # interface/operation.
+
+ if not has_operation(self.actor, self.interface_name, self.operation_name):
+ raise exceptions.OperationNotFoundException(
+ 'Could not find operation "{self.operation_name}" on interface '
+ '"{self.interface_name}" for {actor_type} "{actor.name}"'.format(
+ self=self,
+ actor_type=type(actor).__name__.lower(),
+ actor=actor)
+ )
operation = self.actor.interfaces[self.interface_name].operations[self.operation_name]
self.plugin = operation.plugin
self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs)
self.implementation = operation.implementation
- self.name = OperationTask.NAME_FORMAT.format(type=actor_type,
- name=actor.name,
- interface=self.interface_name,
- operation=self.operation_name)
def __repr__(self):
return self.name
- @classmethod
- def for_node(cls,
- node,
- interface_name,
- operation_name,
- max_attempts=None,
- retry_interval=None,
- ignore_failure=None,
- inputs=None):
- """
- Creates an operation on a node.
-
- :param node: The node on which to run the operation
- :param interface_name: The interface name
- :param operation_name: The operation name within the interface
- :param max_attempts: The maximum number of attempts in case the operation fails
- (if not specified the defaults it taken from the workflow context)
- :param retry_interval: The interval in seconds between attempts when the operation fails
- (if not specified the defaults it taken from the workflow context)
- :param ignore_failure: Whether to ignore failures
- (if not specified the defaults it taken from the workflow context)
- :param inputs: Additional operation inputs
- """
-
- assert isinstance(node, models.Node)
- return cls(
- actor=node,
- interface_name=interface_name,
- operation_name=operation_name,
- max_attempts=max_attempts,
- retry_interval=retry_interval,
- ignore_failure=ignore_failure,
- inputs=inputs)
-
- @classmethod
- def for_relationship(cls,
- relationship,
- interface_name,
- operation_name,
- max_attempts=None,
- retry_interval=None,
- ignore_failure=None,
- inputs=None):
- """
- Creates an operation on a relationship edge.
-
- :param relationship: The relationship on which to run the operation
- :param interface_name: The interface name
- :param operation_name: The operation name within the interface
- :param max_attempts: The maximum number of attempts in case the operation fails
- (if not specified the defaults it taken from the workflow context)
- :param retry_interval: The interval in seconds between attempts when the operation fails
- (if not specified the defaults it taken from the workflow context)
- :param ignore_failure: Whether to ignore failures
- (if not specified the defaults it taken from the workflow context)
- :param inputs: Additional operation inputs
- """
- assert isinstance(relationship, models.Relationship)
- return cls(
- actor=relationship,
- interface_name=interface_name,
- operation_name=operation_name,
- max_attempts=max_attempts,
- retry_interval=retry_interval,
- ignore_failure=ignore_failure,
- inputs=inputs)
+class StubTask(BaseTask):
+ """
+ Enables creating empty tasks.
+ """
class WorkflowTask(BaseTask):
@@ -199,7 +142,83 @@ class WorkflowTask(BaseTask):
return super(WorkflowTask, self).__getattribute__(item)
-class StubTask(BaseTask):
+def create_task(actor, interface_name, operation_name, **kwargs):
"""
- Enables creating empty tasks.
+ This helper function enables safe creation of OperationTask, if the supplied interface or
+ operation do not exist, None is returned.
+ :param actor: the actor for this task
+ :param interface_name: the name of the interface
+ :param operation_name: the name of the operation
+ :param kwargs: any additional kwargs to be passed to the task OperationTask
+ :return: and OperationTask or None (if the interface/operation does not exists)
+ """
+ try:
+ return OperationTask(
+ actor,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ **kwargs
+ )
+ except exceptions.OperationNotFoundException:
+ return None
+
+
+def create_relationships_tasks(
+ node, interface_name, source_operation_name=None, target_operation_name=None, **kwargs):
+ """
+ Creates a relationship task (source and target) for all of a node_instance relationships.
+ :param basestring source_operation_name: the relationship operation name.
+ :param basestring interface_name: the name of the interface.
+ :param source_operation_name:
+ :param target_operation_name:
+ :param NodeInstance node: the source_node
+ :return:
+ """
+ sub_tasks = []
+ for relationship in node.outbound_relationships:
+ relationship_operations = create_relationship_tasks(
+ relationship,
+ interface_name,
+ source_operation_name=source_operation_name,
+ target_operation_name=target_operation_name,
+ **kwargs)
+ sub_tasks.append(relationship_operations)
+ return sub_tasks
+
+
+def create_relationship_tasks(relationship, interface_name, source_operation_name=None,
+ target_operation_name=None, **kwargs):
+ """
+ Creates a relationship task source and target.
+ :param Relationship relationship: the relationship instance itself
+ :param source_operation_name:
+ :param target_operation_name:
+
+ :return:
"""
+ operations = []
+ if source_operation_name:
+ operations.append(
+ create_task(
+ relationship,
+ interface_name=interface_name,
+ operation_name=source_operation_name,
+ **kwargs
+ )
+ )
+ if target_operation_name:
+ operations.append(
+ create_task(
+ relationship,
+ interface_name=interface_name,
+ operation_name=target_operation_name,
+ **kwargs
+ )
+ )
+
+ return [o for o in operations if o]
+
+
+def has_operation(actor, interface_name, operation_name):
+ interface = actor.interfaces.get(interface_name, None)
+ return interface and interface.operations.get(operation_name, False)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/builtin/execute_operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/execute_operation.py b/aria/orchestrator/workflows/builtin/execute_operation.py
index 16504ec..02a654a 100644
--- a/aria/orchestrator/workflows/builtin/execute_operation.py
+++ b/aria/orchestrator/workflows/builtin/execute_operation.py
@@ -17,8 +17,8 @@
Builtin execute_operation workflow
"""
-from . import utils
from ... import workflow
+from ..api import task
@workflow
@@ -65,11 +65,11 @@ def execute_operation(
# registering actual tasks to sequences
for node in filtered_nodes:
graph.add_tasks(
- _create_node_task(
- node=node,
+ task.OperationTask(
+ node,
interface_name=interface_name,
operation_name=operation_name,
- operation_kwargs=operation_kwargs
+ inputs=operation_kwargs
)
)
@@ -99,23 +99,3 @@ def _filter_nodes(context, node_template_ids=(), node_ids=(), type_names=()):
_is_node_by_id(node.id),
_is_node_by_type(node.node_template.type))):
yield node
-
-
-def _create_node_task(
- node,
- interface_name,
- operation_name,
- operation_kwargs):
- """
- A workflow which executes a single operation
- :param node: the node instance to install
- :param basestring operation: the operation name
- :param dict operation_kwargs:
- :return:
- """
-
- return utils.create_node_task(
- node=node,
- interface_name=interface_name,
- operation_name=operation_name,
- inputs=operation_kwargs)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/builtin/heal.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/heal.py b/aria/orchestrator/workflows/builtin/heal.py
index 92b96ea..ca382e8 100644
--- a/aria/orchestrator/workflows/builtin/heal.py
+++ b/aria/orchestrator/workflows/builtin/heal.py
@@ -103,7 +103,7 @@ def heal_uninstall(ctx, graph, failing_nodes, targeted_nodes):
graph.add_dependency(target_node_subgraph, node_sub_workflow)
if target_node in failing_nodes:
- dependency = relationship_tasks(
+ dependency = task.create_relationship_tasks(
relationship=relationship,
operation_name='aria.interfaces.relationship_lifecycle.unlink')
graph.add_tasks(*dependency)
@@ -157,7 +157,7 @@ def heal_install(ctx, graph, failing_nodes, targeted_nodes):
graph.add_dependency(node_sub_workflow, target_node_subworkflow)
if target_node in failing_nodes:
- dependent = relationship_tasks(
+ dependent = task.create_relationship_tasks(
relationship=relationship,
operation_name='aria.interfaces.relationship_lifecycle.establish')
graph.add_tasks(*dependent)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/builtin/install.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/install.py b/aria/orchestrator/workflows/builtin/install.py
index 2b9ec66..821b190 100644
--- a/aria/orchestrator/workflows/builtin/install.py
+++ b/aria/orchestrator/workflows/builtin/install.py
@@ -17,16 +17,15 @@
Builtin install workflow
"""
-from .workflows import install_node
-from .utils import create_node_task_dependencies
-from ..api.task import WorkflowTask
from ... import workflow
+from ..api import task as api_task
+from . import workflows
@workflow
def install(ctx, graph):
tasks_and_nodes = []
for node in ctx.nodes:
- tasks_and_nodes.append((WorkflowTask(install_node, node=node), node))
+ tasks_and_nodes.append((api_task.WorkflowTask(workflows.install_node, node=node), node))
graph.add_tasks([task for task, _ in tasks_and_nodes])
- create_node_task_dependencies(graph, tasks_and_nodes)
+ workflows.create_node_task_dependencies(graph, tasks_and_nodes)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/builtin/start.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/start.py b/aria/orchestrator/workflows/builtin/start.py
index ad67554..1946143 100644
--- a/aria/orchestrator/workflows/builtin/start.py
+++ b/aria/orchestrator/workflows/builtin/start.py
@@ -18,11 +18,11 @@ Builtin start workflow
"""
from .workflows import start_node
-from ..api.task import WorkflowTask
from ... import workflow
+from ..api import task as api_task
@workflow
def start(ctx, graph):
for node in ctx.model.node.iter():
- graph.add_tasks(WorkflowTask(start_node, node=node))
+ graph.add_tasks(api_task.WorkflowTask(start_node, node=node))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/builtin/stop.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/stop.py b/aria/orchestrator/workflows/builtin/stop.py
index 23ac366..c1b60ae 100644
--- a/aria/orchestrator/workflows/builtin/stop.py
+++ b/aria/orchestrator/workflows/builtin/stop.py
@@ -18,11 +18,11 @@ Builtin stop workflow
"""
from .workflows import stop_node
-from ..api.task import WorkflowTask
+from ..api import task as api_task
from ... import workflow
@workflow
def stop(ctx, graph):
for node in ctx.model.node.iter():
- graph.add_tasks(WorkflowTask(stop_node, node=node))
+ graph.add_tasks(api_task.WorkflowTask(stop_node, node=node))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/builtin/uninstall.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/uninstall.py b/aria/orchestrator/workflows/builtin/uninstall.py
index e4afcd9..c35117e 100644
--- a/aria/orchestrator/workflows/builtin/uninstall.py
+++ b/aria/orchestrator/workflows/builtin/uninstall.py
@@ -17,18 +17,15 @@
Builtin uninstall workflow
"""
-from .workflows import uninstall_node
-from .utils import create_node_task_dependencies
-from ..api.task import WorkflowTask
from ... import workflow
+from ..api import task as api_task
+from . import workflows
@workflow
def uninstall(ctx, graph):
tasks_and_nodes = []
for node in ctx.nodes:
- tasks_and_nodes.append((
- WorkflowTask(uninstall_node, node=node),
- node))
+ tasks_and_nodes.append((api_task.WorkflowTask(workflows.uninstall_node, node=node), node))
graph.add_tasks([task for task, _ in tasks_and_nodes])
- create_node_task_dependencies(graph, tasks_and_nodes, reverse=True)
+ workflows.create_node_task_dependencies(graph, tasks_and_nodes, reverse=True)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/builtin/utils.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/utils.py b/aria/orchestrator/workflows/builtin/utils.py
deleted file mode 100644
index 2254d13..0000000
--- a/aria/orchestrator/workflows/builtin/utils.py
+++ /dev/null
@@ -1,138 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ..api.task import OperationTask, StubTask
-from .. import exceptions
-
-
-def create_node_task(node, interface_name, operation_name, **kwargs):
- """
- Returns a new operation task if the operation exists in the node, otherwise returns None.
- """
-
- try:
- if _is_empty_task(node, interface_name, operation_name):
- return StubTask()
-
- return OperationTask.for_node(node=node,
- interface_name=interface_name,
- operation_name=operation_name,
- **kwargs)
- except exceptions.OperationNotFoundException:
- # We will skip nodes which do not have the operation
- return None
-
-
-def create_relationships_tasks(
- node, interface_name, source_operation_name=None, target_operation_name=None, **kwargs):
- """
- Creates a relationship task (source and target) for all of a node_instance relationships.
- :param basestring source_operation_name: the relationship operation name.
- :param basestring interface_name: the name of the interface.
- :param source_operation_name:
- :param target_operation_name:
- :param NodeInstance node: the source_node
- :return:
- """
- sub_tasks = []
- for relationship in node.outbound_relationships:
- relationship_operations = relationship_tasks(
- relationship,
- interface_name,
- source_operation_name=source_operation_name,
- target_operation_name=target_operation_name,
- **kwargs)
- sub_tasks.append(relationship_operations)
- return sub_tasks
-
-
-def relationship_tasks(relationship, interface_name, source_operation_name=None,
- target_operation_name=None, **kwargs):
- """
- Creates a relationship task source and target.
- :param Relationship relationship: the relationship instance itself
- :param source_operation_name:
- :param target_operation_name:
-
- :return:
- """
- operations = []
- if source_operation_name:
- try:
- if _is_empty_task(relationship, interface_name, source_operation_name):
- operations.append(StubTask())
- else:
- operations.append(
- OperationTask.for_relationship(relationship=relationship,
- interface_name=interface_name,
- operation_name=source_operation_name,
- **kwargs)
- )
- except exceptions.OperationNotFoundException:
- # We will skip relationships which do not have the operation
- pass
- if target_operation_name:
- try:
- if _is_empty_task(relationship, interface_name, target_operation_name):
- operations.append(StubTask())
- else:
- operations.append(
- OperationTask.for_relationship(relationship=relationship,
- interface_name=interface_name,
- operation_name=target_operation_name,
- **kwargs)
- )
- except exceptions.OperationNotFoundException:
- # We will skip relationships which do not have the operation
- pass
-
- return operations
-
-
-def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False):
- """
- Creates dependencies between tasks if there is a relationship (outbound) between their nodes.
- """
-
- def get_task(node_name):
- for task, node in tasks_and_nodes:
- if node.name == node_name:
- return task
- return None
-
- for task, node in tasks_and_nodes:
- dependencies = []
- for relationship in node.outbound_relationships:
- dependency = get_task(relationship.target_node.name)
- if dependency:
- dependencies.append(dependency)
- if dependencies:
- if reverse:
- for dependency in dependencies:
- graph.add_dependency(dependency, task)
- else:
- graph.add_dependency(task, dependencies)
-
-
-def _is_empty_task(actor, interface_name, operation_name):
- interface = actor.interfaces.get(interface_name)
- if interface:
- operation = interface.operations.get(operation_name)
- if operation:
- return operation.implementation is None
-
- raise exceptions.OperationNotFoundException(
- 'Could not find operation "{0}" on interface "{1}" for {2} "{3}"'
- .format(operation_name, interface_name, type(actor).__name__.lower(), actor.name))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/builtin/workflows.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/workflows.py b/aria/orchestrator/workflows/builtin/workflows.py
index 60f14ed..b286e98 100644
--- a/aria/orchestrator/workflows/builtin/workflows.py
+++ b/aria/orchestrator/workflows/builtin/workflows.py
@@ -18,10 +18,7 @@ TSOCA normative lifecycle workflows.
"""
from ... import workflow
-from .utils import (
- create_node_task,
- create_relationships_tasks
-)
+from ..api import task
NORMATIVE_STANDARD_INTERFACE = 'Standard' # 'tosca.interfaces.node.lifecycle.Standard'
@@ -72,19 +69,18 @@ __all__ = (
@workflow(suffix_template='{node.name}')
def install_node(graph, node, **kwargs):
# Create
- sequence = [create_node_task(node,
- NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CREATE)]
+ sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CREATE)]
# Configure
- sequence += create_relationships_tasks(node,
- NORMATIVE_CONFIGURE_INTERFACE,
- NORMATIVE_PRE_CONFIGURE_SOURCE,
- NORMATIVE_PRE_CONFIGURE_TARGET)
- sequence.append(create_node_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CONFIGURE))
- sequence += create_relationships_tasks(node,
- NORMATIVE_CONFIGURE_INTERFACE,
- NORMATIVE_POST_CONFIGURE_SOURCE,
- NORMATIVE_POST_CONFIGURE_TARGET)
+ sequence += task.create_relationships_tasks(node,
+ NORMATIVE_CONFIGURE_INTERFACE,
+ NORMATIVE_PRE_CONFIGURE_SOURCE,
+ NORMATIVE_PRE_CONFIGURE_TARGET)
+ sequence.append(task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CONFIGURE))
+ sequence += task.create_relationships_tasks(node,
+ NORMATIVE_CONFIGURE_INTERFACE,
+ NORMATIVE_POST_CONFIGURE_SOURCE,
+ NORMATIVE_POST_CONFIGURE_TARGET)
# Start
sequence += _create_start_tasks(node)
@@ -97,9 +93,7 @@ def uninstall_node(graph, node, **kwargs):
sequence = _create_stop_tasks(node)
# Delete
- sequence.append(create_node_task(node,
- NORMATIVE_STANDARD_INTERFACE,
- NORMATIVE_DELETE))
+ sequence.append(task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_DELETE))
graph.sequence(*sequence)
@@ -115,16 +109,41 @@ def stop_node(graph, node, **kwargs):
def _create_start_tasks(node):
- sequence = [create_node_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_START)]
- sequence += create_relationships_tasks(node,
- NORMATIVE_CONFIGURE_INTERFACE,
- NORMATIVE_ADD_SOURCE, NORMATIVE_ADD_TARGET)
+ sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_START)]
+ sequence += task.create_relationships_tasks(node,
+ NORMATIVE_CONFIGURE_INTERFACE,
+ NORMATIVE_ADD_SOURCE, NORMATIVE_ADD_TARGET)
return sequence
def _create_stop_tasks(node):
- sequence = [create_node_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_STOP)]
- sequence += create_relationships_tasks(node,
- NORMATIVE_CONFIGURE_INTERFACE,
- NORMATIVE_REMOVE_SOURCE, NORMATIVE_REMOVE_TARGET)
+ sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_STOP)]
+ sequence += task.create_relationships_tasks(node,
+ NORMATIVE_CONFIGURE_INTERFACE,
+ NORMATIVE_REMOVE_SOURCE, NORMATIVE_REMOVE_TARGET)
return sequence
+
+
+def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False):
+ """
+ Creates dependencies between tasks if there is a relationship (outbound) between their nodes.
+ """
+
+ def get_task(node_name):
+ for api_task, task_node in tasks_and_nodes:
+ if task_node.name == node_name:
+ return api_task
+ return None
+
+ for api_task, node in tasks_and_nodes:
+ dependencies = []
+ for relationship in node.outbound_relationships:
+ dependency = get_task(relationship.target_node.name)
+ if dependency:
+ dependencies.append(dependency)
+ if dependencies:
+ if reverse:
+ for dependency in dependencies:
+ graph.add_dependency(dependency, api_task)
+ else:
+ graph.add_dependency(api_task, dependencies)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 155d0ee..fd0dd6d 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -44,7 +44,8 @@ class Engine(logger.LoggerMixin):
self._execution_graph = networkx.DiGraph()
self._executor = executor
translation.build_execution_graph(task_graph=tasks_graph,
- execution_graph=self._execution_graph)
+ execution_graph=self._execution_graph,
+ default_executor=self._executor)
def execute(self):
"""
@@ -109,12 +110,11 @@ class Engine(logger.LoggerMixin):
self._workflow_context.model.task.refresh(task.model_task)
yield task
- def _handle_executable_task(self, task):
- if isinstance(task, engine_task.StubTask):
- task.status = models.Task.SUCCESS
- else:
+ @staticmethod
+ def _handle_executable_task(task):
+ if isinstance(task, engine_task.OperationTask):
events.sent_task_signal.send(task)
- self._executor.execute(task)
+ task.execute()
def _handle_ended_tasks(self, task):
if task.status == models.Task.FAILED and not task.ignore_failure:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 88c24bd..f3e4e7e 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -40,7 +40,6 @@ def _task_started(task, *args, **kwargs):
with task._update():
task.started_at = datetime.utcnow()
task.status = task.STARTED
-
_update_node_state_if_necessary(task, is_transitional=True)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index 8adeb7e..0e081c2 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -47,9 +47,13 @@ class BaseTask(object):
Base class for Task objects
"""
- def __init__(self, id, *args, **kwargs):
+ def __init__(self, id, executor, *args, **kwargs):
super(BaseTask, self).__init__(*args, **kwargs)
self._id = id
+ self._executor = executor
+
+ def execute(self):
+ return self._executor.execute(self)
@property
def id(self):
@@ -61,8 +65,11 @@ class BaseTask(object):
class StubTask(BaseTask):
"""
- Base stub task for all tasks that don't actually run anything
+ Base stub task for marker user tasks that only mark the start/end of a workflow
+ or sub-workflow
"""
+ STARTED = models.Task.STARTED
+ SUCCESS = models.Task.SUCCESS
def __init__(self, *args, **kwargs):
super(StubTask, self).__init__(*args, **kwargs)
@@ -70,10 +77,10 @@ class StubTask(BaseTask):
self.due_at = datetime.utcnow()
def has_ended(self):
- return self.status in (models.Task.SUCCESS, models.Task.FAILED)
+ return self.status == self.SUCCESS
def is_waiting(self):
- return self.status in (models.Task.PENDING, models.Task.RETRYING)
+ return not self.has_ended()
class StartWorkflowTask(StubTask):
@@ -108,14 +115,14 @@ class OperationTask(BaseTask):
"""
Operation task
"""
-
def __init__(self, api_task, *args, **kwargs):
- super(OperationTask, self).__init__(id=api_task.id, **kwargs)
+ # If no executor is provided, we infer that this is an empty task which does not need to be
+ # executed.
+ super(OperationTask, self).__init__(id=api_task.id, *args, **kwargs)
self._workflow_context = api_task._workflow_context
self.interface_name = api_task.interface_name
self.operation_name = api_task.operation_name
model_storage = api_task._workflow_context.model
- plugin = api_task.plugin
base_task_model = model_storage.task.model_cls
if isinstance(api_task.actor, models.Node):
@@ -130,15 +137,18 @@ class OperationTask(BaseTask):
task_model = create_task_model(
name=api_task.name,
- implementation=api_task.implementation,
actor=api_task.actor,
- inputs=api_task.inputs,
status=base_task_model.PENDING,
max_attempts=api_task.max_attempts,
retry_interval=api_task.retry_interval,
ignore_failure=api_task.ignore_failure,
- plugin=plugin,
- execution=self._workflow_context.execution
+ execution=self._workflow_context.execution,
+
+ # Only non-stub tasks have these fields
+ plugin=api_task.plugin,
+ implementation=api_task.implementation,
+ inputs=api_task.inputs
+
)
self._workflow_context.model.task.put(task_model)
@@ -153,6 +163,9 @@ class OperationTask(BaseTask):
self._task_id = task_model.id
self._update_fields = None
+ def execute(self):
+ super(OperationTask, self).execute()
+
@contextmanager
def _update(self):
"""
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py
index b6cbdad..0bbce90 100644
--- a/aria/orchestrator/workflows/core/translation.py
+++ b/aria/orchestrator/workflows/core/translation.py
@@ -18,12 +18,14 @@ Translation of user graph's API to the execution graph
"""
from .. import api
+from ..executor import base
from . import task as core_task
def build_execution_graph(
task_graph,
execution_graph,
+ default_executor,
start_cls=core_task.StartWorkflowTask,
end_cls=core_task.EndWorkflowTask,
depends_on=()):
@@ -37,31 +39,33 @@ def build_execution_graph(
:param depends_on: internal use
"""
# Insert start marker
- start_task = start_cls(id=_start_graph_suffix(task_graph.id))
+ start_task = start_cls(id=_start_graph_suffix(task_graph.id), executor=base.StubTaskExecutor())
_add_task_and_dependencies(execution_graph, start_task, depends_on)
for api_task in task_graph.topological_order(reverse=True):
dependencies = task_graph.get_dependencies(api_task)
operation_dependencies = _get_tasks_from_dependencies(
- execution_graph,
- dependencies,
- default=[start_task])
+ execution_graph, dependencies, default=[start_task])
if isinstance(api_task, api.task.OperationTask):
- # Add the task an the dependencies
- operation_task = core_task.OperationTask(api_task)
+ if api_task.implementation:
+ operation_task = core_task.OperationTask(api_task, executor=default_executor)
+ else:
+ operation_task = core_task.OperationTask(api_task,
+ executor=base.EmptyOperationExecutor())
_add_task_and_dependencies(execution_graph, operation_task, operation_dependencies)
elif isinstance(api_task, api.task.WorkflowTask):
# Build the graph recursively while adding start and end markers
build_execution_graph(
task_graph=api_task,
execution_graph=execution_graph,
+ default_executor=default_executor,
start_cls=core_task.StartSubWorkflowTask,
end_cls=core_task.EndSubWorkflowTask,
depends_on=operation_dependencies
)
elif isinstance(api_task, api.task.StubTask):
- stub_task = core_task.StubTask(id=api_task.id)
+ stub_task = core_task.StubTask(id=api_task.id, executor=base.StubTaskExecutor())
_add_task_and_dependencies(execution_graph, stub_task, operation_dependencies)
else:
raise RuntimeError('Undefined state')
@@ -71,7 +75,7 @@ def build_execution_graph(
execution_graph,
_get_non_dependency_tasks(task_graph),
default=[start_task])
- end_task = end_cls(id=_end_graph_suffix(task_graph.id))
+ end_task = end_cls(id=_end_graph_suffix(task_graph.id), executor=base.StubTaskExecutor())
_add_task_and_dependencies(execution_graph, end_task, workflow_dependencies)
@@ -85,11 +89,14 @@ def _get_tasks_from_dependencies(execution_graph, dependencies, default=()):
"""
Returns task list from dependencies.
"""
- return [execution_graph.node[dependency.id
- if isinstance(dependency, (api.task.OperationTask,
- api.task.StubTask))
- else _end_graph_suffix(dependency.id)]['task']
- for dependency in dependencies] or default
+ tasks = []
+ for dependency in dependencies:
+ if isinstance(dependency, (api.task.OperationTask, api.task.StubTask)):
+ dependency_id = dependency.id
+ else:
+ dependency_id = _end_graph_suffix(dependency.id)
+ tasks.append(execution_graph.node[dependency_id]['task'])
+ return tasks or default
def _start_graph_suffix(id):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/events_logging.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py
index 3ffe18b..236a55f 100644
--- a/aria/orchestrator/workflows/events_logging.py
+++ b/aria/orchestrator/workflows/events_logging.py
@@ -35,12 +35,21 @@ def _get_task_name(task):
@events.start_task_signal.connect
def _start_task_handler(task, **kwargs):
- task.context.logger.info('{name} {task.interface_name}.{task.operation_name} started...'
- .format(name=_get_task_name(task), task=task))
+ # If the task has no implementation this is an empty task.
+ if task.implementation:
+ suffix = 'started...'
+ logger = task.context.logger.info
+ else:
+ suffix = 'has no implementation'
+ logger = task.context.logger.debug
+ logger('{name} {task.interface_name}.{task.operation_name} {suffix}'.format(
+ name=_get_task_name(task), task=task, suffix=suffix))
@events.on_success_task_signal.connect
def _success_task_handler(task, **kwargs):
+ if not task.implementation:
+ return
task.context.logger.info('{name} {task.interface_name}.{task.operation_name} successful'
.format(name=_get_task_name(task), task=task))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index 39becef..a225837 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -50,3 +50,14 @@ class BaseExecutor(logger.LoggerMixin):
@staticmethod
def _task_succeeded(task):
events.on_success_task_signal.send(task)
+
+
+class StubTaskExecutor(BaseExecutor):
+ def execute(self, task):
+ task.status = task.SUCCESS
+
+
+class EmptyOperationExecutor(BaseExecutor):
+ def execute(self, task):
+ events.start_task_signal.send(task)
+ events.on_success_task_signal.send(task)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/executor/dry.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py
index e1261bb..eb70a41 100644
--- a/aria/orchestrator/workflows/executor/dry.py
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -16,7 +16,6 @@
"""
Dry executor
"""
-
from datetime import datetime
from .base import BaseExecutor
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml
----------------------------------------------------------------------
diff --git a/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml b/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml
index ff6ba6c..1e83ef9 100644
--- a/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml
+++ b/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml
@@ -100,3 +100,8 @@ interface_types:
Operation to remove a target node.
_extensions:
relationship_edge: source
+ remove_source:
+ description: >-
+ Operation to remove the source node.
+ _extensions:
+ relationship_edge: target
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/end2end/test_hello_world.py
----------------------------------------------------------------------
diff --git a/tests/end2end/test_hello_world.py b/tests/end2end/test_hello_world.py
index 09e5d06..fc5f631 100644
--- a/tests/end2end/test_hello_world.py
+++ b/tests/end2end/test_hello_world.py
@@ -29,8 +29,7 @@ def test_hello_world(testenv):
finally:
# Even if some assertions failed, attempt to execute uninstall so the
# webserver process doesn't stay up once the test is finished
- # TODO: remove force_service_delete=True
- testenv.uninstall_service(force_service_delete=True)
+ testenv.uninstall_service()
_verify_webserver_down('http://localhost:9090')
testenv.verify_clean_storage()
@@ -57,5 +56,5 @@ def _verify_deployed_service_in_storage(service_name, model_storage):
assert service.name == service_name
assert len(service.executions) == 1
assert len(service.nodes) == 2
- # TODO: validate node states
+ assert all(node.state == node.STARTED for node in service.nodes.values())
assert len(service.executions[0].logs) > 0
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/end2end/testenv.py
----------------------------------------------------------------------
diff --git a/tests/end2end/testenv.py b/tests/end2end/testenv.py
index 3950b20..85714e5 100644
--- a/tests/end2end/testenv.py
+++ b/tests/end2end/testenv.py
@@ -70,7 +70,7 @@ class TestEnvironment(object):
assert len(self.model_storage.log.list()) == 0
def _get_cli(self):
- cli = sh.aria.bake(_out=sys.stdout.write, _err=sys.stderr.write)
+ cli = sh.aria.bake('-vvv', _out=sys.stdout.write, _err=sys.stderr.write)
# the `sh` library supports underscore-dash auto-replacement for commands and option flags
# yet not for subcommands (e.g. `aria service-templates`); The following class fixes this.
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index c399474..971e0db 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -84,10 +84,10 @@ def test_node_operation_task_execution(ctx, thread_executor):
@workflow
def basic_workflow(graph, **_):
graph.add_tasks(
- api.task.OperationTask.for_node(
+ api.task.OperationTask(
+ node,
interface_name=interface_name,
operation_name=operation_name,
- node=node,
inputs=inputs
)
)
@@ -141,8 +141,8 @@ def test_relationship_operation_task_execution(ctx, thread_executor):
@workflow
def basic_workflow(graph, **_):
graph.add_tasks(
- api.task.OperationTask.for_relationship(
- relationship=relationship,
+ api.task.OperationTask(
+ relationship,
interface_name=interface_name,
operation_name=operation_name,
inputs=inputs
@@ -209,9 +209,10 @@ def test_invalid_task_operation_id(ctx, thread_executor):
@workflow
def basic_workflow(graph, **_):
graph.add_tasks(
- api.task.OperationTask.for_node(node=node,
- interface_name=interface_name,
- operation_name=operation_name)
+ api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name)
)
execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
@@ -250,10 +251,11 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir):
@workflow
def basic_workflow(graph, **_):
- graph.add_tasks(api.task.OperationTask.for_node(node=node,
- interface_name=interface_name,
- operation_name=operation_name,
- inputs=inputs))
+ graph.add_tasks(api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ inputs=inputs))
execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
expected_file = tmpdir.join('workdir', 'plugins', str(ctx.service.id),
@@ -298,10 +300,10 @@ def test_node_operation_logging(ctx, executor):
@workflow
def basic_workflow(graph, **_):
graph.add_tasks(
- api.task.OperationTask.for_node(
+ api.task.OperationTask(
+ node,
interface_name=interface_name,
operation_name=operation_name,
- node=node,
inputs=inputs
)
)
@@ -331,10 +333,10 @@ def test_relationship_operation_logging(ctx, executor):
@workflow
def basic_workflow(graph, **_):
graph.add_tasks(
- api.task.OperationTask.for_relationship(
+ api.task.OperationTask(
+ relationship,
interface_name=interface_name,
operation_name=operation_name,
- relationship=relationship,
inputs=inputs
)
)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index f4acc36..8a5db6f 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -51,7 +51,7 @@ def _mock_workflow(ctx, graph):
plugin=plugin)
)
node.interfaces[interface.name] = interface
- task = api.task.OperationTask.for_node(node=node, interface_name='test', operation_name='op')
+ task = api.task.OperationTask(node, interface_name='test', operation_name='op')
graph.add_tasks(task)
return graph
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/context/test_toolbelt.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_toolbelt.py b/tests/orchestrator/context/test_toolbelt.py
index 213d964..ecc3ac2 100644
--- a/tests/orchestrator/context/test_toolbelt.py
+++ b/tests/orchestrator/context/test_toolbelt.py
@@ -90,8 +90,8 @@ def test_host_ip(workflow_context, executor):
@workflow
def basic_workflow(graph, **_):
graph.add_tasks(
- api.task.OperationTask.for_node(
- node=dependency_node,
+ api.task.OperationTask(
+ dependency_node,
interface_name=interface_name,
operation_name=operation_name,
inputs=inputs
@@ -121,8 +121,8 @@ def test_relationship_tool_belt(workflow_context, executor):
@workflow
def basic_workflow(graph, **_):
graph.add_tasks(
- api.task.OperationTask.for_relationship(
- relationship=relationship,
+ api.task.OperationTask(
+ relationship,
interface_name=interface_name,
operation_name=operation_name,
inputs=inputs
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 58506ba..09d0499 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -489,8 +489,8 @@ if __name__ == '__main__':
inputs=inputs)
)
node.interfaces[interface.name] = interface
- graph.add_tasks(api.task.OperationTask.for_node(
- node=node,
+ graph.add_tasks(api.task.OperationTask(
+ node,
interface_name='test',
operation_name='op',
inputs=inputs))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index a75d59a..a9dc5e8 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -245,8 +245,8 @@ class TestWithActualSSHServer(object):
for test_operation in test_operations:
op_inputs = inputs.copy()
op_inputs['test_operation'] = test_operation
- ops.append(api.task.OperationTask.for_node(
- node=node,
+ ops.append(api.task.OperationTask(
+ node,
interface_name='test',
operation_name='op',
inputs=op_inputs))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/workflows/api/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/api/test_task.py b/tests/orchestrator/workflows/api/test_task.py
index ab62361..642c785 100644
--- a/tests/orchestrator/workflows/api/test_task.py
+++ b/tests/orchestrator/workflows/api/test_task.py
@@ -62,8 +62,8 @@ class TestOperationTask(object):
ignore_failure = True
with context.workflow.current.push(ctx):
- api_task = api.task.OperationTask.for_node(
- node=node,
+ api_task = api.task.OperationTask(
+ node,
interface_name=interface_name,
operation_name=operation_name,
inputs=inputs,
@@ -109,8 +109,8 @@ class TestOperationTask(object):
retry_interval = 10
with context.workflow.current.push(ctx):
- api_task = api.task.OperationTask.for_relationship(
- relationship=relationship,
+ api_task = api.task.OperationTask(
+ relationship,
interface_name=interface_name,
operation_name=operation_name,
inputs=inputs,
@@ -154,8 +154,8 @@ class TestOperationTask(object):
retry_interval = 10
with context.workflow.current.push(ctx):
- api_task = api.task.OperationTask.for_relationship(
- relationship=relationship,
+ api_task = api.task.OperationTask(
+ relationship,
interface_name=interface_name,
operation_name=operation_name,
inputs=inputs,
@@ -193,8 +193,8 @@ class TestOperationTask(object):
dependency_node.interfaces[interface_name] = interface
with context.workflow.current.push(ctx):
- task = api.task.OperationTask.for_node(
- node=dependency_node,
+ task = api.task.OperationTask(
+ dependency_node,
interface_name=interface_name,
operation_name=operation_name)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index af9af17..8c0705b 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -55,34 +55,32 @@ class BaseTest(object):
tasks_graph=graph)
@staticmethod
- def _op(func, ctx,
+ def _op(ctx,
+ func,
inputs=None,
max_attempts=None,
retry_interval=None,
ignore_failure=None):
node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
-
+ interface_name = 'aria.interfaces.lifecycle'
operation_kwargs = dict(implementation='{name}.{func.__name__}'.format(
name=__name__, func=func))
if inputs:
# the operation has to declare the inputs before those may be passed
operation_kwargs['inputs'] = inputs
-
- interface = mock.models.create_interface(
- node.service,
- 'aria.interfaces.lifecycle',
- 'create',
- operation_kwargs=operation_kwargs
- )
+ operation_name = 'create'
+ interface = mock.models.create_interface(node.service, interface_name, operation_name,
+ operation_kwargs=operation_kwargs)
node.interfaces[interface.name] = interface
- return api.task.OperationTask.for_node(
- node=node,
+
+ return api.task.OperationTask(
+ node,
interface_name='aria.interfaces.lifecycle',
- operation_name='create',
- inputs=inputs,
+ operation_name=operation_name,
+ inputs=inputs or {},
max_attempts=max_attempts,
retry_interval=retry_interval,
- ignore_failure=ignore_failure
+ ignore_failure=ignore_failure,
)
@pytest.fixture(autouse=True)
@@ -162,7 +160,7 @@ class TestEngine(BaseTest):
def test_single_task_successful_execution(self, workflow_context, executor):
@workflow
def mock_workflow(ctx, graph):
- graph.add_tasks(self._op(mock_success_task, ctx))
+ graph.add_tasks(self._op(ctx, func=mock_success_task))
self._execute(
workflow_func=mock_workflow,
workflow_context=workflow_context,
@@ -174,7 +172,7 @@ class TestEngine(BaseTest):
def test_single_task_failed_execution(self, workflow_context, executor):
@workflow
def mock_workflow(ctx, graph):
- graph.add_tasks(self._op(mock_failed_task, ctx))
+ graph.add_tasks(self._op(ctx, func=mock_failed_task))
with pytest.raises(exceptions.ExecutorException):
self._execute(
workflow_func=mock_workflow,
@@ -191,8 +189,8 @@ class TestEngine(BaseTest):
def test_two_tasks_execution_order(self, workflow_context, executor):
@workflow
def mock_workflow(ctx, graph):
- op1 = self._op(mock_ordered_task, ctx, inputs={'counter': 1})
- op2 = self._op(mock_ordered_task, ctx, inputs={'counter': 2})
+ op1 = self._op(ctx, func=mock_ordered_task, inputs={'counter': 1})
+ op2 = self._op(ctx, func=mock_ordered_task, inputs={'counter': 2})
graph.sequence(op1, op2)
self._execute(
workflow_func=mock_workflow,
@@ -206,9 +204,9 @@ class TestEngine(BaseTest):
def test_stub_and_subworkflow_execution(self, workflow_context, executor):
@workflow
def sub_workflow(ctx, graph):
- op1 = self._op(mock_ordered_task, ctx, inputs={'counter': 1})
+ op1 = self._op(ctx, func=mock_ordered_task, inputs={'counter': 1})
op2 = api.task.StubTask()
- op3 = self._op(mock_ordered_task, ctx, inputs={'counter': 2})
+ op3 = self._op(ctx, func=mock_ordered_task, inputs={'counter': 2})
graph.sequence(op1, op2, op3)
@workflow
@@ -231,7 +229,7 @@ class TestCancel(BaseTest):
@workflow
def mock_workflow(ctx, graph):
operations = (
- self._op(mock_sleep_task, ctx, inputs=dict(seconds=0.1))
+ self._op(ctx, func=mock_sleep_task, inputs=dict(seconds=0.1))
for _ in range(number_of_tasks)
)
return graph.sequence(*operations)
@@ -271,7 +269,7 @@ class TestRetries(BaseTest):
def test_two_max_attempts_and_success_on_retry(self, workflow_context, executor):
@workflow
def mock_workflow(ctx, graph):
- op = self._op(mock_conditional_failure_task, ctx,
+ op = self._op(ctx, func=mock_conditional_failure_task,
inputs={'failure_count': 1},
max_attempts=2)
graph.add_tasks(op)
@@ -287,7 +285,7 @@ class TestRetries(BaseTest):
def test_two_max_attempts_and_failure_on_retry(self, workflow_context, executor):
@workflow
def mock_workflow(ctx, graph):
- op = self._op(mock_conditional_failure_task, ctx,
+ op = self._op(ctx, func=mock_conditional_failure_task,
inputs={'failure_count': 2},
max_attempts=2)
graph.add_tasks(op)
@@ -304,7 +302,7 @@ class TestRetries(BaseTest):
def test_three_max_attempts_and_success_on_first_retry(self, workflow_context, executor):
@workflow
def mock_workflow(ctx, graph):
- op = self._op(mock_conditional_failure_task, ctx,
+ op = self._op(ctx, func=mock_conditional_failure_task,
inputs={'failure_count': 1},
max_attempts=3)
graph.add_tasks(op)
@@ -320,7 +318,7 @@ class TestRetries(BaseTest):
def test_three_max_attempts_and_success_on_second_retry(self, workflow_context, executor):
@workflow
def mock_workflow(ctx, graph):
- op = self._op(mock_conditional_failure_task, ctx,
+ op = self._op(ctx, func=mock_conditional_failure_task,
inputs={'failure_count': 2},
max_attempts=3)
graph.add_tasks(op)
@@ -336,7 +334,7 @@ class TestRetries(BaseTest):
def test_infinite_retries(self, workflow_context, executor):
@workflow
def mock_workflow(ctx, graph):
- op = self._op(mock_conditional_failure_task, ctx,
+ op = self._op(ctx, func=mock_conditional_failure_task,
inputs={'failure_count': 1},
max_attempts=-1)
graph.add_tasks(op)
@@ -362,7 +360,7 @@ class TestRetries(BaseTest):
def _test_retry_interval(self, retry_interval, workflow_context, executor):
@workflow
def mock_workflow(ctx, graph):
- op = self._op(mock_conditional_failure_task, ctx,
+ op = self._op(ctx, func=mock_conditional_failure_task,
inputs={'failure_count': 1},
max_attempts=2,
retry_interval=retry_interval)
@@ -382,7 +380,7 @@ class TestRetries(BaseTest):
def test_ignore_failure(self, workflow_context, executor):
@workflow
def mock_workflow(ctx, graph):
- op = self._op(mock_conditional_failure_task, ctx,
+ op = self._op(ctx, func=mock_conditional_failure_task,
ignore_failure=True,
inputs={'failure_count': 100},
max_attempts=100)
@@ -406,7 +404,7 @@ class TestTaskRetryAndAbort(BaseTest):
@workflow
def mock_workflow(ctx, graph):
- op = self._op(mock_task_retry, ctx,
+ op = self._op(ctx, func=mock_task_retry,
inputs={'message': self.message},
retry_interval=default_retry_interval,
max_attempts=2)
@@ -430,7 +428,7 @@ class TestTaskRetryAndAbort(BaseTest):
@workflow
def mock_workflow(ctx, graph):
- op = self._op(mock_task_retry, ctx,
+ op = self._op(ctx, func=mock_task_retry,
inputs={'message': self.message,
'retry_interval': custom_retry_interval},
retry_interval=default_retry_interval,
@@ -453,7 +451,7 @@ class TestTaskRetryAndAbort(BaseTest):
def test_task_abort(self, workflow_context, executor):
@workflow
def mock_workflow(ctx, graph):
- op = self._op(mock_task_abort, ctx,
+ op = self._op(ctx, func=mock_task_abort,
inputs={'message': self.message},
retry_interval=100,
max_attempts=100)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/workflows/core/test_events.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py
index b9bff77..184071d 100644
--- a/tests/orchestrator/workflows/core/test_events.py
+++ b/tests/orchestrator/workflows/core/test_events.py
@@ -136,8 +136,8 @@ def _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node,
@workflow
def single_operation_workflow(graph, node, interface_name, op_name, **_):
- graph.add_tasks(api.task.OperationTask.for_node(
- node=node,
+ graph.add_tasks(api.task.OperationTask(
+ node,
interface_name=interface_name,
operation_name=op_name))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/workflows/core/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py
index 0765350..748ee20 100644
--- a/tests/orchestrator/workflows/core/test_task.py
+++ b/tests/orchestrator/workflows/core/test_task.py
@@ -24,6 +24,7 @@ from aria.orchestrator.workflows import (
api,
core,
exceptions,
+ executor
)
from tests import mock, storage
@@ -66,20 +67,22 @@ class TestOperationTask(object):
def _create_node_operation_task(self, ctx, node):
with workflow_context.current.push(ctx):
- api_task = api.task.OperationTask.for_node(
- node=node,
+ api_task = api.task.OperationTask(
+ node,
interface_name=NODE_INTERFACE_NAME,
operation_name=NODE_OPERATION_NAME)
- core_task = core.task.OperationTask(api_task=api_task)
+ core_task = core.task.OperationTask(api_task=api_task,
+ executor=executor.base.EmptyOperationExecutor())
return api_task, core_task
def _create_relationship_operation_task(self, ctx, relationship):
with workflow_context.current.push(ctx):
- api_task = api.task.OperationTask.for_relationship(
- relationship=relationship,
+ api_task = api.task.OperationTask(
+ relationship,
interface_name=RELATIONSHIP_INTERFACE_NAME,
operation_name=RELATIONSHIP_OPERATION_NAME)
- core_task = core.task.OperationTask(api_task=api_task)
+ core_task = core.task.OperationTask(api_task=api_task,
+ executor=executor.base.EmptyOperationExecutor())
return api_task, core_task
def test_node_operation_task_creation(self, ctx):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index 514bce9..2a96d01 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -17,6 +17,7 @@ from networkx import topological_sort, DiGraph
from aria.orchestrator import context
from aria.orchestrator.workflows import api, core
+from aria.orchestrator.workflows.executor import base
from tests import mock
from tests import storage
@@ -41,17 +42,20 @@ def test_task_graph_into_execution_graph(tmpdir):
with context.workflow.current.push(task_context):
test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph')
- simple_before_task = api.task.OperationTask.for_node(node=node,
- interface_name=interface_name,
- operation_name=operation_name)
- simple_after_task = api.task.OperationTask.for_node(node=node,
- interface_name=interface_name,
- operation_name=operation_name)
+ simple_before_task = api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name)
+ simple_after_task = api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name)
inner_task_graph = api.task.WorkflowTask(sub_workflow, name='test_inner_task_graph')
- inner_task = api.task.OperationTask.for_node(node=node,
- interface_name=interface_name,
- operation_name=operation_name)
+ inner_task = api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name)
inner_task_graph.add_tasks(inner_task)
test_task_graph.add_tasks(simple_before_task)
@@ -63,7 +67,8 @@ def test_task_graph_into_execution_graph(tmpdir):
# Direct check
execution_graph = DiGraph()
core.translation.build_execution_graph(task_graph=test_task_graph,
- execution_graph=execution_graph)
+ execution_graph=execution_graph,
+ default_executor=base.StubTaskExecutor())
execution_tasks = topological_sort(execution_graph)
assert len(execution_tasks) == 7
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
index 88e7ae0..1dbfae1 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
@@ -99,14 +99,16 @@ def _test(context, executor, lock_files, func, expected_failure):
@workflow
def mock_workflow(graph, **_):
graph.add_tasks(
- api.task.OperationTask.for_node(node=node,
- interface_name=interface_name,
- operation_name=operation_name,
- inputs=inputs),
- api.task.OperationTask.for_node(node=node,
- interface_name=interface_name,
- operation_name=operation_name,
- inputs=inputs)
+ api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ inputs=inputs),
+ api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ inputs=inputs)
)
signal = events.on_failure_task_signal
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index 7ae337d..878ac24 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -46,10 +46,11 @@ def test_decorate_extension(context, executor):
inputs=inputs)
)
node.interfaces[interface.name] = interface
- task = api.task.OperationTask.for_node(node=node,
- interface_name=interface_name,
- operation_name=operation_name,
- inputs=inputs)
+ task = api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ inputs=inputs)
graph.add_tasks(task)
return graph
graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index 3a8c54b..4fbe9c1 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -99,10 +99,11 @@ def _run_workflow(context, executor, op_func, inputs=None):
inputs=wf_inputs)
)
node.interfaces[interface.name] = interface
- task = api.task.OperationTask.for_node(node=node,
- interface_name=interface_name,
- operation_name=operation_name,
- inputs=wf_inputs)
+ task = api.task.OperationTask(
+ node,
+ interface_name=interface_name,
+ operation_name=operation_name,
+ inputs=wf_inputs)
graph.add_tasks(task)
return graph
graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/resources/service-templates/tosca-simple-1.0/node-cellar/workflows.py
----------------------------------------------------------------------
diff --git a/tests/resources/service-templates/tosca-simple-1.0/node-cellar/workflows.py b/tests/resources/service-templates/tosca-simple-1.0/node-cellar/workflows.py
index abe1ee2..06e4f9e 100644
--- a/tests/resources/service-templates/tosca-simple-1.0/node-cellar/workflows.py
+++ b/tests/resources/service-templates/tosca-simple-1.0/node-cellar/workflows.py
@@ -1,5 +1,5 @@
from aria import workflow
-from aria.orchestrator.workflows.builtin import utils
+from aria.orchestrator.workflows.api import task
from aria.orchestrator.workflows.exceptions import TaskException
@@ -16,9 +16,9 @@ def maintenance(ctx, graph, enabled):
for node in ctx.model.node.iter():
try:
- graph.add_tasks(utils.create_node_task(node=node,
- interface_name=INTERFACE_NAME,
- operation_name=ENABLE_OPERATION_NAME if enabled
- else DISABLE_OPERATION_NAME))
+ graph.add_tasks(task.OperationTask(node,
+ interface_name=INTERFACE_NAME,
+ operation_name=ENABLE_OPERATION_NAME if enabled
+ else DISABLE_OPERATION_NAME))
except TaskException:
pass