You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by da...@apache.org on 2016/12/19 14:48:09 UTC

incubator-ariatosca git commit: ARIA-26 Implement operation plugin mechanism

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/master 04c9bd079 -> 5cf84eebe


ARIA-26 Implement operation plugin mechanism


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/5cf84eeb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/5cf84eeb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/5cf84eeb

Branch: refs/heads/master
Commit: 5cf84eebe425bbade0d9285081cebf5d0a62a675
Parents: 04c9bd0
Author: Dan Kilman <da...@gigaspaces.com>
Authored: Sun Nov 27 16:31:29 2016 +0200
Committer: Dan Kilman <da...@gigaspaces.com>
Committed: Mon Dec 19 15:47:38 2016 +0200

----------------------------------------------------------------------
 aria/cli/commands.py                            |   4 +-
 aria/orchestrator/exceptions.py                 |   7 +
 aria/orchestrator/plugin.py                     | 105 +++++++
 aria/orchestrator/workflows/api/task.py         |  44 ++-
 aria/orchestrator/workflows/core/task.py        |  12 +-
 .../orchestrator/workflows/executor/__init__.py |   2 +-
 aria/orchestrator/workflows/executor/base.py    |   6 +-
 .../orchestrator/workflows/executor/blocking.py |  36 ---
 .../workflows/executor/multiprocess.py          |  98 -------
 aria/orchestrator/workflows/executor/process.py | 277 +++++++++++++++++++
 aria/orchestrator/workflows/executor/thread.py  |   4 +-
 aria/storage/models.py                          |  15 +-
 aria/utils/plugin.py                            |  20 ++
 requirements.txt                                |   3 +-
 tests/mock/models.py                            |  16 ++
 tests/orchestrator/workflows/api/test_task.py   |  51 +++-
 tests/orchestrator/workflows/core/test_task.py  |  16 +-
 .../workflows/executor/test_executor.py         | 111 ++++----
 .../workflows/executor/test_process_executor.py | 130 +++++++++
 tests/resources/__init__.py                     |  19 ++
 .../plugins/mock-plugin1/mock_plugin1.py        |  27 ++
 tests/resources/plugins/mock-plugin1/setup.py   |  28 ++
 tests/storage/test_models.py                    | 188 ++++++-------
 tests/utils/__init__.py                         |  14 +
 tests/utils/test_plugin.py                      |  77 ++++++
 25 files changed, 990 insertions(+), 320 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/cli/commands.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands.py b/aria/cli/commands.py
index 141da07..1cd765f 100644
--- a/aria/cli/commands.py
+++ b/aria/cli/commands.py
@@ -34,7 +34,7 @@ from ..logger import LoggerMixin
 from ..storage import (FileSystemModelDriver, FileSystemResourceDriver)
 from ..orchestrator.context.workflow import WorkflowContext
 from ..orchestrator.workflows.core.engine import Engine
-from ..orchestrator.workflows.executor.thread import ThreadExecutor
+from ..orchestrator.workflows.executor.process import ProcessExecutor
 from ..parser import iter_specifications
 from ..parser.consumption import (
     ConsumptionContext,
@@ -252,7 +252,7 @@ class ExecuteCommand(BaseCommand):
         )
         workflow_function = self._load_workflow_handler(workflow['operation'])
         tasks_graph = workflow_function(workflow_context, **workflow_context.parameters)
-        executor = ThreadExecutor()
+        executor = ProcessExecutor()
         workflow_engine = Engine(executor=executor,
                                  workflow_context=workflow_context,
                                  tasks_graph=tasks_graph)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/orchestrator/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/exceptions.py b/aria/orchestrator/exceptions.py
index 1a48194..74e9002 100644
--- a/aria/orchestrator/exceptions.py
+++ b/aria/orchestrator/exceptions.py
@@ -23,3 +23,10 @@ class OrchestratorError(AriaError):
     Orchestrator based exception
     """
     pass
+
+
+class PluginAlreadyExistsError(AriaError):
+    """
+    Raised when a plugin with the same package name and package version already exists
+    """
+    pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/orchestrator/plugin.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/plugin.py b/aria/orchestrator/plugin.py
new file mode 100644
index 0000000..3005756
--- /dev/null
+++ b/aria/orchestrator/plugin.py
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import tempfile
+import subprocess
+import sys
+from datetime import datetime
+
+import wagon
+
+from . import exceptions
+
+
+class PluginManager(object):
+
+    def __init__(self, model, plugins_dir):
+        """
+        :param plugins_dir: Root directory to install plugins in.
+        """
+        self._model = model
+        self._plugins_dir = plugins_dir
+
+    def install(self, source):
+        """
+        Install a wagon plugin.
+        """
+        metadata = wagon.show(source)
+        cls = self._model.plugin.model_cls
+        plugin = cls(
+            archive_name=metadata['archive_name'],
+            supported_platform=metadata['supported_platform'],
+            supported_py_versions=metadata['supported_python_versions'],
+            # Remove suffix colon after upgrading wagon to > 0.5.0
+            distribution=metadata['build_server_os_properties']['distribution:'],
+            distribution_release=metadata['build_server_os_properties']['distribution_version'],
+            distribution_version=metadata['build_server_os_properties']['distribution_release'],
+            package_name=metadata['package_name'],
+            package_version=metadata['package_version'],
+            package_source=metadata['package_source'],
+            wheels=metadata['wheels'],
+            uploaded_at=datetime.now()
+        )
+        if len(self._model.plugin.list(filters={'package_name': plugin.package_name,
+                                                'package_version': plugin.package_version})):
+            raise exceptions.PluginAlreadyExistsError(
+                'Plugin {0}, version {1} already exists'.format(plugin.package_name,
+                                                                plugin.package_version))
+        self._install_wagon(source=source, prefix=self.get_plugin_prefix(plugin))
+        self._model.plugin.put(plugin)
+        return plugin
+
+    def get_plugin_prefix(self, plugin):
+        return os.path.join(
+            self._plugins_dir,
+            '{0}-{1}'.format(plugin.package_name, plugin.package_version))
+
+    def _install_wagon(self, source, prefix):
+        pip_freeze_output = self._pip_freeze()
+        file_descriptor, constraint_path = tempfile.mkstemp(prefix='constraint-', suffix='.txt')
+        os.close(file_descriptor)
+        try:
+            with open(constraint_path, 'wb') as constraint:
+                constraint.write(pip_freeze_output)
+            # Install the provided wagon.
+            # * The --prefix install_arg will cause the plugin to be installed under
+            #   plugins_dir/{package_name}-{package_version}, So different plugins don't step on
+            #   each other and don't interfere with the current virtualenv
+            # * The --constraint flag points a file containing the output of ``pip freeze``.
+            #   It is required, to handle cases where plugins depend on some python package with
+            #   a different version than the one installed in the current virtualenv. Without this
+            #   flag, the existing package will be **removed** from the parent virtualenv and the
+            #   new package will be installed under prefix. With the flag, the existing version will
+            #   remain, and the version requested by the plugin will be ignored.
+            wagon.install(
+                source=source,
+                install_args='--prefix="{prefix}" --constraint="{constraint}"'.format(
+                    prefix=prefix,
+                    constraint=constraint.name),
+                virtualenv=os.environ.get('VIRTUAL_ENV'))
+        finally:
+            os.remove(constraint_path)
+
+    @staticmethod
+    def _pip_freeze():
+        """Run pip freeze in current environment and return the output"""
+        bin_dir = 'Scripts' if os.name == 'nt' else 'bin'
+        pip_path = os.path.join(sys.prefix, bin_dir,
+                                'pip{0}'.format('.exe' if os.name == 'nt' else ''))
+        pip_freeze = subprocess.Popen([pip_path, 'freeze'], stdout=subprocess.PIPE)
+        pip_freeze_output, _ = pip_freeze.communicate()
+        assert not pip_freeze.poll()
+        return pip_freeze_output

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index 1c12407..4f025b6 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -67,11 +67,11 @@ class OperationTask(BaseTask):
                  max_attempts=None,
                  retry_interval=None,
                  ignore_failure=None,
-                 inputs=None):
+                 inputs=None,
+                 plugin=None):
         """
         Creates an operation task using the name, details, node instance and any additional kwargs.
         :param name: the operation of the name.
