Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/03/27 20:38:44 UTC

[1/3] incubator-ariatosca git commit: ARIA-126 Add node states [Forced Update!]

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-120-Builtin-workflows-relationship-operations-execution-order d07cc3a5e -> 95c692c8c (forced update)


ARIA-126 Add node states

1. The states are described in section 3.3.1 of the TOSCA spec.

2. The state changes when a standard lifecycle operation runs on the node,
as described in sections 5.7.4.1, 5.7.4.4.1, and 5.7.4.4.2 of the TOSCA spec
(see the sketch below).

3. The 'error' state is not addressed yet.
It is defined as one of the possible node states in the model, but currently no execution path
leads to setting a node's state to 'error'.

4. No validation of state transitions is performed.
For example, we do not check whether a node goes from 'created'
to 'started' without passing through the 'configured' state in between.
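
A minimal standalone sketch of the behavior described in points 1-2 (simplified names; the
actual implementation is the `_op_to_state` mapping and the `determine_state()` classmethod in
`aria/modeling/service_instance.py`, shown in the diff below):

    OP_TO_STATE = {
        'create':    {'transitional': 'creating',    'finished': 'created'},
        'configure': {'transitional': 'configuring', 'finished': 'configured'},
        'start':     {'transitional': 'starting',    'finished': 'started'},
        'stop':      {'transitional': 'stopping',    'finished': 'configured'},
        'delete':    {'transitional': 'deleting',    'finished': 'deleted'},
    }

    def determine_state(op_name, is_transitional):
        # 'creating' while the task is running, 'created' once it succeeds;
        # None for operations that are not standard lifecycle operations.
        phase = 'transitional' if is_transitional else 'finished'
        return OP_TO_STATE.get(op_name, {}).get(phase)

    assert determine_state('create', is_transitional=True) == 'creating'
    assert determine_state('stop', is_transitional=False) == 'configured'
    assert determine_state('my_custom_op', is_transitional=False) is None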


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/0e107933
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/0e107933
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/0e107933

Branch: refs/heads/ARIA-120-Builtin-workflows-relationship-operations-execution-order
Commit: 0e107933e68ec8a6fbc690a02d59e2fca900a540
Parents: b3cf69a
Author: Avia Efrat <av...@gigaspaces.com>
Authored: Wed Mar 22 17:19:58 2017 +0200
Committer: Avia Efrat <av...@gigaspaces.com>
Committed: Mon Mar 27 16:38:52 2017 +0300

----------------------------------------------------------------------
 aria/modeling/service_instance.py               |  52 ++++++-
 aria/modeling/service_template.py               |   2 +-
 .../workflows/core/events_handler.py            |  18 ++-
 tests/mock/models.py                            |   4 +-
 tests/modeling/test_mixins.py                   |   2 +-
 tests/modeling/test_models.py                   |  12 +-
 .../orchestrator/workflows/core/test_events.py  | 147 +++++++++++++++++++
 7 files changed, 222 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0e107933/aria/modeling/service_instance.py
----------------------------------------------------------------------
diff --git a/aria/modeling/service_instance.py b/aria/modeling/service_instance.py
index f120734..1e18db0 100644
--- a/aria/modeling/service_instance.py
+++ b/aria/modeling/service_instance.py
@@ -18,7 +18,8 @@
 from sqlalchemy import (
     Column,
     Text,
-    Integer
+    Integer,
+    Enum,
 )
 from sqlalchemy import DateTime
 from sqlalchemy.ext.associationproxy import association_proxy
@@ -322,8 +323,8 @@ class NodeBase(InstanceModelMixin): # pylint: disable=too-many-public-methods
     :vartype runtime_properties: {}
     :ivar scaling_groups: ??
     :vartype scaling_groups: []
-    :ivar state: ??
-    :vartype state: basestring
+    :ivar state: The state of the node, according to the TOSCA-defined node states
+    :vartype state: string
     :ivar version: Used by `aria.storage.instrumentation`
     :vartype version: int
 
@@ -347,6 +348,49 @@ class NodeBase(InstanceModelMixin): # pylint: disable=too-many-public-methods
                           'node_template_fk',
                           'service_name']
 
+    INITIAL = 'initial'
+    CREATING = 'creating'
+    CREATED = 'created'
+    CONFIGURING = 'configuring'
+    CONFIGURED = 'configured'
+    STARTING = 'starting'
+    STARTED = 'started'
+    STOPPING = 'stopping'
+    DELETING = 'deleting'
+    # 'deleted' isn't actually part of the TOSCA spec, since according to the description of the
+    # 'deleting' state: "Node is transitioning from its current state to one where it is deleted and
+    #  its state is no longer tracked by the instance model."
+    # However, we prefer to be able to retrieve information about deleted nodes, so we chose to add
+    # this 'deleted' state to enable us to do so.
+    DELETED = 'deleted'
+    ERROR = 'error'
+
+    STATES = [INITIAL, CREATING, CREATED, CONFIGURING, CONFIGURED, STARTING, STARTED, STOPPING,
+              DELETING, DELETED, ERROR]
+
+    _op_to_state = {'create': {'transitional': CREATING, 'finished': CREATED},
+                    'configure': {'transitional': CONFIGURING, 'finished': CONFIGURED},
+                    'start': {'transitional': STARTING, 'finished': STARTED},
+                    'stop': {'transitional': STOPPING, 'finished': CONFIGURED},
+                    'delete': {'transitional': DELETING, 'finished': DELETED}}
+
+    @classmethod
+    def determine_state(cls, op_name, is_transitional):
+        """ :returns: the state the node should be in as a result of running the
+            operation on this node.
+
+            e.g. if we are running tosca.interfaces.node.lifecycle.Standard.create, then
+            the resulting state should be either 'creating' (if the task just started) or
+            'created' (if the task ended).
+
+            If the operation is not a standard TOSCA lifecycle operation, we return None"""
+
+        state_type = 'transitional' if is_transitional else 'finished'
+        try:
+            return cls._op_to_state[op_name][state_type]
+        except KeyError:
+            return None
+
     @declared_attr
     def node_template(cls):
         return relationship.many_to_one(cls, 'node_template')
@@ -391,7 +435,7 @@ class NodeBase(InstanceModelMixin): # pylint: disable=too-many-public-methods
 
     runtime_properties = Column(modeling_types.Dict)
     scaling_groups = Column(modeling_types.List)
-    state = Column(Text, nullable=False)
+    state = Column(Enum(*STATES, name='node_state'), nullable=False, default=INITIAL)
     version = Column(Integer, default=1)
 
     __mapper_args__ = {'version_id_col': version} # Enable SQLAlchemy automatic version counting

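A hedged aside on the column change above: moving from `Text` to an SQLAlchemy `Enum` declares
the allowed states at the schema level and gives new nodes an 'initial' default. A
self-contained sketch using a toy model (not the ARIA model itself):

    from sqlalchemy import Column, Enum, Integer, create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker

    Base = declarative_base()

    class Node(Base):
        __tablename__ = 'node'
        id = Column(Integer, primary_key=True)
        # The valid states are listed in the column type; new rows default to 'initial'.
        state = Column(Enum('initial', 'creating', 'created', name='node_state'),
                       nullable=False, default='initial')

    engine = create_engine('sqlite://')
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()
    session.add(Node())
    session.commit()
    assert session.query(Node).one().state == 'initial'
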
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0e107933/aria/modeling/service_template.py
----------------------------------------------------------------------
diff --git a/aria/modeling/service_template.py b/aria/modeling/service_template.py
index 7246ff1..8b619bf 100644
--- a/aria/modeling/service_template.py
+++ b/aria/modeling/service_template.py
@@ -497,7 +497,7 @@ class NodeTemplateBase(TemplateModelMixin):
         node = models.Node(name=name,
                            type=self.type,
                            description=deepcopy_with_locators(self.description),
-                           state='',
+                           state=models.Node.INITIAL,
                            node_template=self)
         utils.instantiate_dict(node, node.properties, self.properties)
         utils.instantiate_dict(node, node.interfaces, self.interface_templates)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0e107933/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index a420d2b..4ac4b64 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -20,7 +20,7 @@ Path: aria.events.storage_event_handler
 Implementation of storage handlers for workflow and operation events.
 """
 
-
+import re
 from datetime import (
     datetime,
     timedelta,
@@ -29,6 +29,7 @@ from datetime import (
 from ... import events
 from ... import exceptions
 
+
 @events.sent_task_signal.connect
 def _task_sent(task, *args, **kwargs):
     with task._update():
@@ -41,6 +42,8 @@ def _task_started(task, *args, **kwargs):
         task.started_at = datetime.utcnow()
         task.status = task.STARTED
 
+        _update_node_state_if_necessary(task, is_transitional=True)
+
 
 @events.on_failure_task_signal.connect
 def _task_failed(task, exception, *args, **kwargs):
@@ -73,6 +76,8 @@ def _task_succeeded(task, *args, **kwargs):
         task.ended_at = datetime.utcnow()
         task.status = task.SUCCESS
 
+        _update_node_state_if_necessary(task)
+
 
 @events.start_workflow_signal.connect
 def _workflow_started(workflow_context, *args, **kwargs):
@@ -118,3 +123,14 @@ def _workflow_cancelling(workflow_context, *args, **kwargs):
         return _workflow_cancelled(workflow_context=workflow_context)
     execution.status = execution.CANCELLING
     workflow_context.execution = execution
+
+
+def _update_node_state_if_necessary(task, is_transitional=False):
+    match = re.search(r'^(?:tosca.interfaces.node.lifecycle.Standard|Standard):(\S+)@node',
+                      task.name)
+    if match:
+        node = task.runs_on
+        state = node.determine_state(op_name=match.group(1), is_transitional=is_transitional)
+        if state:
+            node.state = state
+            task.context.model.node.update(node)

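A quick standalone check of the regular expression used in `_update_node_state_if_necessary`
above (the task names are illustrative, assuming the '<interface>:<operation>@<actor>' naming
used for operation tasks):

    import re

    LIFECYCLE_TASK_RE = re.compile(
        r'^(?:tosca.interfaces.node.lifecycle.Standard|Standard):(\S+)@node')

    def lifecycle_op_name(task_name):
        match = LIFECYCLE_TASK_RE.search(task_name)
        return match.group(1) if match else None

    assert lifecycle_op_name('tosca.interfaces.node.lifecycle.Standard:create@node_1') == 'create'
    assert lifecycle_op_name('Standard:start@node_1') == 'start'
    # Operations on interfaces other than the standard lifecycle leave the node state untouched:
    assert lifecycle_op_name('interface_name:create@node_1') is None
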
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0e107933/tests/mock/models.py
----------------------------------------------------------------------
diff --git a/tests/mock/models.py b/tests/mock/models.py
index a60b35e..9ae2815 100644
--- a/tests/mock/models.py
+++ b/tests/mock/models.py
@@ -121,7 +121,7 @@ def create_dependency_node(dependency_node_template, service):
         runtime_properties={'ip': '1.1.1.1'},
         version=None,
         node_template=dependency_node_template,
-        state='',
+        state=models.Node.INITIAL,
         scaling_groups=[],
         service=service
     )
@@ -136,7 +136,7 @@ def create_dependent_node(dependent_node_template, service):
         runtime_properties={},
         version=None,
         node_template=dependent_node_template,
-        state='',
+        state=models.Node.INITIAL,
         scaling_groups=[],
         service=service
     )

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0e107933/tests/modeling/test_mixins.py
----------------------------------------------------------------------
diff --git a/tests/modeling/test_mixins.py b/tests/modeling/test_mixins.py
index 7795b57..651f53f 100644
--- a/tests/modeling/test_mixins.py
+++ b/tests/modeling/test_mixins.py
@@ -127,7 +127,7 @@ def test_relationship_model_ordering(context):
         service=service,
         version=None,
         node_template=new_node_template,
-        state='',
+        state=modeling.models.Node.INITIAL,
         scaling_groups=[]
     )
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0e107933/tests/modeling/test_models.py
----------------------------------------------------------------------
diff --git a/tests/modeling/test_models.py b/tests/modeling/test_models.py
index c3b98c1..84200d5 100644
--- a/tests/modeling/test_models.py
+++ b/tests/modeling/test_models.py
@@ -554,11 +554,11 @@ class TestNode(object):
             (False, 'name', {}, [], m_cls, 1),
             (False, m_cls, {}, [], 'state', m_cls),
 
-            (True, 'name', {}, [], 'state', 1),
-            (True, None, {}, [], 'state', 1),
-            (True, 'name', None, [], 'state', 1),
-            (True, 'name', {}, None, 'state', 1),
-            (True, 'name', {}, [], 'state', None),
+            (True, 'name', {}, [], 'initial', 1),
+            (True, None, {}, [], 'initial', 1),
+            (True, 'name', None, [], 'initial', 1),
+            (True, 'name', {}, None, 'initial', 1),
+            (True, 'name', {}, [], 'initial', None),
         ]
     )
     def test_node_model_creation(self, node_template_storage, is_valid, name, runtime_properties,
@@ -645,7 +645,7 @@ class TestNodeIP(object):
             node_template=node,
             type=storage.type.list()[0],
             runtime_properties={},
-            state='',
+            state='initial',
             service=storage.service.list()[0]
         )
         if ip:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0e107933/tests/orchestrator/workflows/core/test_events.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py
new file mode 100644
index 0000000..b9bff77
--- /dev/null
+++ b/tests/orchestrator/workflows/core/test_events.py
@@ -0,0 +1,147 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from tests import mock, storage
+from aria.modeling.service_instance import NodeBase
+from aria.orchestrator.decorators import operation, workflow
+from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.executor.thread import ThreadExecutor
+from aria.orchestrator.workflows import api
+
+global_test_dict = {}  # used to capture transitional node state changes
+
+
+@pytest.fixture
+def ctx(tmpdir):
+    context = mock.context.simple(str(tmpdir))
+    yield context
+    storage.release_sqlite_storage(context.model)
+
+# TODO: another possible approach to writing these tests:
+# Don't create a ctx for every test.
+# The problem is that if for every test we create a workflow that contains just one standard
+# lifecycle operation, then by the time we try to run the second test, the workflow fails since
+# the execution tries to go from 'terminated' to 'pending'.
+# And if we write a workflow that contains all the lifecycle operations, then first we need to
+# change the api of `mock.models.create_interface`, which a lot of other tests use, and second it
+# is unclear how to check all the state transitions during the workflow execution conveniently.
+
+TYPE_URI_NAME = 'tosca.interfaces.node.lifecycle.Standard'
+SHORTHAND_NAME = 'Standard'
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_create(ctx):
+    node = run_operation_on_node(ctx, interface_name=TYPE_URI_NAME, op_name='create')
+    _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'create')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_configure(ctx):
+    node = run_operation_on_node(ctx, interface_name=TYPE_URI_NAME, op_name='configure')
+    _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'configure')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_start(ctx):
+    node = run_operation_on_node(ctx, interface_name=TYPE_URI_NAME, op_name='start')
+    _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'start')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_stop(ctx):
+    node = run_operation_on_node(ctx, interface_name=TYPE_URI_NAME, op_name='stop')
+    _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'stop')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_delete(ctx):
+    node = run_operation_on_node(ctx, interface_name=TYPE_URI_NAME, op_name='delete')
+    _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'delete')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_create_shorthand_name(ctx):
+    node = run_operation_on_node(ctx, interface_name=SHORTHAND_NAME, op_name='create')
+    _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'create')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_configure_shorthand_name(ctx):
+    node = run_operation_on_node(ctx, interface_name=SHORTHAND_NAME, op_name='configure')
+    _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'configure')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_start_shorthand_name(ctx):
+    node = run_operation_on_node(ctx, interface_name=SHORTHAND_NAME, op_name='start')
+    _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'start')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_stop_shorthand_name(ctx):
+    node = run_operation_on_node(ctx, interface_name=SHORTHAND_NAME, op_name='stop')
+    _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'stop')
+
+
+def test_node_state_changes_as_a_result_of_standard_lifecycle_delete_shorthand_name(ctx):
+    node = run_operation_on_node(ctx, interface_name=SHORTHAND_NAME, op_name='delete')
+    _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, 'delete')
+
+
+def test_node_state_doesnt_change_as_a_result_of_an_operation_that_is_not_standard_lifecycle1(ctx):
+    node = run_operation_on_node(ctx, interface_name='interface_name', op_name='op_name')
+    assert node.state == node.INITIAL
+
+
+def test_node_state_doesnt_change_as_a_result_of_an_operation_that_is_not_standard_lifecycle2(ctx):
+    node = run_operation_on_node(ctx, interface_name='interface_name', op_name='create')
+    assert node.state == node.INITIAL
+
+
+def run_operation_on_node(ctx, op_name, interface_name):
+    node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+    interface = mock.models.create_interface(
+        service=node.service,
+        interface_name=interface_name,
+        operation_name=op_name,
+        operation_kwargs=dict(implementation='{name}.{func.__name__}'.format(name=__name__,
+                                                                             func=func)))
+    node.interfaces[interface.name] = interface
+
+    eng = engine.Engine(executor=ThreadExecutor(),
+                        workflow_context=ctx,
+                        tasks_graph=single_operation_workflow(ctx=ctx,  # pylint: disable=no-value-for-parameter
+                                                              node=node,
+                                                              interface_name=interface_name,
+                                                              op_name=op_name))
+    eng.execute()
+    return node
+
+
+def run_standard_lifecycle_operation_on_node(ctx, op_name):
+    return run_operation_on_node(ctx, interface_name='aria.interfaces.lifecycle.Standard',
+                                 op_name=op_name)
+
+
+def _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, op_name):
+    assert global_test_dict['transitional_state'] == NodeBase._op_to_state[op_name]['transitional']
+    assert node.state == NodeBase._op_to_state[op_name]['finished']
+
+
+@workflow
+def single_operation_workflow(graph, node, interface_name, op_name, **_):
+    graph.add_tasks(api.task.OperationTask.for_node(
+        node=node,
+        interface_name=interface_name,
+        operation_name=op_name))
+
+
+@operation
+def func(ctx):
+    global_test_dict['transitional_state'] = ctx.node.state


[2/3] incubator-ariatosca git commit: ARIA-120-Builtin-workflows-relationship-operations-execution-order

Posted by mx...@apache.org.
ARIA-120-Builtin-workflows-relationship-operations-execution-order


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/85427299
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/85427299
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/85427299

Branch: refs/heads/ARIA-120-Builtin-workflows-relationship-operations-execution-order
Commit: 85427299b504fa6299516204ecc1510c50143241
Parents: 0e10793
Author: max-orlov <ma...@gigaspaces.com>
Authored: Mon Mar 13 16:35:10 2017 +0200
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Mon Mar 27 22:50:58 2017 +0300

----------------------------------------------------------------------
 aria/modeling/relationship.py                   |   2 +-
 aria/orchestrator/workflows/api/task.py         |   3 +
 aria/orchestrator/workflows/builtin/utils.py    |  75 ++++++++---
 .../orchestrator/workflows/builtin/workflows.py | 100 +++++----------
 tests/mock/context.py                           |   7 +-
 tests/mock/models.py                            | 123 ++++++++++++++-----
 tests/mock/topology.py                          |  41 +++++--
 tests/modeling/test_models.py                   |  30 +++--
 .../orchestrator/workflows/builtin/__init__.py  |  69 +++++++++--
 .../orchestrator/workflows/builtin/test_heal.py |   7 +-
 .../workflows/builtin/test_install.py           |  15 ++-
 .../workflows/builtin/test_uninstall.py         |  15 ++-
 12 files changed, 315 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/85427299/aria/modeling/relationship.py
----------------------------------------------------------------------
diff --git a/aria/modeling/relationship.py b/aria/modeling/relationship.py
index ef2bcdd..70691b3 100644
--- a/aria/modeling/relationship.py
+++ b/aria/modeling/relationship.py
@@ -308,7 +308,7 @@ def many_to_many(model_class,
     if prefix is not None:
         secondary_table = '{0}_{1}'.format(prefix, secondary_table)
         if other_property is None:
-            other_property = '{0}_{1}'.format(prefix, this_table)
+            other_property = '{0}_{1}'.format(prefix, formatting.pluralize(this_table))
 
     backref_kwargs = backref_kwargs or {}
     backref_kwargs.setdefault('uselist', True)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/85427299/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index 009b81c..25d631d 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -123,6 +123,9 @@ class OperationTask(BaseTask):
                                                      interface=interface_name,
                                                      operation=operation_name)
 
+    def __repr__(self):
+        return self.name
+
     @classmethod
     def for_node(cls,
                  node,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/85427299/aria/orchestrator/workflows/builtin/utils.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/utils.py b/aria/orchestrator/workflows/builtin/utils.py
index 8efa889..79d02ab 100644
--- a/aria/orchestrator/workflows/builtin/utils.py
+++ b/aria/orchestrator/workflows/builtin/utils.py
@@ -12,12 +12,13 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from itertools import groupby
 
 from ..api.task import OperationTask
 from .. import exceptions
 
 
-def create_node_task(interface_name, operation_name, node):
+def create_node_task(node, interface_name, operation_name):
     """
     Returns a new operation task if the operation exists in the node, otherwise returns None.
     """
@@ -31,24 +32,59 @@ def create_node_task(interface_name, operation_name, node):
         return None
 
 
-def create_relationship_tasks(interface_name, operation_name, runs_on, node):
+def create_relationships_tasks(
+        node, interface_name, source_operation_name=None, target_operation_name=None):
     """
-    Returns a list of operation tasks for each outbound relationship of the node if the operation
-    exists there.
+    Creates relationship tasks (source and target) for all of a node's outbound relationships.
+    :param NodeInstance node: the source node
+    :param basestring interface_name: the name of the relationship interface
+    :param basestring source_operation_name: the operation to run on each relationship's source
+    :param basestring target_operation_name: the operation to run on each relationship's target
+    :return: a list of task lists, one per relationship
     """
+    relationships_groups = groupby(node.outbound_relationships,
+                                   key=lambda relationship: relationship.target_node.id)
 
-    sequence = []
-    for relationship in node.outbound_relationships:
-        try:
-            sequence.append(
-                OperationTask.for_relationship(relationship=relationship,
-                                               interface_name=interface_name,
-                                               operation_name=operation_name,
-                                               runs_on=runs_on))
-        except exceptions.OperationNotFoundException:
-            # We will skip relationships which do not have the operation
-            pass
-    return sequence
+    sub_tasks = []
+    for _, (_, relationship_group) in enumerate(relationships_groups):
+        for relationship in relationship_group:
+            relationship_operations = relationship_tasks(
+                relationship,
+                interface_name,
+                source_operation_name=source_operation_name,
+                target_operation_name=target_operation_name)
+            sub_tasks.append(relationship_operations)
+
+    return sub_tasks
+
+
+def relationship_tasks(
+        relationship, interface_name, source_operation_name=None, target_operation_name=None):
+    """
+    Creates the source and/or target operation tasks for a single relationship.
+    :param Relationship relationship: the relationship instance itself
+    :param basestring source_operation_name: the operation to run on the relationship source
+    :param basestring target_operation_name: the operation to run on the relationship target
+
+    :return: a list containing at most one source task and one target task
+    """
+    operations = []
+    if source_operation_name and _has_operation(relationship.interfaces, source_operation_name):
+        operations.append(
+            OperationTask.for_relationship(relationship=relationship,
+                                           interface_name=interface_name,
+                                           operation_name=source_operation_name,
+                                           runs_on='source')
+        )
+    if target_operation_name and _has_operation(relationship.interfaces, target_operation_name):
+        operations.append(
+            OperationTask.for_relationship(relationship=relationship,
+                                           interface_name=interface_name,
+                                           operation_name=target_operation_name,
+                                           runs_on='target')
+        )
+
+    return operations
 
 
 def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False):
@@ -74,3 +110,10 @@ def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False):
                     graph.add_dependency(dependency, task)
             else:
                 graph.add_dependency(task, dependencies)
+
+
+def _has_operation(interfaces, operation_name):
+    for interface in interfaces.values():
+        if operation_name in interface.operations:
+            return True
+    return False

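A rough standalone sketch (toy data structures, not the ARIA models) of what
`relationship_tasks` above produces: at most one source task and one target task per
relationship, and only for operations the relationship actually implements:

    from collections import namedtuple

    Relationship = namedtuple('Relationship', ['name', 'operations'])

    def relationship_tasks(relationship, source_op=None, target_op=None):
        tasks = []
        if source_op and source_op in relationship.operations:
            tasks.append((relationship.name, source_op, 'source'))
        if target_op and target_op in relationship.operations:
            tasks.append((relationship.name, target_op, 'target'))
        return tasks

    rel = Relationship('dependent->dependency', {'pre_configure_source', 'pre_configure_target'})
    assert relationship_tasks(rel, 'pre_configure_source', 'pre_configure_target') == [
        ('dependent->dependency', 'pre_configure_source', 'source'),
        ('dependent->dependency', 'pre_configure_target', 'target'),
    ]
    assert relationship_tasks(rel, 'add_source', 'add_target') == []
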
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/85427299/aria/orchestrator/workflows/builtin/workflows.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/workflows.py b/aria/orchestrator/workflows/builtin/workflows.py
index 6065343..67badcf 100644
--- a/aria/orchestrator/workflows/builtin/workflows.py
+++ b/aria/orchestrator/workflows/builtin/workflows.py
@@ -18,8 +18,10 @@ TSOCA normative lifecycle workflows.
 """
 
 from ... import workflow
-from ....modeling.models import Task
-from .utils import (create_node_task, create_relationship_tasks)
+from .utils import (
+    create_node_task,
+    create_relationships_tasks
+)
 
 
 NORMATIVE_STANDARD_INTERFACE = 'Standard' # 'tosca.interfaces.node.lifecycle.Standard'
@@ -39,6 +41,7 @@ NORMATIVE_POST_CONFIGURE_TARGET = 'post_configure_target'
 NORMATIVE_ADD_SOURCE = 'add_source'
 NORMATIVE_ADD_TARGET = 'add_target'
 NORMATIVE_REMOVE_TARGET = 'remove_target'
+NORMATIVE_REMOVE_SOURCE = 'remove_source'
 NORMATIVE_TARGET_CHANGED = 'target_changed'
 
 
@@ -56,6 +59,7 @@ __all__ = (
     'NORMATIVE_POST_CONFIGURE_TARGET',
     'NORMATIVE_ADD_SOURCE',
     'NORMATIVE_ADD_TARGET',
+    'NORMATIVE_REMOVE_SOURCE',
     'NORMATIVE_REMOVE_TARGET',
     'NORMATIVE_TARGET_CHANGED',
     'install_node',
@@ -67,40 +71,20 @@ __all__ = (
 
 @workflow(suffix_template='{node.name}')
 def install_node(graph, node, **kwargs):
-    sequence = []
-
     # Create
-    sequence.append(
-        create_node_task(
-            NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CREATE,
-            node))
+    sequence = [create_node_task(node,
+                                 NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CREATE)]
 
     # Configure
-    sequence += \
-        create_relationship_tasks(
-            NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_PRE_CONFIGURE_SOURCE,
-            Task.RUNS_ON_SOURCE,
-            node)
-    sequence += \
-        create_relationship_tasks(
-            NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_PRE_CONFIGURE_TARGET,
-            Task.RUNS_ON_TARGET,
-            node)
-    sequence.append(
-        create_node_task(
-            NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CONFIGURE,
-            node))
-    sequence += \
-        create_relationship_tasks(
-            NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_POST_CONFIGURE_SOURCE,
-            Task.RUNS_ON_SOURCE,
-            node)
-    sequence += \
-        create_relationship_tasks(
-            NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_POST_CONFIGURE_TARGET,
-            Task.RUNS_ON_TARGET,
-            node)
-
+    sequence += create_relationships_tasks(node,
+                                           NORMATIVE_CONFIGURE_INTERFACE,
+                                           NORMATIVE_PRE_CONFIGURE_SOURCE,
+                                           NORMATIVE_PRE_CONFIGURE_TARGET)
+    sequence.append(create_node_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CONFIGURE))
+    sequence += create_relationships_tasks(node,
+                                           NORMATIVE_CONFIGURE_INTERFACE,
+                                           NORMATIVE_POST_CONFIGURE_SOURCE,
+                                           NORMATIVE_POST_CONFIGURE_TARGET)
     # Start
     sequence += _create_start_tasks(node)
 
@@ -113,10 +97,9 @@ def uninstall_node(graph, node, **kwargs):
     sequence = _create_stop_tasks(node)
 
     # Delete
-    sequence.append(
-        create_node_task(
-            NORMATIVE_STANDARD_INTERFACE, NORMATIVE_DELETE,
-            node))
+    sequence.append(create_node_task(node,
+                                     NORMATIVE_STANDARD_INTERFACE,
+                                     NORMATIVE_DELETE))
 
     graph.sequence(*sequence)
 
@@ -132,43 +115,16 @@ def stop_node(graph, node, **kwargs):
 
 
 def _create_start_tasks(node):
-    sequence = []
-    sequence.append(
-        create_node_task(
-            NORMATIVE_STANDARD_INTERFACE, NORMATIVE_START,
-            node))
-    sequence += \
-        create_relationship_tasks(
-            NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_ADD_SOURCE,
-            Task.RUNS_ON_SOURCE,
-            node)
-    sequence += \
-        create_relationship_tasks(
-            NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_ADD_TARGET,
-            Task.RUNS_ON_TARGET,
-            node)
-    sequence += \
-        create_relationship_tasks(
-            NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_TARGET_CHANGED,
-            Task.RUNS_ON_TARGET,
-            node)
+    sequence = [create_node_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_START)]
+    sequence += create_relationships_tasks(node,
+                                           NORMATIVE_CONFIGURE_INTERFACE,
+                                           NORMATIVE_ADD_SOURCE, NORMATIVE_ADD_TARGET)
     return sequence
 
 
 def _create_stop_tasks(node):
-    sequence = []
-    sequence += \
-        create_relationship_tasks(
-            NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_REMOVE_TARGET,
-            Task.RUNS_ON_TARGET,
-            node)
-    sequence += \
-        create_relationship_tasks(
-            NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_TARGET_CHANGED,
-            Task.RUNS_ON_TARGET,
-            node)
-    sequence.append(
-        create_node_task(
-            NORMATIVE_STANDARD_INTERFACE, NORMATIVE_STOP,
-            node))
+    sequence = [create_node_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_STOP)]
+    sequence += create_relationships_tasks(node,
+                                           NORMATIVE_CONFIGURE_INTERFACE,
+                                           NORMATIVE_REMOVE_SOURCE, NORMATIVE_REMOVE_TARGET)
     return sequence

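For reference, the per-node sequences produced by the refactored workflows above should come
out in the following order (a sketch of the intended ordering, not captured output):

    INSTALL_NODE_SEQUENCE = [
        'Standard:create',
        'Configure:pre_configure_source', 'Configure:pre_configure_target',    # per relationship
        'Standard:configure',
        'Configure:post_configure_source', 'Configure:post_configure_target',  # per relationship
        'Standard:start',
        'Configure:add_source', 'Configure:add_target',                        # per relationship
    ]

    UNINSTALL_NODE_SEQUENCE = [
        'Standard:stop',
        'Configure:remove_source', 'Configure:remove_target',  # per relationship
        'Standard:delete',
    ]
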
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/85427299/tests/mock/context.py
----------------------------------------------------------------------
diff --git a/tests/mock/context.py b/tests/mock/context.py
index 3de3133..f943d7e 100644
--- a/tests/mock/context.py
+++ b/tests/mock/context.py
@@ -27,9 +27,10 @@ from ..storage import init_inmemory_model_storage
 from .topology import create_simple_topology_two_nodes
 
 
-def simple(tmpdir, inmemory=False, context_kwargs=None):
+def simple(tmpdir, inmemory=False, context_kwargs=None, topology=None):
     initiator = init_inmemory_model_storage if inmemory else None
     initiator_kwargs = {} if inmemory else dict(base_dir=tmpdir)
+    topology = topology or create_simple_topology_two_nodes
 
     model_storage = aria.application_model_storage(
         sql_mapi.SQLAlchemyModelAPI, initiator=initiator, initiator_kwargs=initiator_kwargs)
@@ -38,13 +39,11 @@ def simple(tmpdir, inmemory=False, context_kwargs=None):
         api_kwargs=dict(directory=os.path.join(tmpdir, 'resources'))
     )
 
-    service_id = create_simple_topology_two_nodes(model_storage)
-
     final_kwargs = dict(
         name='simple_context',
         model_storage=model_storage,
         resource_storage=resource_storage,
-        service_id=service_id,
+        service_id=topology(model_storage),
         workflow_name=models.WORKFLOW_NAME,
         task_max_attempts=models.TASK_MAX_ATTEMPTS,
         task_retry_interval=models.TASK_RETRY_INTERVAL

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/85427299/tests/mock/models.py
----------------------------------------------------------------------
diff --git a/tests/mock/models.py b/tests/mock/models.py
index 9ae2815..457e7cb 100644
--- a/tests/mock/models.py
+++ b/tests/mock/models.py
@@ -16,7 +16,26 @@
 from datetime import datetime
 
 from aria.modeling import models
-from . import operations
+from aria.orchestrator import decorators
+from aria.orchestrator.workflows.builtin.workflows import (
+    NORMATIVE_STANDARD_INTERFACE,
+    NORMATIVE_CREATE,
+    NORMATIVE_START,
+    NORMATIVE_STOP,
+    NORMATIVE_DELETE,
+    NORMATIVE_CONFIGURE,
+
+    NORMATIVE_CONFIGURE_INTERFACE,
+    NORMATIVE_PRE_CONFIGURE_SOURCE,
+    NORMATIVE_PRE_CONFIGURE_TARGET,
+    NORMATIVE_POST_CONFIGURE_SOURCE,
+    NORMATIVE_POST_CONFIGURE_TARGET,
+
+    NORMATIVE_ADD_SOURCE,
+    NORMATIVE_ADD_TARGET,
+    NORMATIVE_REMOVE_TARGET,
+    NORMATIVE_REMOVE_SOURCE
+)
 
 SERVICE_NAME = 'test_service_name'
 SERVICE_TEMPLATE_NAME = 'test_service_template_name'
@@ -62,7 +81,7 @@ def create_service(service_template):
     )
 
 
-def create_dependency_node_template(service_template):
+def create_dependency_node_template(name, service_template):
     node_type = service_template.node_types.get_descendant('test_node_type')
     capability_type = service_template.capability_types.get_descendant('test_capability_type')
 
@@ -72,7 +91,7 @@ def create_dependency_node_template(service_template):
     )
 
     node_template = models.NodeTemplate(
-        name=DEPENDENCY_NODE_TEMPLATE_NAME,
+        name=name,
         type=node_type,
         capability_templates=_dictify(capability_template),
         default_instances=1,
@@ -84,29 +103,21 @@ def create_dependency_node_template(service_template):
     return node_template
 
 
-def create_dependent_node_template(service_template, dependency_node_template):
+def create_dependent_node_template(name, service_template, dependency_node_template):
     the_type = service_template.node_types.get_descendant('test_node_type')
 
-    operation_templates = dict((op, models.OperationTemplate(
-        name=op,
-        implementation='test'))
-                               for _, op in operations.NODE_OPERATIONS)
-    interface_template = models.InterfaceTemplate(
-        type=service_template.interface_types.get_descendant('test_interface_type'),
-        operation_templates=operation_templates)
-
     requirement_template = models.RequirementTemplate(
         name='requirement',
         target_node_template=dependency_node_template
     )
 
     node_template = models.NodeTemplate(
-        name=DEPENDENT_NODE_TEMPLATE_NAME,
+        name=name,
         type=the_type,
         default_instances=1,
         min_instances=1,
         max_instances=1,
-        interface_templates=_dictify(interface_template),
+        interface_templates=_dictify(get_standard_interface_template(service_template)),
         requirement_templates=[requirement_template],
         service_template=service_template
     )
@@ -114,31 +125,17 @@ def create_dependent_node_template(service_template, dependency_node_template):
     return node_template
 
 
-def create_dependency_node(dependency_node_template, service):
+def create_node(name, dependency_node_template, service):
     node = models.Node(
-        name=DEPENDENCY_NODE_NAME,
+        name=name,
         type=dependency_node_template.type,
         runtime_properties={'ip': '1.1.1.1'},
         version=None,
         node_template=dependency_node_template,
         state=models.Node.INITIAL,
         scaling_groups=[],
-        service=service
-    )
-    service.nodes[node.name] = node
-    return node
-
-
-def create_dependent_node(dependent_node_template, service):
-    node = models.Node(
-        name=DEPENDENT_NODE_NAME,
-        type=dependent_node_template.type,
-        runtime_properties={},
-        version=None,
-        node_template=dependent_node_template,
-        state=models.Node.INITIAL,
-        scaling_groups=[],
-        service=service
+        service=service,
+        interfaces=get_standard_interface(service),
     )
     service.nodes[node.name] = node
     return node
@@ -147,7 +144,8 @@ def create_dependent_node(dependent_node_template, service):
 def create_relationship(source, target):
     return models.Relationship(
         source_node=source,
-        target_node=target
+        target_node=target,
+        interfaces=get_configure_interfaces(service=source.service),
     )
 
 
@@ -217,3 +215,62 @@ def create_plugin_specification(name='test_plugin', version='0.1'):
 
 def _dictify(item):
     return dict(((item.name, item),))
+
+
+def get_standard_interface_template(service_template):
+    the_type = service_template.interface_types.get_descendant('test_interface_type')
+
+    op_templates = dict(
+        (op_name, models.OperationTemplate(
+            name=op_name, implementation='{0}.{1}'.format(__file__, mock_operation.__name__)))
+        for op_name in [NORMATIVE_CREATE, NORMATIVE_CONFIGURE, NORMATIVE_START,
+                        NORMATIVE_STOP, NORMATIVE_DELETE]
+    )
+    return models.InterfaceTemplate(name=NORMATIVE_STANDARD_INTERFACE,
+                                    operation_templates=op_templates,
+                                    type=the_type)
+
+
+def get_standard_interface(service):
+    the_type = service.service_template.interface_types.get_descendant('test_interface_type')
+
+    ops = dict(
+        (op_name, models.Operation(
+            name=op_name, implementation='{0}.{1}'.format(__file__, mock_operation.__name__)))
+        for op_name in [NORMATIVE_CREATE, NORMATIVE_CONFIGURE, NORMATIVE_START,
+                        NORMATIVE_STOP, NORMATIVE_DELETE]
+    )
+    return {
+        NORMATIVE_STANDARD_INTERFACE:
+            models.Interface(name=NORMATIVE_STANDARD_INTERFACE, operations=ops, type=the_type)
+    }
+
+
+def get_configure_interfaces(service):
+    the_type = service.service_template.interface_types.get_descendant('test_interface_type')
+
+    operations = dict(
+        (op_name, models.Operation(
+            name=op_name, implementation='{0}.{1}'.format(__file__, mock_operation.__name__)))
+        for op_name in [NORMATIVE_PRE_CONFIGURE_SOURCE,
+                        NORMATIVE_POST_CONFIGURE_SOURCE,
+                        NORMATIVE_ADD_SOURCE,
+                        NORMATIVE_REMOVE_SOURCE,
+
+                        NORMATIVE_PRE_CONFIGURE_TARGET,
+                        NORMATIVE_POST_CONFIGURE_TARGET,
+                        NORMATIVE_ADD_TARGET,
+                        NORMATIVE_REMOVE_TARGET
+                       ]
+    )
+    interface = {
+        NORMATIVE_CONFIGURE_INTERFACE: models.Interface(
+            name=NORMATIVE_CONFIGURE_INTERFACE, operations=operations, type=the_type)
+    }
+
+    return interface
+
+
+@decorators.operation
+def mock_operation(*args, **kwargs):
+    pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/85427299/tests/mock/topology.py
----------------------------------------------------------------------
diff --git a/tests/mock/topology.py b/tests/mock/topology.py
index 7ccc885..edf6b7d 100644
--- a/tests/mock/topology.py
+++ b/tests/mock/topology.py
@@ -22,7 +22,8 @@ def create_simple_topology_single_node(model_storage, create_operation):
     service_template = models.create_service_template()
     service = models.create_service(service_template)
 
-    node_template = models.create_dependency_node_template(service_template)
+    node_template = models.create_dependency_node_template(
+        models.DEPENDENCY_NODE_TEMPLATE_NAME, service_template)
     interface_template = models.create_interface_template(
         service_template,
         'Standard', 'create',
@@ -33,7 +34,7 @@ def create_simple_topology_single_node(model_storage, create_operation):
     )
     node_template.interface_templates[interface_template.name] = interface_template                 # pylint: disable=unsubscriptable-object
 
-    node = models.create_dependency_node(node_template, service)
+    node = models.create_node(models.DEPENDENCY_NODE_NAME, node_template, service)
     interface = models.create_interface(
         service,
         'Standard', 'create',
@@ -54,12 +55,15 @@ def create_simple_topology_two_nodes(model_storage):
 
     # Creating a simple service with node -> node as a graph
 
-    dependency_node_template = models.create_dependency_node_template(service_template)
-    dependent_node_template = models.create_dependent_node_template(service_template,
-                                                                    dependency_node_template)
+    dependency_node_template = models.create_dependency_node_template(
+        models.DEPENDENCY_NODE_TEMPLATE_NAME, service_template)
+    dependent_node_template = models.create_dependent_node_template(
+        models.DEPENDENT_NODE_TEMPLATE_NAME, service_template, dependency_node_template)
 
-    dependency_node = models.create_dependency_node(dependency_node_template, service)
-    dependent_node = models.create_dependent_node(dependent_node_template, service)
+    dependency_node = models.create_node(
+        models.DEPENDENCY_NODE_NAME, dependency_node_template, service)
+    dependent_node = models.create_node(
+        models.DEPENDENT_NODE_NAME, dependent_node_template, service)
 
     dependent_node.outbound_relationships.append(models.create_relationship(                        # pylint: disable=no-member
         source=dependent_node,
@@ -70,3 +74,26 @@ def create_simple_topology_two_nodes(model_storage):
     model_storage.service.put(service)
 
     return service.id
+
+
+def create_simple_topology_three_nodes(model_storage):
+    #################################################################################
+    # Creating a simple deployment with the following topology:
+    #               node1    <----|
+    #                             | <- node0
+    #               node2    <----|
+    # meaning node0 has one relationship to each of node1 and node2 (two relationships in all).
+
+    service_id = create_simple_topology_two_nodes(model_storage)
+    service = model_storage.service.get(service_id)
+    third_node_template = models.create_dependency_node_template(
+        'another_dependency_node_template', service.service_template)
+    third_node = models.create_node(
+        'another_dependency_node', third_node_template, service)
+    new_relationship = models.create_relationship(
+        source=model_storage.node.get_by_name(models.DEPENDENT_NODE_NAME),
+        target=third_node,
+    )
+    model_storage.relationship.put(new_relationship)
+
+    return service_id

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/85427299/tests/modeling/test_models.py
----------------------------------------------------------------------
diff --git a/tests/modeling/test_models.py b/tests/modeling/test_models.py
index 84200d5..35ae09f 100644
--- a/tests/modeling/test_models.py
+++ b/tests/modeling/test_models.py
@@ -89,21 +89,27 @@ def _service_update_storage():
 def _node_template_storage():
     storage = _service_storage()
     service_template = storage.service_template.list()[0]
-    dependency_node_template = mock.models.create_dependency_node_template(service_template)
-    mock.models.create_dependent_node_template(service_template, dependency_node_template)
+    dependency_node_template = mock.models.create_dependency_node_template(
+        mock.models.DEPENDENCY_NODE_TEMPLATE_NAME, service_template)
+    mock.models.create_dependent_node_template(
+        mock.models.DEPENDENCY_NODE_NAME, service_template, dependency_node_template)
     storage.service_template.update(service_template)
     return storage
 
 
-def _node_storage():
+def _nodes_storage():
     storage = _node_template_storage()
     service = storage.service.get_by_name(mock.models.SERVICE_NAME)
     dependency_node_template = storage.node_template.get_by_name(
         mock.models.DEPENDENCY_NODE_TEMPLATE_NAME)
-    dependent_node_template = storage.node_template.get_by_name(
-        mock.models.DEPENDENT_NODE_TEMPLATE_NAME)
-    mock.models.create_dependency_node(dependency_node_template, service)
-    mock.models.create_dependent_node(dependent_node_template, service)
+    mock.models.create_node(mock.models.DEPENDENCY_NODE_NAME, dependency_node_template, service)
+
+    dependent_node_template = \
+        mock.models.create_dependent_node_template(mock.models.DEPENDENT_NODE_TEMPLATE_NAME,
+                                                   service.service_template,
+                                                   dependency_node_template)
+
+    mock.models.create_node(mock.models.DEPENDENT_NODE_NAME, dependent_node_template, service)
     storage.service.update(service)
     return storage
 
@@ -148,8 +154,8 @@ def node_template_storage():
 
 
 @pytest.fixture
-def node_storage():
-    with sql_storage(_node_storage) as storage:
+def nodes_storage():
+    with sql_storage(_nodes_storage) as storage:
         yield storage
 
 
@@ -671,13 +677,13 @@ class TestRelationship(object):
             (True, 0, None),
         ]
     )
-    def test_relationship_model_creation(self, node_storage, is_valid, source_position,
+    def test_relationship_model_creation(self, nodes_storage, is_valid, source_position,
                                          target_position):
-        nodes = node_storage.node
+        nodes = nodes_storage.node
         source_node = nodes.get_by_name(mock.models.DEPENDENT_NODE_NAME)
         target_node = nodes.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
         _test_model(is_valid=is_valid,
-                    storage=node_storage,
+                    storage=nodes_storage,
                     model_cls=Relationship,
                     model_kwargs=dict(
                         source_node=source_node,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/85427299/tests/orchestrator/workflows/builtin/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/builtin/__init__.py b/tests/orchestrator/workflows/builtin/__init__.py
index 9f60e55..5ea761c 100644
--- a/tests/orchestrator/workflows/builtin/__init__.py
+++ b/tests/orchestrator/workflows/builtin/__init__.py
@@ -13,18 +13,65 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from tests import mock
+from aria.orchestrator.workflows.builtin import workflows
 
 
-def assert_node_install_operations(operations, with_relationships=False):
-    all_operations = mock.operations.OPERATIONS_INSTALL if with_relationships else \
-                     mock.operations.NODE_OPERATIONS_INSTALL
-    for i, operation in enumerate(operations):
-        assert operation.name.startswith(all_operations[i] + '.')
+def _assert_relationships(operations, expected_op_full_name, relationships=0):
+    """
 
+    :param operations: an iterable of operations
+    :param expected_op_full_name: the full operation name; the source/target suffix is dropped
+    before comparison, so either variant may be passed
+    :param relationships: the number of relationships
+    :return:
+    """
+    expected_op_name = expected_op_full_name.rsplit('_', 1)[0]
+    for _ in xrange(relationships):
+        # Since the target and source operations start the same way, we only need to retrieve the
+        # suffix once
+        operation = next(operations)
+        relationship_id_1 = operation.actor.id
+        edge1 = operation.runs_on
+        _assert_cfg_interface_op(operation, expected_op_name)
 
-def assert_node_uninstall_operations(operations, with_relationships=False):
-    all_operations = mock.operations.OPERATIONS_UNINSTALL if with_relationships else \
-                     mock.operations.NODE_OPERATIONS_UNINSTALL
-    for i, operation in enumerate(operations):
-        assert operation.name.startswith(all_operations[i] + '.')
+        operation = next(operations)
+        relationship_id_2 = operation.actor.id
+        edge2 = operation.runs_on
+        _assert_cfg_interface_op(operation, expected_op_name)
+
+        assert relationship_id_1 == relationship_id_2
+        print '++++' + edge1, edge2
+        assert edge1 != edge2
+
+
+def assert_node_install_operations(operations, relationships=0):
+    operations = iter(operations)
+
+    _assert_std_interface_op(next(operations), workflows.NORMATIVE_CREATE)
+    _assert_relationships(operations, workflows.NORMATIVE_PRE_CONFIGURE_SOURCE, relationships)
+    _assert_std_interface_op(next(operations), workflows.NORMATIVE_CONFIGURE)
+    _assert_relationships(operations, workflows.NORMATIVE_POST_CONFIGURE_SOURCE, relationships)
+    _assert_std_interface_op(next(operations), workflows.NORMATIVE_START)
+    _assert_relationships(operations, workflows.NORMATIVE_ADD_SOURCE, relationships)
+
+
+def assert_node_uninstall_operations(operations, relationships=0):
+    operations = iter(operations)
+
+    _assert_std_interface_op(next(operations), workflows.NORMATIVE_STOP)
+    _assert_relationships(operations, workflows.NORMATIVE_REMOVE_SOURCE, relationships)
+    _assert_std_interface_op(next(operations), workflows.NORMATIVE_DELETE)
+
+
+def _assert_cfg_interface_op(op, operation_name):
+    op = op.name.split('@', 1)[0].rsplit('_', 1)[0].lower()
+    predicted_name = \
+        '{0}:{1}'.format(workflows.NORMATIVE_CONFIGURE_INTERFACE, operation_name).lower()
+    assert op == predicted_name
+
+
+def _assert_std_interface_op(op, operation_name):
+    op = op.name.split('@', 1)[0].lower()
+    predicted_name = '{0}:{1}'\
+        .format(workflows.NORMATIVE_STANDARD_INTERFACE, operation_name).lower()
+    assert op == predicted_name

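The assertion helpers above compare only the '<interface>:<operation>' prefix of each task
name, which is why either the source or the target variant may be passed as
`expected_op_full_name`. A small standalone illustration (assumed task name format):

    def normalize_cfg_op(task_name):
        # 'Configure:pre_configure_source@dependent->dependency' -> 'configure:pre_configure'
        return task_name.split('@', 1)[0].rsplit('_', 1)[0].lower()

    assert normalize_cfg_op('Configure:pre_configure_source@n1->n2') == 'configure:pre_configure'
    assert normalize_cfg_op('Configure:pre_configure_target@n1->n2') == 'configure:pre_configure'
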
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/85427299/tests/orchestrator/workflows/builtin/test_heal.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/builtin/test_heal.py b/tests/orchestrator/workflows/builtin/test_heal.py
index 92fa7ea..0a422bd 100644
--- a/tests/orchestrator/workflows/builtin/test_heal.py
+++ b/tests/orchestrator/workflows/builtin/test_heal.py
@@ -20,8 +20,7 @@ from aria.orchestrator.workflows.builtin.heal import heal
 
 from tests import mock, storage
 
-from . import (assert_node_install_operations,
-               assert_node_uninstall_operations)
+from . import (assert_node_install_operations, assert_node_uninstall_operations)
 
 
 @pytest.fixture
@@ -57,8 +56,8 @@ def test_heal_dependent_node(ctx):
         list(dependent_node_subgraph_install.topological_order(reverse=True))
     assert isinstance(dependency_node_subgraph_install, task.StubTask)
 
-    assert_node_uninstall_operations(dependent_node_uninstall_tasks, with_relationships=True)
-    assert_node_install_operations(dependent_node_install_tasks, with_relationships=True)
+    assert_node_uninstall_operations(dependent_node_uninstall_tasks, relationships=1)
+    assert_node_install_operations(dependent_node_install_tasks, relationships=1)
 
 
 @pytest.mark.skip(reason='heal is not implemented for now')

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/85427299/tests/orchestrator/workflows/builtin/test_install.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/builtin/test_install.py b/tests/orchestrator/workflows/builtin/test_install.py
index 1791719..1a4e1f9 100644
--- a/tests/orchestrator/workflows/builtin/test_install.py
+++ b/tests/orchestrator/workflows/builtin/test_install.py
@@ -25,7 +25,8 @@ from . import assert_node_install_operations
 
 @pytest.fixture
 def ctx(tmpdir):
-    context = mock.context.simple(str(tmpdir))
+    context = mock.context.simple(str(tmpdir),
+                                  topology=mock.topology.create_simple_topology_three_nodes)
     yield context
     storage.release_sqlite_storage(context.model)
 
@@ -34,10 +35,12 @@ def test_install(ctx):
 
     install_tasks = list(task.WorkflowTask(install, ctx=ctx).topological_order(True))
 
-    assert len(install_tasks) == 2
-    dependency_node_subgraph, dependent_node_subgraph = install_tasks
+    assert len(install_tasks) == 3
+    dependency_node_subgraph1, dependency_node_subgraph2, dependent_node_subgraph = install_tasks
     dependent_node_tasks = list(dependent_node_subgraph.topological_order(reverse=True))
-    dependency_node_tasks = list(dependency_node_subgraph.topological_order(reverse=True))
+    dependency_node1_tasks = list(dependency_node_subgraph1.topological_order(reverse=True))
+    dependency_node2_tasks = list(dependency_node_subgraph2.topological_order(reverse=True))
 
-    assert_node_install_operations(dependency_node_tasks)
-    assert_node_install_operations(dependent_node_tasks, with_relationships=True)
+    assert_node_install_operations(dependency_node1_tasks)
+    assert_node_install_operations(dependency_node2_tasks)
+    assert_node_install_operations(dependent_node_tasks, relationships=2)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/85427299/tests/orchestrator/workflows/builtin/test_uninstall.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/builtin/test_uninstall.py b/tests/orchestrator/workflows/builtin/test_uninstall.py
index 791291f..aa04c38 100644
--- a/tests/orchestrator/workflows/builtin/test_uninstall.py
+++ b/tests/orchestrator/workflows/builtin/test_uninstall.py
@@ -26,7 +26,8 @@ from . import assert_node_uninstall_operations
 
 @pytest.fixture
 def ctx(tmpdir):
-    context = mock.context.simple(str(tmpdir))
+    context = mock.context.simple(str(tmpdir),
+                                  topology=mock.topology.create_simple_topology_three_nodes)
     yield context
     storage.release_sqlite_storage(context.model)
 
@@ -35,10 +36,12 @@ def test_uninstall(ctx):
 
     uninstall_tasks = list(task.WorkflowTask(uninstall, ctx=ctx).topological_order(True))
 
-    assert len(uninstall_tasks) == 2
-    dependent_node_subgraph, dependency_node_subgraph = uninstall_tasks
+    assert len(uninstall_tasks) == 3
+    dependent_node_subgraph, dependency_node_subgraph1, dependency_node_subgraph2 = uninstall_tasks
     dependent_node_tasks = list(dependent_node_subgraph.topological_order(reverse=True))
-    dependency_node_tasks = list(dependency_node_subgraph.topological_order(reverse=True))
+    dependency_node1_tasks = list(dependency_node_subgraph1.topological_order(reverse=True))
+    dependency_node2_tasks = list(dependency_node_subgraph2.topological_order(reverse=True))
 
-    assert_node_uninstall_operations(operations=dependency_node_tasks)
-    assert_node_uninstall_operations(operations=dependent_node_tasks, with_relationships=True)
+    assert_node_uninstall_operations(operations=dependency_node1_tasks)
+    assert_node_uninstall_operations(operations=dependency_node2_tasks)
+    assert_node_uninstall_operations(operations=dependent_node_tasks, relationships=2)



[3/3] incubator-ariatosca git commit: code review fixups

Posted by mx...@apache.org.
code review fixups


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/95c692c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/95c692c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/95c692c8

Branch: refs/heads/ARIA-120-Builtin-workflows-relationship-operations-execution-order
Commit: 95c692c8c257c5c8380e0e506e79fd308ebbda82
Parents: 8542729
Author: max-orlov <ma...@gigaspaces.com>
Authored: Mon Mar 27 17:55:03 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Mon Mar 27 23:38:37 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflows/api/task.py         | 26 +++++++++++---------
 aria/orchestrator/workflows/builtin/utils.py    | 25 ++++++-------------
 .../orchestrator/workflows/builtin/workflows.py |  2 +-
 .../workflows/core/events_handler.py            |  6 ++---
 aria/orchestrator/workflows/core/task.py        |  2 ++
 .../orchestrator/workflows/builtin/__init__.py  | 14 ++++-------
 .../workflows/executor/test_executor.py         |  2 ++
 .../workflows/executor/test_process_executor.py |  4 ++-
 8 files changed, 38 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95c692c8/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index 25d631d..f49ec2e 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -90,6 +90,8 @@ class OperationTask(BaseTask):
         self.ignore_failure = (self.workflow_context._task_ignore_failure
                                if ignore_failure is None else ignore_failure)
         self.runs_on = runs_on
+        self.interface_name = interface_name
+        self.operation_name = operation_name
 
         # Wrap inputs
         inputs = copy.deepcopy(inputs) if inputs else {}
@@ -101,11 +103,11 @@ class OperationTask(BaseTask):
         # model, because they are different from the operation inputs. If we do this, then the two
         # kinds of inputs should *not* be merged here.
 
-        operation = self._get_operation(interface_name, operation_name)
+        operation = self._get_operation()
         if operation is None:
             raise exceptions.OperationNotFoundException(
                 'Could not find operation "{0}" on interface "{1}" for {2} "{3}"'
-                .format(operation_name, interface_name, actor_type, actor.name))
+                .format(self.operation_name, self.interface_name, actor_type, actor.name))
 
         self.plugin = None
         if operation.plugin_specification:
@@ -113,19 +115,27 @@ class OperationTask(BaseTask):
             if self.plugin is None:
                 raise exceptions.PluginNotFoundException(
                     'Could not find plugin of operation "{0}" on interface "{1}" for {2} "{3}"'
-                    .format(operation_name, interface_name, actor_type, actor.name))
+                    .format(self.operation_name, self.interface_name, actor_type, actor.name))
 
         self.implementation = operation.implementation
         self.inputs = OperationTask._merge_inputs(operation.inputs, inputs)
 
         self.name = OperationTask.NAME_FORMAT.format(type=actor_type,
                                                      name=actor.name,
-                                                     interface=interface_name,
-                                                     operation=operation_name)
+                                                     interface=self.interface_name,
+                                                     operation=self.operation_name)
 
     def __repr__(self):
         return self.name
 
+    def _get_operation(self):
+        interface = self.actor.interfaces.get(self.interface_name)
+        if interface:
+            return interface.operations.get(self.operation_name)
+        return None
+
+
+
     @classmethod
     def for_node(cls,
                  node,
@@ -201,12 +211,6 @@ class OperationTask(BaseTask):
             ignore_failure=ignore_failure,
             inputs=inputs)
 
-    def _get_operation(self, interface_name, operation_name):
-        interface = self.actor.interfaces.get(interface_name)
-        if interface is not None:
-            return interface.operations.get(operation_name)
-        return None
-
     @staticmethod
     def _find_plugin(plugin_specification):
         workflow_context = context.workflow.current.get()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95c692c8/aria/orchestrator/workflows/builtin/utils.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/utils.py b/aria/orchestrator/workflows/builtin/utils.py
index 79d02ab..d79318f 100644
--- a/aria/orchestrator/workflows/builtin/utils.py
+++ b/aria/orchestrator/workflows/builtin/utils.py
@@ -12,8 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from itertools import groupby
-
 from ..api.task import OperationTask
 from .. import exceptions
 
@@ -37,24 +35,24 @@ def create_relationships_tasks(
     """
     Creates a relationship task (source and target) for each of a node's relationships.
     :param basestring source_operation_name: the name of the source operation.
+    :param basestring interface_name: the name of the interface.
     :param target_operation_name: the name of the target operation.
     :param NodeInstance node: the source_node
     :return:
     """
-    relationships_groups = groupby(node.outbound_relationships,
-                                   key=lambda relationship: relationship.target_node.id)
-
     sub_tasks = []
-    for _, (_, relationship_group) in enumerate(relationships_groups):
-        for relationship in relationship_group:
+    for relationship in node.outbound_relationships:
+        try:
             relationship_operations = relationship_tasks(
                 relationship,
                 interface_name,
                 source_operation_name=source_operation_name,
                 target_operation_name=target_operation_name)
             sub_tasks.append(relationship_operations)
-
+        except exceptions.OperationNotFoundException:
+            # We will skip relationships which do not have the operation
+            pass
     return sub_tasks
 
 
@@ -69,14 +67,14 @@ def relationship_tasks(
     :return:
     """
     operations = []
-    if source_operation_name and _has_operation(relationship.interfaces, source_operation_name):
+    if source_operation_name:
         operations.append(
             OperationTask.for_relationship(relationship=relationship,
                                            interface_name=interface_name,
                                            operation_name=source_operation_name,
                                            runs_on='source')
         )
-    if target_operation_name and _has_operation(relationship.interfaces, target_operation_name):
+    if target_operation_name:
         operations.append(
             OperationTask.for_relationship(relationship=relationship,
                                            interface_name=interface_name,
@@ -110,10 +108,3 @@ def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False):
                     graph.add_dependency(dependency, task)
             else:
                 graph.add_dependency(task, dependencies)
-
-
-def _has_operation(interfaces, operation_name):
-    for interface in interfaces.values():
-        if operation_name in interface.operations:
-            return True
-    return False
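
The utils.py hunks above drop the up-front _has_operation scan in favour of attempting task creation and skipping relationships whose interfaces do not define the requested operation. A minimal standalone sketch of that control flow, using plain dicts in place of the relationship model (names and signatures here are illustrative, not the actual ARIA API):

    class OperationNotFoundException(Exception):
        pass

    def relationship_task(relationship, interface_name, operation_name):
        # Stand-in for OperationTask.for_relationship: raise if the requested
        # operation is not defined on the requested interface.
        interface = relationship.get('interfaces', {}).get(interface_name, {})
        if operation_name not in interface:
            raise OperationNotFoundException(operation_name)
        return '{0}:{1}@{2}'.format(interface_name, operation_name, relationship['id'])

    def create_tasks(relationships, interface_name, operation_name):
        sub_tasks = []
        for relationship in relationships:
            try:
                sub_tasks.append(
                    relationship_task(relationship, interface_name, operation_name))
            except OperationNotFoundException:
                # Relationships that do not implement the operation are skipped,
                # mirroring the try/except added in create_relationships_tasks.
                pass
        return sub_tasks

    relationships = [
        {'id': 'r1', 'interfaces': {'Configure': {'pre_configure_source': 'impl'}}},
        {'id': 'r2', 'interfaces': {}},  # no Configure interface at all
    ]
    print(create_tasks(relationships, 'Configure', 'pre_configure_source'))
    # ['Configure:pre_configure_source@r1']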

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95c692c8/aria/orchestrator/workflows/builtin/workflows.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/workflows.py b/aria/orchestrator/workflows/builtin/workflows.py
index 67badcf..60f14ed 100644
--- a/aria/orchestrator/workflows/builtin/workflows.py
+++ b/aria/orchestrator/workflows/builtin/workflows.py
@@ -28,11 +28,11 @@ NORMATIVE_STANDARD_INTERFACE = 'Standard' # 'tosca.interfaces.node.lifecycle.Sta
 NORMATIVE_CONFIGURE_INTERFACE = 'Configure' # 'tosca.interfaces.relationship.Configure'
 
 NORMATIVE_CREATE = 'create'
+NORMATIVE_CONFIGURE = 'configure'
 NORMATIVE_START = 'start'
 NORMATIVE_STOP = 'stop'
 NORMATIVE_DELETE = 'delete'
 
-NORMATIVE_CONFIGURE = 'configure'
 NORMATIVE_PRE_CONFIGURE_SOURCE = 'pre_configure_source'
 NORMATIVE_PRE_CONFIGURE_TARGET = 'pre_configure_target'
 NORMATIVE_POST_CONFIGURE_SOURCE = 'post_configure_source'

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95c692c8/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 4ac4b64..7d19784 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -126,11 +126,9 @@ def _workflow_cancelling(workflow_context, *args, **kwargs):
 
 
 def _update_node_state_if_necessary(task, is_transitional=False):
-    match = re.search(r'^(?:tosca.interfaces.node.lifecycle.Standard|Standard):(\S+)@node',
-                      task.name)
-    if match:
+    if task.interface_name in ['tosca.interfaces.node.lifecycle.Standard', 'Standard']:
         node = task.runs_on
-        state = node.determine_state(op_name=match.group(1), is_transitional=is_transitional)
+        state = node.determine_state(op_name=task.operation_name, is_transitional=is_transitional)
         if state:
             node.state = state
             task.context.model.node.update(node)
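
The regex over task.name is replaced by a direct comparison against the task's interface_name and operation_name, after which the node's next state is derived from the operation. A standalone sketch of that flow with a simplified state table (the real mapping lives in Node.determine_state and may differ in detail):

    STANDARD_INTERFACES = ('tosca.interfaces.node.lifecycle.Standard', 'Standard')

    LIFECYCLE_STATES = {
        # op_name: (transitional state, final state) -- install-side operations only
        'create': ('creating', 'created'),
        'configure': ('configuring', 'configured'),
        'start': ('starting', 'started'),
    }

    def determine_state(op_name, is_transitional):
        states = LIFECYCLE_STATES.get(op_name)
        if states is None:
            return None  # not a state-changing lifecycle operation
        return states[0] if is_transitional else states[1]

    def next_node_state(interface_name, operation_name, is_transitional=False):
        # Comparing structured attributes replaces the old parsing of task.name.
        if interface_name in STANDARD_INTERFACES:
            return determine_state(operation_name, is_transitional)
        return None

    print(next_node_state('Standard', 'create', is_transitional=True))  # creating
    print(next_node_state('Configure', 'pre_configure_source'))         # None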

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95c692c8/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index f23312d..1e13588 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -106,6 +106,8 @@ class OperationTask(BaseTask):
     def __init__(self, api_task, *args, **kwargs):
         super(OperationTask, self).__init__(id=api_task.id, **kwargs)
         self._workflow_context = api_task._workflow_context
+        self.interface_name = api_task.interface_name
+        self.operation_name = api_task.operation_name
         model_storage = api_task._workflow_context.model
         plugin = api_task.plugin
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95c692c8/tests/orchestrator/workflows/builtin/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/builtin/__init__.py b/tests/orchestrator/workflows/builtin/__init__.py
index 5ea761c..2aef2cf 100644
--- a/tests/orchestrator/workflows/builtin/__init__.py
+++ b/tests/orchestrator/workflows/builtin/__init__.py
@@ -40,7 +40,6 @@ def _assert_relationships(operations, expected_op_full_name, relationships=0):
         _assert_cfg_interface_op(operation, expected_op_name)
 
         assert relationship_id_1 == relationship_id_2
-        print '++++' + edge1, edge2
         assert edge1 != edge2
 
 
@@ -64,14 +63,11 @@ def assert_node_uninstall_operations(operations, relationships=0):
 
 
 def _assert_cfg_interface_op(op, operation_name):
-    op = op.name.split('@', 1)[0].rsplit('_', 1)[0].lower()
-    predicted_name = \
-        '{0}:{1}'.format(workflows.NORMATIVE_CONFIGURE_INTERFACE, operation_name).lower()
-    assert op == predicted_name
+    predicted_name = '{0}:{1}'.format(workflows.NORMATIVE_CONFIGURE_INTERFACE, operation_name)
+    operation_name = op.operation_name.rsplit('_', 1)[0]
+    assert '{0}:{1}'.format(op.interface_name, operation_name).lower() == predicted_name.lower()
 
 
 def _assert_std_interface_op(op, operation_name):
-    op = op.name.split('@', 1)[0].lower()
-    predicted_name = '{0}:{1}'\
-        .format(workflows.NORMATIVE_STANDARD_INTERFACE, operation_name).lower()
-    assert op == predicted_name
+    predicted_name = '{0}:{1}'.format(workflows.NORMATIVE_STANDARD_INTERFACE, operation_name)
+    assert '{op.interface_name}:{op.operation_name}'.format(op=op).lower() == predicted_name.lower()
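
The assertion helpers above now compare the task's structured interface_name and operation_name instead of parsing op.name; for relationship operations the trailing edge suffix is stripped before the comparison. A short worked example with illustrative values:

    op_name = 'pre_configure_source'           # a Configure-interface relationship operation
    print(op_name.rsplit('_', 1)[0])           # pre_configure
    print('{0}:{1}'.format('Configure', op_name.rsplit('_', 1)[0]).lower())
    # configure:pre_configure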

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95c692c8/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 0a2280d..9dde1ce 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -118,6 +118,8 @@ class MockTask(object):
         self.max_attempts = 1
         self.plugin_fk = None
         self.ignore_failure = False
+        self.interface_name = 'interface_name'
+        self.operation_name = 'operation_name'
 
         for state in models.Task.STATES:
             setattr(self, state.upper(), state)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95c692c8/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index 3cd1c47..761c9f7 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -140,7 +140,9 @@ class MockTask(object):
         self.plugin_fk = plugin.id
         self.plugin = plugin
         self.ignore_failure = False
-
+        self.interface_name = 'interface_name'
+        self.operation_name = 'operation_name'
+        
         for state in aria_models.Task.STATES:
             setattr(self, state.upper(), state)