-        :param operation_details: the details for the operation.
         :param actor: the operation host on which this operation is registered.
         :param inputs: operation inputs.
         """
@@ -82,6 +82,7 @@ class OperationTask(BaseTask):
         self.name = '{name}.{actor.id}'.format(name=name, actor=actor)
         self.operation_mapping = operation_mapping
         self.inputs = inputs or {}
+        self.plugin = plugin or {}
         self.max_attempts = (self.workflow_context._task_max_attempts
                              if max_attempts is None else max_attempts)
         self.retry_interval = (self.workflow_context._task_retry_interval
@@ -98,15 +99,13 @@ class OperationTask(BaseTask):
         :param name: the name of the operation.
         """
         assert isinstance(instance, models.NodeInstance)
-        operation_details = instance.node.operations[name]
-        operation_inputs = operation_details.get('inputs', {})
-        operation_inputs.update(inputs or {})
-        return cls(name=name,
-                   actor=instance,
-                   operation_mapping=operation_details.get('operation', ''),
-                   inputs=operation_inputs,
-                   *args,
-                   **kwargs)
+        return cls._instance(instance=instance,
+                             name=name,
+                             operation_details=instance.node.operations[name],
+                             inputs=inputs,
+                             plugins=instance.node.plugins or [],
+                             *args,
+                             **kwargs)
 
     @classmethod
     def relationship_instance(cls, instance, name, operation_end, inputs=None, *args, **kwargs):
@@ -125,12 +124,33 @@ class OperationTask(BaseTask):
                 cls.TARGET_OPERATION, cls.SOURCE_OPERATION
             ))
         operation_details = getattr(instance.relationship, operation_end)[name]
+        if operation_end == cls.SOURCE_OPERATION:
+            plugins = instance.relationship.source_node.plugins
+        else:
+            plugins = instance.relationship.target_node.plugins
+        return cls._instance(instance=instance,
+                             name=name,
+                             operation_details=operation_details,
+                             inputs=inputs,
+                             plugins=plugins or [],
+                             *args,
+                             **kwargs)
+
+    @classmethod
+    def _instance(cls, instance, name, operation_details, inputs, plugins, *args, **kwargs):
+        operation_mapping = operation_details.get('operation')
         operation_inputs = operation_details.get('inputs', {})
         operation_inputs.update(inputs or {})
+        plugin_name = operation_details.get('plugin')
+        matching_plugins = [p for p in plugins if p['name'] == plugin_name]
+        # All matching plugins should have identical package_name/package_version, so it's safe to
+        # take the first found.
+        plugin = matching_plugins[0] if matching_plugins else {}
         return cls(actor=instance,
                    name=name,
-                   operation_mapping=operation_details.get('operation'),
+                   operation_mapping=operation_mapping,
                    inputs=operation_inputs,
+                   plugin=plugin,
                    *args,
                    **kwargs)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index 0be17fe..08cf26e 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -106,8 +106,9 @@ class OperationTask(BaseTask):
     def __init__(self, api_task, *args, **kwargs):
         super(OperationTask, self).__init__(id=api_task.id, **kwargs)
         self._workflow_context = api_task._workflow_context
-        base_task_model = api_task._workflow_context.model.task.model_cls
+        model = api_task._workflow_context.model
 
+        base_task_model = model.task.model_cls
         if isinstance(api_task.actor, models.NodeInstance):
             context_class = operation_context.NodeOperationContext
             task_model_cls = base_task_model.as_node_instance
@@ -117,7 +118,13 @@ class OperationTask(BaseTask):
         else:
             raise RuntimeError('No operation context could be created for {actor.model_cls}'
                                .format(actor=api_task.actor))
-
+        plugin = api_task.plugin
+        plugins = model.plugin.list(filters={'package_name': plugin.get('package_name', ''),
+                                             'package_version': plugin.get('package_version', '')},
+                                    include=['id'])
+        # Validation during installation ensures that at most one plugin can exists with provided
+        # package_name and package_version
+        plugin_id = plugins[0].id if plugins else None
         operation_task = task_model_cls(
             name=api_task.name,
             operation_mapping=api_task.operation_mapping,
@@ -127,6 +134,7 @@ class OperationTask(BaseTask):
             max_attempts=api_task.max_attempts,
             retry_interval=api_task.retry_interval,
             ignore_failure=api_task.ignore_failure,
+            plugin_id=plugin_id
         )
         self._workflow_context.model.task.put(operation_task)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/__init__.py b/aria/orchestrator/workflows/executor/__init__.py
index 16b6c9b..414a740 100644
--- a/aria/orchestrator/workflows/executor/__init__.py
+++ b/aria/orchestrator/workflows/executor/__init__.py
@@ -18,5 +18,5 @@ Executors for task execution
 """
 
 
-from . import blocking, multiprocess, thread
+from . import process, thread
 from .base import BaseExecutor

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index ba44124..4ae046d 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -17,17 +17,15 @@
 Base executor module
 """
 
+from aria import logger
 from aria.orchestrator import events
 
 
-class BaseExecutor(object):
+class BaseExecutor(logger.LoggerMixin):
     """
     Base class for executors for running tasks
     """
 
-    def __init__(self, *args, **kwargs):
-        pass
-
     def execute(self, task):
         """
         Execute a task

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/orchestrator/workflows/executor/blocking.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/blocking.py b/aria/orchestrator/workflows/executor/blocking.py
deleted file mode 100644
index 9d3a9ba..0000000
--- a/aria/orchestrator/workflows/executor/blocking.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Blocking executor
-"""
-
-from aria.utils import imports
-from .base import BaseExecutor
-
-
-class CurrentThreadBlockingExecutor(BaseExecutor):
-    """
-    Executor which runs tasks in the current thread (blocking)
-    """
-
-    def execute(self, task):
-        self._task_started(task)
-        try:
-            task_func = imports.load_attribute(task.operation_mapping)
-            task_func(ctx=task.context, **task.inputs)
-            self._task_succeeded(task)
-        except BaseException as e:
-            self._task_failed(task, exception=e)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/orchestrator/workflows/executor/multiprocess.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/multiprocess.py b/aria/orchestrator/workflows/executor/multiprocess.py
deleted file mode 100644
index d770e07..0000000
--- a/aria/orchestrator/workflows/executor/multiprocess.py
+++ /dev/null
@@ -1,98 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Multiprocess based executor
-"""
-
-import multiprocessing
-import threading
-
-import jsonpickle
-
-from aria.utils import imports
-from .base import BaseExecutor
-
-
-class MultiprocessExecutor(BaseExecutor):
-    """
-    Executor which runs tasks in a multiprocess environment
-    """
-
-    def __init__(self, pool_size=1, *args, **kwargs):
-        super(MultiprocessExecutor, self).__init__(*args, **kwargs)
-        self._stopped = False
-        self._manager = multiprocessing.Manager()
-        self._queue = self._manager.Queue()
-        self._tasks = {}
-        self._listener_thread = threading.Thread(target=self._listener)
-        self._listener_thread.daemon = True
-        self._listener_thread.start()
-        self._pool = multiprocessing.Pool(processes=pool_size)
-
-    def execute(self, task):
-        self._tasks[task.id] = task
-        self._pool.apply_async(_multiprocess_handler, args=(
-            self._queue,
-            task.context,
-            task.id,
-            task.operation_mapping,
-            task.inputs))
-
-    def close(self):
-        self._pool.close()
-        self._stopped = True
-        self._pool.join()
-        self._listener_thread.join()
-
-    def _listener(self):
-        while not self._stopped:
-            try:
-                message = self._queue.get(timeout=1)
-                if message.type == 'task_started':
-                    self._task_started(self._tasks[message.task_id])
-                elif message.type == 'task_succeeded':
-                    self._task_succeeded(self._remove_task(message.task_id))
-                elif message.type == 'task_failed':
-                    self._task_failed(self._remove_task(message.task_id),
-                                      exception=jsonpickle.loads(message.exception))
-                else:
-                    # TODO: something
-                    raise RuntimeError()
-            # Daemon threads
-            except BaseException:
-                pass
-
-    def _remove_task(self, task_id):
-        return self._tasks.pop(task_id)
-
-
-class _MultiprocessMessage(object):
-
-    def __init__(self, type, task_id, exception=None):
-        self.type = type
-        self.task_id = task_id
-        self.exception = exception
-
-
-def _multiprocess_handler(queue, ctx, task_id, operation_mapping, operation_inputs):
-    queue.put(_MultiprocessMessage(type='task_started', task_id=task_id))
-    try:
-        task_func = imports.load_attribute(operation_mapping)
-        task_func(ctx=ctx, **operation_inputs)
-        queue.put(_MultiprocessMessage(type='task_succeeded', task_id=task_id))
-    except BaseException as e:
-        queue.put(_MultiprocessMessage(type='task_failed', task_id=task_id,
-                                       exception=jsonpickle.dumps(e)))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
new file mode 100644
index 0000000..1a47d4c
--- /dev/null
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -0,0 +1,277 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Subprocess based executor
+"""
+
+# pylint: disable=wrong-import-position
+
+import sys
+import os
+
+# As part of the process executor implementation, subprocess are started with this module as their
+# entry point. We thus remove this module's directory from the python path if it happens to be
+# there
+script_dir = os.path.dirname(__file__)
+if script_dir in sys.path:
+    sys.path.remove(script_dir)
+
+import io
+import threading
+import socket
+import struct
+import subprocess
+import tempfile
+import Queue
+
+import jsonpickle
+
+from aria.utils import imports
+from aria.orchestrator.workflows.executor import base
+
+_IS_WIN = os.name == 'nt'
+
+_INT_FMT = 'I'
+_INT_SIZE = struct.calcsize(_INT_FMT)
+
+
+class ProcessExecutor(base.BaseExecutor):
+    """
+    Executor which runs tasks in a subprocess environment
+    """
+
+    def __init__(self, plugin_manager=None, python_path=None, *args, **kwargs):
+        super(ProcessExecutor, self).__init__(*args, **kwargs)
+        self._plugin_manager = plugin_manager
+
+        # Optional list of additional directories that should be added to
+        # subprocesses python path
+        self._python_path = python_path or []
+
+        # Flag that denotes whether this executor has been stopped
+        self._stopped = False
+
+        # Contains reference to all currently running tasks
+        self._tasks = {}
+
+        # Server socket used to accept task status messages from subprocesses
+        self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self._server_socket.bind(('localhost', 0))
+        self._server_socket.listen(10)
+        self._server_port = self._server_socket.getsockname()[1]
+
+        # Used to send a "closed" message to the listener when this executor is closed
+        self._messenger = _Messenger(task_id=None, port=self._server_port)
+
+        # Queue object used by the listener thread to notify this constructed it has started
+        # (see last line of this __init__ method)
+        self._listener_started = Queue.Queue()
+
+        # Listener thread to handle subprocesses task status messages
+        self._listener_thread = threading.Thread(target=self._listener)
+        self._listener_thread.daemon = True
+        self._listener_thread.start()
+
+        # Wait for listener thread to actually start before returning
+        self._listener_started.get(timeout=60)
+
+    def close(self):
+        if self._stopped:
+            return
+        self._stopped = True
+        # Listener thread may be blocked on "accept" call. This will wake it up with an explicit
+        # "closed" message
+        self._messenger.closed()
+        self._server_socket.close()
+        self._listener_thread.join(timeout=60)
+
+    def execute(self, task):
+        self._check_closed()
+        self._tasks[task.id] = task
+
+        # Temporary file used to pass arguments to the started subprocess
+        file_descriptor, arguments_json_path = tempfile.mkstemp(prefix='executor-', suffix='.json')
+        os.close(file_descriptor)
+        with open(arguments_json_path, 'wb') as f:
+            f.write(jsonpickle.dumps({
+                'task_id': task.id,
+                'operation_mapping': task.operation_mapping,
+                'operation_inputs': task.inputs,
+                'port': self._server_port
+            }))
+
+        env = os.environ.copy()
+        # See _update_env for plugin_prefix usage
+        if task.plugin_id and self._plugin_manager:
+            plugin_prefix = self._plugin_manager.get_plugin_prefix(task.plugin)
+        else:
+            plugin_prefix = None
+        self._update_env(env=env, plugin_prefix=plugin_prefix)
+        # Asynchronously start the operation in a subprocess
+        subprocess.Popen(
+            '{0} {1} {2}'.format(sys.executable, __file__, arguments_json_path),
+            env=env,
+            shell=True)
+
+    def _remove_task(self, task_id):
+        return self._tasks.pop(task_id)
+
+    def _listener(self):
+        # Notify __init__ method this thread has actually started
+        self._listener_started.put(True)
+        while not self._stopped:
+            try:
+                # Accept messages written to the server socket
+                message = self._recv_message()
+                message_type = message['type']
+                if message_type == 'closed':
+                    break
+                task_id = message['task_id']
+                if message_type == 'started':
+                    self._task_started(self._tasks[task_id])
+                elif message_type == 'succeeded':
+                    self._task_succeeded(self._remove_task(task_id))
+                elif message_type == 'failed':
+                    self._task_failed(self._remove_task(task_id),
+                                      exception=message['exception'])
+                else:
+                    raise RuntimeError('Invalid state')
+            except BaseException as e:
+                self.logger.debug('Error in process executor listener: {0}'.format(e))
+
+    def _recv_message(self):
+        connection, _ = self._server_socket.accept()
+        try:
+            message_len, = struct.unpack(_INT_FMT, self._recv_bytes(connection, _INT_SIZE))
+            return jsonpickle.loads(self._recv_bytes(connection, message_len))
+        finally:
+            connection.close()
+
+    @staticmethod
+    def _recv_bytes(connection, count):
+        result = io.BytesIO()
+        while True:
+            if not count:
+                return result.getvalue()
+            read = connection.recv(count)
+            if not read:
+                return result.getvalue()
+            result.write(read)
+            count -= len(read)
+
+    def _check_closed(self):
+        if self._stopped:
+            raise RuntimeError('Executor closed')
+
+    def _update_env(self, env, plugin_prefix):
+        pythonpath_dirs = []
+        # If this is a plugin operation, plugin prefix will point to where
+        # This plugin is installed.
+        # We update the environment variables that the subprocess will be started with based on it
+        if plugin_prefix:
+
+            # Update PATH environment variable to include plugin's bin dir
+            bin_dir = 'Scripts' if _IS_WIN else 'bin'
+            env['PATH'] = '{0}{1}{2}'.format(
+                os.path.join(plugin_prefix, bin_dir),
+                os.pathsep,
+                env.get('PATH', ''))
+
+            # Update PYTHONPATH environment variable to include plugin's site-packages
+            # directories
+            if _IS_WIN:
+                pythonpath_dirs = [os.path.join(plugin_prefix, 'Lib', 'site-packages')]
+            else:
+                # In some linux environments, there will be both a lib and a lib64 directory
+                # with the latter, containing compiled packages.
+                pythonpath_dirs = [os.path.join(
+                    plugin_prefix, 'lib{0}'.format(b),
+                    'python{0}.{1}'.format(sys.version_info[0], sys.version_info[1]),
+                    'site-packages') for b in ['', '64']]
+
+        # Add used supplied directories to injected PYTHONPATH
+        pythonpath_dirs.extend(self._python_path)
+
+        if pythonpath_dirs:
+            env['PYTHONPATH'] = '{0}{1}{2}'.format(
+                os.pathsep.join(pythonpath_dirs),
+                os.pathsep,
+                env.get('PYTHONPATH', ''))
+
+
+class _Messenger(object):
+
+    def __init__(self, task_id, port):
+        self.task_id = task_id
+        self.port = port
+
+    def started(self):
+        """Task started message"""
+        self._send_message(type='started')
+
+    def succeeded(self):
+        """Task succeeded message"""
+        self._send_message(type='succeeded')
+
+    def failed(self, exception):
+        """Task failed message"""
+        self._send_message(type='failed', exception=exception)
+
+    def closed(self):
+        """Executor closed message"""
+        self._send_message(type='closed')
+
+    def _send_message(self, type, exception=None):
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.connect(('localhost', self.port))
+        try:
+            data = jsonpickle.dumps({
+                'type': type,
+                'task_id': self.task_id,
+                'exception': exception
+            })
+            sock.send(struct.pack(_INT_FMT, len(data)))
+            sock.sendall(data)
+        finally:
+            sock.close()
+
+
+def _main():
+    arguments_json_path = sys.argv[1]
+    with open(arguments_json_path) as f:
+        arguments = jsonpickle.loads(f.read())
+
+    # arguments_json_path is a temporary file created by the parent process.
+    # so we remove it here
+    os.remove(arguments_json_path)
+
+    task_id = arguments['task_id']
+    port = arguments['port']
+    messenger = _Messenger(task_id=task_id, port=port)
+
+    operation_mapping = arguments['operation_mapping']
+    operation_inputs = arguments['operation_inputs']
+    ctx = None
+    messenger.started()
+    try:
+        task_func = imports.load_attribute(operation_mapping)
+        task_func(ctx=ctx, **operation_inputs)
+        messenger.succeeded()
+    except BaseException as e:
+        messenger.failed(exception=e)
+
+if __name__ == '__main__':
+    _main()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 76ceefd..1a6ad9f 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -26,7 +26,9 @@ from .base import BaseExecutor
 
 class ThreadExecutor(BaseExecutor):
     """
-    Executor which runs tasks in a separate thread
+    Executor which runs tasks in a separate thread. It's easier writing tests
+    using this executor rather than the full blown subprocess executor.
+    Note: This executor is not capable of running plugin operations.
     """
 
     def __init__(self, pool_size=1, *args, **kwargs):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/storage/models.py
----------------------------------------------------------------------
diff --git a/aria/storage/models.py b/aria/storage/models.py
index 6302e66..0a1027b 100644
--- a/aria/storage/models.py
+++ b/aria/storage/models.py
@@ -344,8 +344,7 @@ class Node(SQLModelBase):
     min_number_of_instances = Column(Integer, nullable=False)
     number_of_instances = Column(Integer, nullable=False)
     planned_number_of_instances = Column(Integer, nullable=False)
-    plugins = Column(Dict)
-    plugins_to_install = Column(Dict)
+    plugins = Column(List)
     properties = Column(Dict)
     operations = Column(Dict)
     type = Column(Text, nullable=False, index=True)
@@ -474,14 +473,13 @@ class Plugin(SQLModelBase):
     distribution = Column(Text)
     distribution_release = Column(Text)
     distribution_version = Column(Text)
-    excluded_wheels = Column(Dict)
     package_name = Column(Text, nullable=False, index=True)
     package_source = Column(Text)
     package_version = Column(Text)
-    supported_platform = Column(Dict)
-    supported_py_versions = Column(Dict)
+    supported_platform = Column(Text)
+    supported_py_versions = Column(List)
     uploaded_at = Column(DateTime, nullable=False, index=True)
-    wheels = Column(Dict, nullable=False)
+    wheels = Column(List, nullable=False)
 
 
 class Task(SQLModelBase):
@@ -550,6 +548,11 @@ class Task(SQLModelBase):
     name = Column(String)
     operation_mapping = Column(String)
     inputs = Column(Dict)
+    plugin_id = foreign_key(Plugin.id, nullable=True)
+
+    @declared_attr
+    def plugin(cls):
+        return one_to_many_relationship(cls, Plugin, cls.plugin_id)
 
     @declared_attr
     def execution(cls):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/utils/plugin.py
----------------------------------------------------------------------
diff --git a/aria/utils/plugin.py b/aria/utils/plugin.py
new file mode 100644
index 0000000..b7f94a1
--- /dev/null
+++ b/aria/utils/plugin.py
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import wagon
+
+
+def create(source, destination_dir):
+    return wagon.create(source=source, archive_destination_dir=destination_dir)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/requirements.txt
----------------------------------------------------------------------
diff --git a/requirements.txt b/requirements.txt
index 31b0b79..0005a5e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -24,4 +24,5 @@ Jinja2==2.8
 shortuuid==0.4.3
 CacheControl[filecache]==0.11.6
 clint==0.5.1
-SQLAlchemy==1.1.4
\ No newline at end of file
+SQLAlchemy==1.1.4
+wagon==0.5.0

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/tests/mock/models.py
----------------------------------------------------------------------
diff --git a/tests/mock/models.py b/tests/mock/models.py
index e2e3d2f..26088e0 100644
--- a/tests/mock/models.py
+++ b/tests/mock/models.py
@@ -152,3 +152,19 @@ def get_deployment(blueprint):
         outputs={},
         scaling_groups={},
     )
+
+
+def get_plugin(package_name='package', package_version='0.1'):
+    return models.Plugin(
+        archive_name='archive_name',
+        distribution='distribution',
+        distribution_release='dist_release',
+        distribution_version='dist_version',
+        package_name=package_name,
+        package_source='source',
+        package_version=package_version,
+        supported_platform='any',
+        supported_py_versions=['python27'],
+        uploaded_at=datetime.now(),
+        wheels=[],
+    )

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/tests/orchestrator/workflows/api/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/api/test_task.py b/tests/orchestrator/workflows/api/test_task.py
index 1a90338..58e387f 100644
--- a/tests/orchestrator/workflows/api/test_task.py
+++ b/tests/orchestrator/workflows/api/test_task.py
@@ -39,9 +39,12 @@ class TestOperationTask(object):
 
     def test_node_operation_task_creation(self, ctx):
         operation_name = 'aria.interfaces.lifecycle.create'
-        op_details = {'operation': True}
+        op_details = {'operation': True, 'plugin': 'plugin'}
         node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
         node.operations[operation_name] = op_details
+        node.plugins = [{'name': 'plugin',
+                         'package_name': 'package',
+                         'package_version': '0.1'}]
         ctx.model.node.update(node)
         node_instance = \
             ctx.model.node_instance.get_by_name(mock.models.DEPENDENT_NODE_INSTANCE_NAME)
@@ -66,12 +69,18 @@ class TestOperationTask(object):
         assert api_task.retry_interval == retry_interval
         assert api_task.max_attempts == max_attempts
         assert api_task.ignore_failure == ignore_failure
+        assert api_task.plugin == {'name': 'plugin',
+                                   'package_name': 'package',
+                                   'package_version': '0.1'}
 
-    def test_relationship_operation_task_creation(self, ctx):
+    def test_source_relationship_operation_task_creation(self, ctx):
         operation_name = 'aria.interfaces.relationship_lifecycle.preconfigure'
-        op_details = {'operation': True}
+        op_details = {'operation': True, 'plugin': 'plugin'}
         relationship = ctx.model.relationship.list()[0]
         relationship.source_operations[operation_name] = op_details
+        relationship.source_node.plugins = [{'name': 'plugin',
+                                             'package_name': 'package',
+                                             'package_version': '0.1'}]
         relationship_instance = ctx.model.relationship_instance.list()[0]
         inputs = {'inputs': True}
         max_attempts = 10
@@ -92,6 +101,41 @@ class TestOperationTask(object):
         assert api_task.inputs == inputs
         assert api_task.retry_interval == retry_interval
         assert api_task.max_attempts == max_attempts
+        assert api_task.plugin == {'name': 'plugin',
+                                   'package_name': 'package',
+                                   'package_version': '0.1'}
+
+    def test_target_relationship_operation_task_creation(self, ctx):
+        operation_name = 'aria.interfaces.relationship_lifecycle.preconfigure'
+        op_details = {'operation': True, 'plugin': 'plugin'}
+        relationship = ctx.model.relationship.list()[0]
+        relationship.target_operations[operation_name] = op_details
+        relationship.target_node.plugins = [{'name': 'plugin',
+                                             'package_name': 'package',
+                                             'package_version': '0.1'}]
+        relationship_instance = ctx.model.relationship_instance.list()[0]
+        inputs = {'inputs': True}
+        max_attempts = 10
+        retry_interval = 10
+
+        with context.workflow.current.push(ctx):
+            api_task = api.task.OperationTask.relationship_instance(
+                name=operation_name,
+                instance=relationship_instance,
+                operation_end=api.task.OperationTask.TARGET_OPERATION,
+                inputs=inputs,
+                max_attempts=max_attempts,
+                retry_interval=retry_interval)
+
+        assert api_task.name == '{0}.{1}'.format(operation_name, relationship_instance.id)
+        assert api_task.operation_mapping is True
+        assert api_task.actor == relationship_instance
+        assert api_task.inputs == inputs
+        assert api_task.retry_interval == retry_interval
+        assert api_task.max_attempts == max_attempts
+        assert api_task.plugin == {'name': 'plugin',
+                                   'package_name': 'package',
+                                   'package_version': '0.1'}
 
     def test_operation_task_default_values(self, ctx):
         dependency_node_instance = ctx.model.node_instance.get_by_name(
@@ -106,6 +150,7 @@ class TestOperationTask(object):
         assert task.retry_interval == ctx._task_retry_interval
         assert task.max_attempts == ctx._task_max_attempts
         assert task.ignore_failure == ctx._task_ignore_failure
+        assert task.plugin == {}
 
 
 class TestWorkflowTask(object):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/tests/orchestrator/workflows/core/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py
index c572501..6c3825c 100644
--- a/tests/orchestrator/workflows/core/test_task.py
+++ b/tests/orchestrator/workflows/core/test_task.py
@@ -42,24 +42,30 @@ class TestOperationTask(object):
         with workflow_context.current.push(ctx):
             api_task = api.task.OperationTask.node_instance(
                 instance=node_instance,
-                name='aria.interfaces.lifecycle.create',
-            )
-
+                name='aria.interfaces.lifecycle.create')
             core_task = core.task.OperationTask(api_task=api_task)
-
         return api_task, core_task
 
     def test_operation_task_creation(self, ctx):
+        storage_plugin = mock.models.get_plugin(package_name='p1', package_version='0.1')
+        storage_plugin_other = mock.models.get_plugin(package_name='p0', package_version='0.0')
+        ctx.model.plugin.put(storage_plugin_other)
+        ctx.model.plugin.put(storage_plugin)
         node_instance = \
             ctx.model.node_instance.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
+        node = node_instance.node
+        node.plugins = [{'name': 'plugin1',
+                         'package_name': 'p1',
+                         'package_version': '0.1'}]
+        node.operations['aria.interfaces.lifecycle.create'] = {'plugin': 'plugin1'}
         api_task, core_task = self._create_operation_task(ctx, node_instance)
         storage_task = ctx.model.task.get_by_name(core_task.name)
-
         assert core_task.model_task == storage_task
         assert core_task.name == api_task.name
         assert core_task.operation_mapping == api_task.operation_mapping
         assert core_task.actor == api_task.actor == node_instance
         assert core_task.inputs == api_task.inputs == storage_task.inputs
+        assert core_task.plugin == storage_plugin
 
     def test_operation_task_edit_locked_attribute(self, ctx):
         node_instance = \

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 654542c..7a11524 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import logging
+import os
 import uuid
 from contextlib import contextmanager
 
@@ -28,57 +29,34 @@ except ImportError:
     _celery = None
     app = None
 
+import aria
 from aria.storage import models
 from aria.orchestrator import events
 from aria.orchestrator.workflows.executor import (
     thread,
-    multiprocess,
-    blocking,
+    process,
     # celery
 )
 
 
-class TestExecutor(object):
-
-    @pytest.mark.parametrize('executor_cls,executor_kwargs', [
-        (thread.ThreadExecutor, {'pool_size': 1}),
-        (thread.ThreadExecutor, {'pool_size': 2}),
-        (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
-        (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
-        (blocking.CurrentThreadBlockingExecutor, {}),
-        # (celery.CeleryExecutor, {'app': app})
-    ])
-    def test_execute(self, executor_cls, executor_kwargs):
-        self.executor = executor_cls(**executor_kwargs)
-        expected_value = 'value'
-        successful_task = MockTask(mock_successful_task)
-        failing_task = MockTask(mock_failing_task)
-        task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value})
-
-        for task in [successful_task, failing_task, task_with_inputs]:
-            self.executor.execute(task)
-
-        @retrying.retry(stop_max_delay=10000, wait_fixed=100)
-        def assertion():
-            assert successful_task.states == ['start', 'success']
-            assert failing_task.states == ['start', 'failure']
-            assert task_with_inputs.states == ['start', 'failure']
-            assert isinstance(failing_task.exception, MockException)
-            assert isinstance(task_with_inputs.exception, MockException)
-            assert task_with_inputs.exception.message == expected_value
-        assertion()
-
-    def setup_method(self):
-        events.start_task_signal.connect(start_handler)
-        events.on_success_task_signal.connect(success_handler)
-        events.on_failure_task_signal.connect(failure_handler)
-
-    def teardown_method(self):
-        events.start_task_signal.disconnect(start_handler)
-        events.on_success_task_signal.disconnect(success_handler)
-        events.on_failure_task_signal.disconnect(failure_handler)
-        if hasattr(self, 'executor'):
-            self.executor.close()
+def test_execute(executor):
+    expected_value = 'value'
+    successful_task = MockTask(mock_successful_task)
+    failing_task = MockTask(mock_failing_task)
+    task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value})
+
+    for task in [successful_task, failing_task, task_with_inputs]:
+        executor.execute(task)
+
+    @retrying.retry(stop_max_delay=10000, wait_fixed=100)
+    def assertion():
+        assert successful_task.states == ['start', 'success']
+        assert failing_task.states == ['start', 'failure']
+        assert task_with_inputs.states == ['start', 'failure']
+        assert isinstance(failing_task.exception, MockException)
+        assert isinstance(task_with_inputs.exception, MockException)
+        assert task_with_inputs.exception.message == expected_value
+    assertion()
 
 
 def mock_successful_task(**_):
@@ -116,9 +94,10 @@ class MockTask(object):
         self.logger = logging.getLogger()
         self.name = name
         self.inputs = inputs or {}
-        self.context = ctx or None
+        self.context = ctx
         self.retry_count = 0
         self.max_attempts = 1
+        self.plugin_id = None
 
         for state in models.Task.STATES:
             setattr(self, state.upper(), state)
@@ -128,14 +107,36 @@ class MockTask(object):
         yield self
 
 
-def start_handler(task, *args, **kwargs):
-    task.states.append('start')
-
-
-def success_handler(task, *args, **kwargs):
-    task.states.append('success')
-
-
-def failure_handler(task, exception, *args, **kwargs):
-    task.states.append('failure')
-    task.exception = exception
+@pytest.fixture(params=[
+    (thread.ThreadExecutor, {'pool_size': 1}),
+    (thread.ThreadExecutor, {'pool_size': 2}),
+    # subprocess needs to load a tests module so we explicitly add the root directory as if
+    # the project has been installed in editable mode
+    (process.ProcessExecutor, {'python_path': [os.path.dirname(os.path.dirname(aria.__file__))]}),
+    # (celery.CeleryExecutor, {'app': app})
+])
+def executor(request):
+    executor_cls, executor_kwargs = request.param
+    result = executor_cls(**executor_kwargs)
+    yield result
+    result.close()
+
+
+@pytest.fixture(autouse=True)
+def register_signals():
+    def start_handler(task, *args, **kwargs):
+        task.states.append('start')
+
+    def success_handler(task, *args, **kwargs):
+        task.states.append('success')
+
+    def failure_handler(task, exception, *args, **kwargs):
+        task.states.append('failure')
+        task.exception = exception
+    events.start_task_signal.connect(start_handler)
+    events.on_success_task_signal.connect(success_handler)
+    events.on_failure_task_signal.connect(failure_handler)
+    yield
+    events.start_task_signal.disconnect(start_handler)
+    events.on_success_task_signal.disconnect(success_handler)
+    events.on_failure_task_signal.disconnect(failure_handler)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
new file mode 100644
index 0000000..364d354
--- /dev/null
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -0,0 +1,130 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import uuid
+import Queue
+from contextlib import contextmanager
+
+import pytest
+
+from aria import application_model_storage
+from aria.storage import models
+from aria.utils.plugin import create as create_plugin
+from aria.storage.sql_mapi import SQLAlchemyModelAPI
+from aria.orchestrator import events
+from aria.orchestrator import plugin
+from aria.orchestrator.workflows.executor import process
+
+
+import tests.storage
+import tests.resources
+
+
+class TestProcessExecutor(object):
+
+    def test_plugin_execution(self, executor, mock_plugin):
+        task = MockTask(plugin=mock_plugin,
+                        operation='mock_plugin1.operation')
+
+        queue = Queue.Queue()
+
+        def handler(_, exception=None):
+            queue.put(exception)
+
+        events.on_success_task_signal.connect(handler)
+        events.on_failure_task_signal.connect(handler)
+        try:
+            executor.execute(task)
+            error = queue.get(timeout=60)
+            # tests/resources/plugins/mock-plugin1 is the plugin installed
+            # during this tests setup. The module mock_plugin1 contains a single
+            # operation named "operation" which calls an entry point defined in the plugin's
+            # setup.py. This entry points simply prints 'mock-plugin-output' to stdout.
+            # The "operation" operation that called this subprocess, then raises a RuntimeError
+            # with that subprocess output as the error message.
+            # This is what we assert here. This tests checks that both the PYTHONPATH (operation)
+            # and PATH (entry point) are properly updated in the subprocess in which the task is
+            # running.
+            assert isinstance(error, RuntimeError)
+            assert error.message == 'mock-plugin-output'
+        finally:
+            events.on_success_task_signal.disconnect(handler)
+            events.on_failure_task_signal.disconnect(handler)
+
+    def test_closed(self, executor):
+        executor.close()
+        with pytest.raises(RuntimeError) as exc_info:
+            executor.execute(task=None)
+        assert 'closed' in exc_info.value.message
+
+
+@pytest.fixture
+def model(tmpdir):
+    api_kwargs = tests.storage.get_sqlite_api_kwargs(str(tmpdir))
+    result = application_model_storage(SQLAlchemyModelAPI, api_kwargs=api_kwargs)
+    yield result
+    tests.storage.release_sqlite_storage(result)
+
+
+@pytest.fixture
+def plugins_dir(tmpdir):
+    result = tmpdir.join('plugins')
+    result.mkdir()
+    return str(result)
+
+
+@pytest.fixture
+def plugin_manager(model, plugins_dir):
+    return plugin.PluginManager(model=model, plugins_dir=plugins_dir)
+
+
+@pytest.fixture
+def executor(plugin_manager):
+    result = process.ProcessExecutor(plugin_manager=plugin_manager)
+    yield result
+    result.close()
+
+
+@pytest.fixture
+def mock_plugin(plugin_manager, tmpdir):
+    source = os.path.join(tests.resources.DIR, 'plugins', 'mock-plugin1')
+    plugin_path = create_plugin(source=source, destination_dir=str(tmpdir))
+    return plugin_manager.install(source=plugin_path)
+
+
+class MockTask(object):
+
+    INFINITE_RETRIES = models.Task.INFINITE_RETRIES
+
+    def __init__(self, plugin, operation):
+        self.id = str(uuid.uuid4())
+        self.operation_mapping = operation
+        self.logger = logging.getLogger()
+        self.name = operation
+        self.inputs = {}
+        self.context = None
+        self.retry_count = 0
+        self.max_attempts = 1
+        self.plugin_id = plugin.id
+        self.plugin = plugin
+
+        for state in models.Task.STATES:
+            setattr(self, state.upper(), state)
+
+    @contextmanager
+    def _update(self):
+        yield self

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/tests/resources/__init__.py
----------------------------------------------------------------------
diff --git a/tests/resources/__init__.py b/tests/resources/__init__.py
new file mode 100644
index 0000000..3ed601f
--- /dev/null
+++ b/tests/resources/__init__.py
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+
+DIR = os.path.dirname(__file__)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/tests/resources/plugins/mock-plugin1/mock_plugin1.py
----------------------------------------------------------------------
diff --git a/tests/resources/plugins/mock-plugin1/mock_plugin1.py b/tests/resources/plugins/mock-plugin1/mock_plugin1.py
new file mode 100644
index 0000000..25a00d1
--- /dev/null
+++ b/tests/resources/plugins/mock-plugin1/mock_plugin1.py
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import subprocess
+
+
+def operation(**_):
+    process = subprocess.Popen(['mock-plugin1'], stdout=subprocess.PIPE)
+    output, _ = process.communicate()
+    assert not process.poll()
+    raise RuntimeError(output.strip())
+
+
+def console_script_entry_point():
+    print 'mock-plugin-output'

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/tests/resources/plugins/mock-plugin1/setup.py
----------------------------------------------------------------------
diff --git a/tests/resources/plugins/mock-plugin1/setup.py b/tests/resources/plugins/mock-plugin1/setup.py
new file mode 100644
index 0000000..88d354d
--- /dev/null
+++ b/tests/resources/plugins/mock-plugin1/setup.py
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from setuptools import setup
+
+
+setup(
+    name='mock-plugin1',
+    version='0.1',
+    py_modules=['mock_plugin1'],
+    entry_points={
+        'console_scripts': [
+            'mock-plugin1 = mock_plugin1:console_script_entry_point'
+        ]
+    }
+)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/tests/storage/test_models.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_models.py b/tests/storage/test_models.py
index 0ae5d1c..0651957 100644
--- a/tests/storage/test_models.py
+++ b/tests/storage/test_models.py
@@ -111,7 +111,9 @@ def _node_instances_storage():
 def _execution_storage():
     storage = _deployment_storage()
     execution = mock.models.get_execution(storage.deployment.list()[0])
+    plugin = mock.models.get_plugin()
     storage.execution.put(execution)
+    storage.plugin.put(plugin)
     return storage
 
 
@@ -531,34 +533,31 @@ class TestNode(object):
     @pytest.mark.parametrize(
         'is_valid, name, deploy_number_of_instances, max_number_of_instances, '
         'min_number_of_instances, number_of_instances, planned_number_of_instances, plugins, '
-        'plugins_to_install, properties, operations, type, type_hierarchy',
+        'properties, operations, type, type_hierarchy',
         [
-            (False, m_cls, 1, 1, 1, 1, 1, {}, {}, {}, {}, 'type', []),
-            (False, 'name', m_cls, 1, 1, 1, 1, {}, {}, {}, {}, 'type', []),
-            (False, 'name', 1, m_cls, 1, 1, 1, {}, {}, {}, {}, 'type', []),
-            (False, 'name', 1, 1, m_cls, 1, 1, {}, {}, {}, {}, 'type', []),
-            (False, 'name', 1, 1, 1, m_cls, 1, {}, {}, {}, {}, 'type', []),
-            (False, 'name', 1, 1, 1, 1, m_cls, {}, {}, {}, {}, 'type', []),
-            (False, 'name', 1, 1, 1, 1, 1, m_cls, {}, {}, {}, 'type', []),
-            (False, 'name', 1, 1, 1, 1, 1, {}, m_cls, {}, {}, 'type', []),
-            (False, 'name', 1, 1, 1, 1, 1, {}, {}, m_cls, {}, 'type', []),
-            (False, 'name', 1, 1, 1, 1, 1, {}, {}, {}, m_cls, 'type', []),
-            (False, 'name', 1, 1, 1, 1, 1, {}, {}, {}, {}, m_cls, []),
-            (False, 'name', 1, 1, 1, 1, 1, {}, {}, {}, {}, 'type', m_cls),
-
-            (True, 'name', 1, 1, 1, 1, 1, {}, {}, {}, {}, 'type', []),
-            (True, 'name', 1, 1, 1, 1, 1, None, {}, {}, {}, 'type', []),
-            (True, 'name', 1, 1, 1, 1, 1, {}, None, {}, {}, 'type', []),
-            (True, 'name', 1, 1, 1, 1, 1, {}, {}, None, {}, 'type', []),
-            (True, 'name', 1, 1, 1, 1, 1, {}, {}, {}, None, 'type', []),
-            (True, 'name', 1, 1, 1, 1, 1, {}, {}, {}, {}, 'type', []),
-            (True, 'name', 1, 1, 1, 1, 1, {}, {}, {}, {}, 'type', None),
+            (False, m_cls, 1, 1, 1, 1, 1, [], {}, {}, 'type', []),
+            (False, 'name', m_cls, 1, 1, 1, 1, [], {}, {}, 'type', []),
+            (False, 'name', 1, m_cls, 1, 1, 1, [], {}, {}, 'type', []),
+            (False, 'name', 1, 1, m_cls, 1, 1, [], {}, {}, 'type', []),
+            (False, 'name', 1, 1, 1, m_cls, 1, [], {}, {}, 'type', []),
+            (False, 'name', 1, 1, 1, 1, m_cls, [], {}, {}, 'type', []),
+            (False, 'name', 1, 1, 1, 1, 1, m_cls, {}, {}, 'type', []),
+            (False, 'name', 1, 1, 1, 1, 1, [], m_cls, {}, 'type', []),
+            (False, 'name', 1, 1, 1, 1, 1, [], {}, m_cls, 'type', []),
+            (False, 'name', 1, 1, 1, 1, 1, [], {}, {}, m_cls, []),
+            (False, 'name', 1, 1, 1, 1, 1, [], {}, {}, 'type', m_cls),
+
+            (True, 'name', 1, 1, 1, 1, 1, [], {}, {}, 'type', []),
+            (True, 'name', 1, 1, 1, 1, 1, None, {}, {}, 'type', []),
+            (True, 'name', 1, 1, 1, 1, 1, [], None, {}, 'type', []),
+            (True, 'name', 1, 1, 1, 1, 1, [], {}, None, 'type', []),
+            (True, 'name', 1, 1, 1, 1, 1, [], {}, {}, 'type', None),
         ]
     )
     def test_node_model_creation(self, deployment_storage, is_valid, name,
                                  deploy_number_of_instances, max_number_of_instances,
                                  min_number_of_instances, number_of_instances,
-                                 planned_number_of_instances, plugins, plugins_to_install,
+                                 planned_number_of_instances, plugins,
                                  properties, operations, type, type_hierarchy):
         node = _test_model(
             is_valid=is_valid,
@@ -573,7 +572,6 @@ class TestNode(object):
                 number_of_instances=number_of_instances,
                 planned_number_of_instances=planned_number_of_instances,
                 plugins=plugins,
-                plugins_to_install=plugins_to_install,
                 properties=properties,
                 operations=operations,
                 type=type,
@@ -713,58 +711,56 @@ class TestProviderContext(object):
 class TestPlugin(object):
     @pytest.mark.parametrize(
         'is_valid, archive_name, distribution, distribution_release, '
-        'distribution_version, excluded_wheels, package_name, package_source, '
+        'distribution_version, package_name, package_source, '
         'package_version, supported_platform, supported_py_versions, uploaded_at, wheels',
         [
-            (False, m_cls, 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src', 'pak_ver',
-             {}, {}, now, {}),
-            (False, 'arc_name', m_cls, 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src', 'pak_ver',
-             {}, {}, now, {}),
-            (False, 'arc_name', 'dis_name', m_cls, 'dis_ver', {}, 'pak_name', 'pak_src', 'pak_ver',
-             {}, {}, now, {}),
-            (False, 'arc_name', 'dis_name', 'dis_rel', m_cls, {}, 'pak_name', 'pak_src', 'pak_ver',
-             {}, {}, now, {}),
-            (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', m_cls, 'pak_name', 'pak_src',
-             'pak_ver', {}, {}, now, {}),
-            (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, m_cls, 'pak_src', 'pak_ver',
-             {}, {}, now, {}),
-            (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', m_cls, 'pak_ver',
-             {}, {}, now, {}),
-            (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src', m_cls,
-             {}, {}, now, {}),
-            (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src',
-             'pak_ver', m_cls, {}, now, {}),
-            (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src',
-             'pak_ver', {}, m_cls, now, {}),
-            (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src',
-             'pak_ver', {}, {}, m_cls, {}),
-            (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src',
-             'pak_ver', {}, {}, now, m_cls),
-
-            (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src',
-             'pak_ver', {}, {}, now, {}),
-            (True, 'arc_name', None, 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src', 'pak_ver',
-             {}, {}, now, {}),
-            (True, 'arc_name', 'dis_name', None, 'dis_ver', {}, 'pak_name', 'pak_src', 'pak_ver',
-             {}, {}, now, {}),
-            (True, 'arc_name', 'dis_name', 'dis_rel', None, {}, 'pak_name', 'pak_src', 'pak_ver',
-             {}, {}, now, {}),
-            (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', None, 'pak_name', 'pak_src',
-             'pak_ver', {}, {}, now, {}),
-            (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', None, 'pak_ver',
-             {}, {}, now, {}),
-            (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src', None,
-             {}, {}, now, {}),
-            (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src',
-             'pak_ver', None, {}, now, {}),
-            (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src',
-             'pak_ver', {}, None, now, {}),
-            (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src',
-             'pak_ver', {}, {}, now, {}),
+            (False, m_cls, 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', 'pak_ver',
+             'sup_pla', [], now, []),
+            (False, 'arc_name', m_cls, 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', 'pak_ver',
+             'sup_pla', [], now, []),
+            (False, 'arc_name', 'dis_name', m_cls, 'dis_ver', 'pak_name', 'pak_src', 'pak_ver',
+             'sup_pla', [], now, []),
+            (False, 'arc_name', 'dis_name', 'dis_rel', m_cls, 'pak_name', 'pak_src', 'pak_ver',
+             'sup_pla', [], now, []),
+            (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', m_cls, 'pak_src', 'pak_ver',
+             'sup_pla', [], now, []),
+            (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', m_cls, 'pak_ver',
+             'sup_pla', [], now, []),
+            (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', m_cls,
+             'sup_pla', [], now, []),
+            (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src',
+             'pak_ver', m_cls, [], now, []),
+            (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src',
+             'pak_ver', 'sup_pla', m_cls, now, []),
+            (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src',
+             'pak_ver', 'sup_pla', [], m_cls, []),
+            (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src',
+             'pak_ver', 'sup_pla', [], now, m_cls),
+
+            (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src',
+             'pak_ver', 'sup_pla', [], now, []),
+            (True, 'arc_name', None, 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', 'pak_ver',
+             'sup_pla', [], now, []),
+            (True, 'arc_name', 'dis_name', None, 'dis_ver', 'pak_name', 'pak_src', 'pak_ver',
+             'sup_pla', [], now, []),
+            (True, 'arc_name', 'dis_name', 'dis_rel', None, 'pak_name', 'pak_src', 'pak_ver',
+             'sup_pla', [], now, []),
+            (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src',
+             'pak_ver', 'sup_pla', [], now, []),
+            (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', None, 'pak_ver',
+             'sup_pla', [], now, []),
+            (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', None,
+             'sup_pla', [], now, []),
+            (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src',
+             'pak_ver', None, [], now, []),
+            (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src',
+             'pak_ver', 'sup_pla', None, now, []),
+            (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src',
+             'pak_ver', 'sup_pla', [], now, []),
         ]
     )
     def test_plugin_model_creation(self, empty_storage, is_valid, archive_name, distribution,
-                                   distribution_release, distribution_version, excluded_wheels,
+                                   distribution_release, distribution_version,
                                    package_name, package_source, package_version,
                                    supported_platform, supported_py_versions, uploaded_at, wheels):
         _test_model(is_valid=is_valid,
@@ -776,7 +772,6 @@ class TestPlugin(object):
                         distribution=distribution,
                         distribution_release=distribution_release,
                         distribution_version=distribution_version,
-                        excluded_wheels=excluded_wheels,
                         package_name=package_name,
                         package_source=package_source,
                         package_version=package_version,
@@ -791,34 +786,36 @@ class TestTask(object):
 
     @pytest.mark.parametrize(
         'is_valid, status, due_at, started_at, ended_at, max_attempts, retry_count, '
-        'retry_interval, ignore_failure, name, operation_mapping, inputs',
+        'retry_interval, ignore_failure, name, operation_mapping, inputs, plugin_id',
         [
-            (False, m_cls, now, now, now, 1, 1, 1, True, 'name', 'map', {}),
-            (False, Task.STARTED, m_cls, now, now, 1, 1, 1, True, 'name', 'map', {}),
-            (False, Task.STARTED, now, m_cls, now, 1, 1, 1, True, 'name', 'map', {}),
-            (False, Task.STARTED, now, now, m_cls, 1, 1, 1, True, 'name', 'map', {}),
-            (False, Task.STARTED, now, now, now, m_cls, 1, 1, True, 'name', 'map', {}),
-            (False, Task.STARTED, now, now, now, 1, m_cls, 1, True, 'name', 'map', {}),
-            (False, Task.STARTED, now, now, now, 1, 1, m_cls, True, 'name', 'map', {}),
-            (False, Task.STARTED, now, now, now, 1, 1, 1, True, m_cls, 'map', {}),
-            (False, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', m_cls, {}),
-            (False, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', m_cls),
-
-            (True, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', {}),
-            (True, Task.STARTED, None, now, now, 1, 1, 1, True, 'name', 'map', {}),
-            (True, Task.STARTED, now, None, now, 1, 1, 1, True, 'name', 'map', {}),
-            (True, Task.STARTED, now, now, None, 1, 1, 1, True, 'name', 'map', {}),
-            (True, Task.STARTED, now, now, now, 1, None, 1, True, 'name', 'map', {}),
-            (True, Task.STARTED, now, now, now, 1, 1, None, True, 'name', 'map', {}),
-            (True, Task.STARTED, now, now, now, 1, 1, 1, None, 'name', 'map', {}),
-            (True, Task.STARTED, now, now, now, 1, 1, 1, True, None, 'map', {}),
-            (True, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', None, {}),
-            (True, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', None),
+            (False, m_cls, now, now, now, 1, 1, 1, True, 'name', 'map', {}, '1'),
+            (False, Task.STARTED, m_cls, now, now, 1, 1, 1, True, 'name', 'map', {}, '1'),
+            (False, Task.STARTED, now, m_cls, now, 1, 1, 1, True, 'name', 'map', {}, '1'),
+            (False, Task.STARTED, now, now, m_cls, 1, 1, 1, True, 'name', 'map', {}, '1'),
+            (False, Task.STARTED, now, now, now, m_cls, 1, 1, True, 'name', 'map', {}, '1'),
+            (False, Task.STARTED, now, now, now, 1, m_cls, 1, True, 'name', 'map', {}, '1'),
+            (False, Task.STARTED, now, now, now, 1, 1, m_cls, True, 'name', 'map', {}, '1'),
+            (False, Task.STARTED, now, now, now, 1, 1, 1, True, m_cls, 'map', {}, '1'),
+            (False, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', m_cls, {}, '1'),
+            (False, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', m_cls, '1'),
+            (False, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', {}, m_cls),
+
+            (True, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', {}, '1'),
+            (True, Task.STARTED, None, now, now, 1, 1, 1, True, 'name', 'map', {}, '1'),
+            (True, Task.STARTED, now, None, now, 1, 1, 1, True, 'name', 'map', {}, '1'),
+            (True, Task.STARTED, now, now, None, 1, 1, 1, True, 'name', 'map', {}, '1'),
+            (True, Task.STARTED, now, now, now, 1, None, 1, True, 'name', 'map', {}, '1'),
+            (True, Task.STARTED, now, now, now, 1, 1, None, True, 'name', 'map', {}, '1'),
+            (True, Task.STARTED, now, now, now, 1, 1, 1, None, 'name', 'map', {}, '1'),
+            (True, Task.STARTED, now, now, now, 1, 1, 1, True, None, 'map', {}, '1'),
+            (True, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', None, {}, '1'),
+            (True, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', None, '1'),
+            (True, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', {}, None),
         ]
     )
     def test_task_model_creation(self, execution_storage, is_valid, status, due_at, started_at,
                                  ended_at, max_attempts, retry_count, retry_interval,
-                                 ignore_failure, name, operation_mapping, inputs):
+                                 ignore_failure, name, operation_mapping, inputs, plugin_id):
         task = _test_model(
             is_valid=is_valid,
             storage=execution_storage,
@@ -837,9 +834,12 @@ class TestTask(object):
                 name=name,
                 operation_mapping=operation_mapping,
                 inputs=inputs,
+                plugin_id=plugin_id,
             ))
         if is_valid:
             assert task.execution == execution_storage.execution.list()[0]
+            if task.plugin_id:
+                assert task.plugin == execution_storage.plugin.list()[0]
 
     def test_task_max_attempts_validation(self):
         def create_task(max_attempts):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/tests/utils/__init__.py
----------------------------------------------------------------------
diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py
new file mode 100644
index 0000000..ae1e83e
--- /dev/null
+++ b/tests/utils/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/tests/utils/test_plugin.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_plugin.py b/tests/utils/test_plugin.py
new file mode 100644
index 0000000..6f2dd92
--- /dev/null
+++ b/tests/utils/test_plugin.py
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+import pytest
+
+from aria import application_model_storage
+from aria.orchestrator import exceptions
+from aria.orchestrator import plugin
+from aria.utils.plugin import create as create_plugin
+from aria.storage.sql_mapi import SQLAlchemyModelAPI
+
+from .. import storage
+
+
+PACKAGE_NAME = 'mock-plugin'
+PACKAGE_VERSION = '100'
+
+
+class TestPluginManager(object):
+
+    def test_install(self, plugin_manager, mock_plugin, model, plugins_dir):
+        plugin = plugin_manager.install(mock_plugin)
+        assert plugin.package_name == PACKAGE_NAME
+        assert plugin.package_version == PACKAGE_VERSION
+        assert plugin == model.plugin.get(plugin.id)
+        plugin_prefix = os.path.join(plugins_dir, '{0}-{1}'.format(PACKAGE_NAME, PACKAGE_VERSION))
+        assert os.path.isdir(plugin_prefix)
+        assert plugin_prefix == plugin_manager.get_plugin_prefix(plugin)
+
+    def test_install_already_exits(self, plugin_manager, mock_plugin):
+        plugin_manager.install(mock_plugin)
+        with pytest.raises(exceptions.PluginAlreadyExistsError):
+            plugin_manager.install(mock_plugin)
+
+
+@pytest.fixture
+def model():
+    api_kwargs = storage.get_sqlite_api_kwargs()
+    model = application_model_storage(SQLAlchemyModelAPI, api_kwargs=api_kwargs)
+    yield model
+    storage.release_sqlite_storage(model)
+
+
+@pytest.fixture
+def plugins_dir(tmpdir):
+    result = tmpdir.join('plugins')
+    result.mkdir()
+    return str(result)
+
+
+@pytest.fixture
+def plugin_manager(model, plugins_dir):
+    return plugin.PluginManager(model=model, plugins_dir=plugins_dir)
+
+
+@pytest.fixture
+def mock_plugin(tmpdir):
+    source_dir = tmpdir.join('mock_plugin')
+    source_dir.mkdir()
+    setup_py = source_dir.join('setup.py')
+    setup_py.write('from setuptools import setup; setup(name="{0}", version="{1}")'
+                   .format(PACKAGE_NAME, PACKAGE_VERSION))
+    return create_plugin(source=str(source_dir), destination_dir=str(tmpdir))