You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by da...@apache.org on 2016/12/13 15:17:14 UTC
incubator-ariatosca git commit: ARIA-26 TBD [Forced Update!]
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-26-plugin-mechanism 5c3bee47d -> 11072ee33 (forced update)
ARIA-26 TBD
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/11072ee3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/11072ee3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/11072ee3
Branch: refs/heads/ARIA-26-plugin-mechanism
Commit: 11072ee33f06bcd40151de2b8dcd65affa167c44
Parents: 04c9bd0
Author: Dan Kilman <da...@gigaspaces.com>
Authored: Sun Nov 27 16:31:29 2016 +0200
Committer: Dan Kilman <da...@gigaspaces.com>
Committed: Tue Dec 13 17:17:08 2016 +0200
----------------------------------------------------------------------
aria/cli/commands.py | 4 +-
aria/orchestrator/workflows/api/task.py | 42 +++--
aria/orchestrator/workflows/core/task.py | 10 +-
.../orchestrator/workflows/executor/__init__.py | 2 +-
.../orchestrator/workflows/executor/blocking.py | 36 ----
.../workflows/executor/multiprocess.py | 88 +++++----
aria/orchestrator/workflows/executor/thread.py | 3 +-
aria/storage/models.py | 15 +-
aria/utils/plugin.py | 74 ++++++++
requirements.txt | 3 +-
tests/mock/models.py | 16 ++
tests/orchestrator/workflows/api/test_task.py | 51 ++++-
tests/orchestrator/workflows/core/test_task.py | 16 +-
.../workflows/executor/test_executor.py | 104 +++++-----
tests/storage/test_models.py | 188 +++++++++----------
tests/utils/__init__.py | 14 ++
tests/utils/test_plugin.py | 75 ++++++++
17 files changed, 491 insertions(+), 250 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/11072ee3/aria/cli/commands.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands.py b/aria/cli/commands.py
index 141da07..825be68 100644
--- a/aria/cli/commands.py
+++ b/aria/cli/commands.py
@@ -34,7 +34,7 @@ from ..logger import LoggerMixin
from ..storage import (FileSystemModelDriver, FileSystemResourceDriver)
from ..orchestrator.context.workflow import WorkflowContext
from ..orchestrator.workflows.core.engine import Engine
-from ..orchestrator.workflows.executor.thread import ThreadExecutor
+from ..orchestrator.workflows.executor.multiprocess import MultiprocessExecutor
from ..parser import iter_specifications
from ..parser.consumption import (
ConsumptionContext,
@@ -252,7 +252,7 @@ class ExecuteCommand(BaseCommand):
)
workflow_function = self._load_workflow_handler(workflow['operation'])
tasks_graph = workflow_function(workflow_context, **workflow_context.parameters)
- executor = ThreadExecutor()
+ executor = MultiprocessExecutor()
workflow_engine = Engine(executor=executor,
workflow_context=workflow_context,
tasks_graph=tasks_graph)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/11072ee3/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index 1c12407..70f9773 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -67,11 +67,11 @@ class OperationTask(BaseTask):
max_attempts=None,
retry_interval=None,
ignore_failure=None,
- inputs=None):
+ inputs=None,
+ plugin=None):
"""
Creates an operation task using the name, details, node instance and any additional kwargs.
:param name: the name of the operation.
- :param operation_details: the details for the operation.
:param actor: the operation host on which this operation is registered.
:param inputs: operation inputs.
"""
@@ -82,6 +82,7 @@ class OperationTask(BaseTask):
self.name = '{name}.{actor.id}'.format(name=name, actor=actor)
self.operation_mapping = operation_mapping
self.inputs = inputs or {}
+ self.plugin = plugin or {}
self.max_attempts = (self.workflow_context._task_max_attempts
if max_attempts is None else max_attempts)
self.retry_interval = (self.workflow_context._task_retry_interval
@@ -98,15 +99,13 @@ class OperationTask(BaseTask):
:param name: the name of the operation.
"""
assert isinstance(instance, models.NodeInstance)
- operation_details = instance.node.operations[name]
- operation_inputs = operation_details.get('inputs', {})
- operation_inputs.update(inputs or {})
- return cls(name=name,
- actor=instance,
- operation_mapping=operation_details.get('operation', ''),
- inputs=operation_inputs,
- *args,
- **kwargs)
+ return cls._instance(instance=instance,
+ name=name,
+ operation_details=instance.node.operations[name],
+ inputs=inputs,
+ plugins=instance.node.plugins or [],
+ *args,
+ **kwargs)
@classmethod
def relationship_instance(cls, instance, name, operation_end, inputs=None, *args, **kwargs):
@@ -125,12 +124,31 @@ class OperationTask(BaseTask):
cls.TARGET_OPERATION, cls.SOURCE_OPERATION
))
operation_details = getattr(instance.relationship, operation_end)[name]
+ if operation_end == cls.SOURCE_OPERATION:
+ plugins = instance.relationship.source_node.plugins
+ else:
+ plugins = instance.relationship.target_node.plugins
+ return cls._instance(instance=instance,
+ name=name,
+ operation_details=operation_details,
+ inputs=inputs,
+ plugins=plugins or [],
+ *args,
+ **kwargs)
+
+ @classmethod
+ def _instance(cls, instance, name, operation_details, inputs, plugins, *args, **kwargs):
+ operation_mapping = operation_details.get('operation')
operation_inputs = operation_details.get('inputs', {})
operation_inputs.update(inputs or {})
+ plugin_name = operation_details.get('plugin')
+ matching_plugins = [p for p in plugins if p['name'] == plugin_name]
+ plugin = matching_plugins[0] if matching_plugins else {}
return cls(actor=instance,
name=name,
- operation_mapping=operation_details.get('operation'),
+ operation_mapping=operation_mapping,
inputs=operation_inputs,
+ plugin=plugin,
*args,
**kwargs)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/11072ee3/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index 0be17fe..3d869ba 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -106,8 +106,9 @@ class OperationTask(BaseTask):
def __init__(self, api_task, *args, **kwargs):
super(OperationTask, self).__init__(id=api_task.id, **kwargs)
self._workflow_context = api_task._workflow_context
- base_task_model = api_task._workflow_context.model.task.model_cls
+ model = api_task._workflow_context.model
+ base_task_model = model.task.model_cls
if isinstance(api_task.actor, models.NodeInstance):
context_class = operation_context.NodeOperationContext
task_model_cls = base_task_model.as_node_instance
@@ -117,7 +118,11 @@ class OperationTask(BaseTask):
else:
raise RuntimeError('No operation context could be created for {actor.model_cls}'
.format(actor=api_task.actor))
-
+ plugin = api_task.plugin
+ plugins = model.plugin.list(filters={'package_name': plugin.get('package_name', ''),
+ 'package_version': plugin.get('package_version', '')},
+ include=['id'])
+ plugin_id = plugins[0].id if plugins else None
operation_task = task_model_cls(
name=api_task.name,
operation_mapping=api_task.operation_mapping,
@@ -127,6 +132,7 @@ class OperationTask(BaseTask):
max_attempts=api_task.max_attempts,
retry_interval=api_task.retry_interval,
ignore_failure=api_task.ignore_failure,
+ plugin_id=plugin_id
)
self._workflow_context.model.task.put(operation_task)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/11072ee3/aria/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/__init__.py b/aria/orchestrator/workflows/executor/__init__.py
index 16b6c9b..7b205c1 100644
--- a/aria/orchestrator/workflows/executor/__init__.py
+++ b/aria/orchestrator/workflows/executor/__init__.py
@@ -18,5 +18,5 @@ Executors for task execution
"""
-from . import blocking, multiprocess, thread
+from . import multiprocess, thread
from .base import BaseExecutor
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/11072ee3/aria/orchestrator/workflows/executor/blocking.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/blocking.py b/aria/orchestrator/workflows/executor/blocking.py
deleted file mode 100644
index 9d3a9ba..0000000
--- a/aria/orchestrator/workflows/executor/blocking.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Blocking executor
-"""
-
-from aria.utils import imports
-from .base import BaseExecutor
-
-
-class CurrentThreadBlockingExecutor(BaseExecutor):
- """
- Executor which runs tasks in the current thread (blocking)
- """
-
- def execute(self, task):
- self._task_started(task)
- try:
- task_func = imports.load_attribute(task.operation_mapping)
- task_func(ctx=task.context, **task.inputs)
- self._task_succeeded(task)
- except BaseException as e:
- self._task_failed(task, exception=e)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/11072ee3/aria/orchestrator/workflows/executor/multiprocess.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/multiprocess.py b/aria/orchestrator/workflows/executor/multiprocess.py
index d770e07..a88d9a4 100644
--- a/aria/orchestrator/workflows/executor/multiprocess.py
+++ b/aria/orchestrator/workflows/executor/multiprocess.py
@@ -17,7 +17,10 @@
Multiprocess based executor
"""
+import collections
import multiprocessing
+import os
+import sys
import threading
import jsonpickle
@@ -26,73 +29,90 @@ from aria.utils import imports
from .base import BaseExecutor
+_TaskStarted = collections.namedtuple('_TaskStarted', 'task_id')
+_TaskSucceeded = collections.namedtuple('_TaskSucceeded', 'task_id')
+_TaskFailed = collections.namedtuple('_TaskFailed', 'task_id exception')
+
+
class MultiprocessExecutor(BaseExecutor):
"""
Executor which runs tasks in a multiprocess environment
"""
- def __init__(self, pool_size=1, *args, **kwargs):
+ def __init__(self, pool_size=1, plugin_manager=None, *args, **kwargs):
super(MultiprocessExecutor, self).__init__(*args, **kwargs)
self._stopped = False
- self._manager = multiprocessing.Manager()
- self._queue = self._manager.Queue()
+ self._plugin_manager = plugin_manager
+ self._multiprocessing_manager = multiprocessing.Manager()
+ self._queue = self._multiprocessing_manager.Queue()
self._tasks = {}
self._listener_thread = threading.Thread(target=self._listener)
self._listener_thread.daemon = True
self._listener_thread.start()
- self._pool = multiprocessing.Pool(processes=pool_size)
-
- def execute(self, task):
- self._tasks[task.id] = task
- self._pool.apply_async(_multiprocess_handler, args=(
- self._queue,
- task.context,
- task.id,
- task.operation_mapping,
- task.inputs))
+ self._pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=1)
def close(self):
self._pool.close()
self._stopped = True
self._pool.join()
+ self._multiprocessing_manager.shutdown()
+ self._multiprocessing_manager.join()
self._listener_thread.join()
+ def execute(self, task):
+ self._tasks[task.id] = task
+ kwargs = {
+ 'queue': self._queue,
+ 'ctx': task.context,
+ 'task_id': task.id,
+ 'operation_mapping': task.operation_mapping,
+ 'operation_inputs': task.inputs,
+ 'plugin_prefix': None
+ }
+ if task.plugin_id and self._plugin_manager:
+ kwargs['plugin_prefix'] = self._plugin_manager.get_plugin_prefix(task.plugin)
+ self._pool.apply_async(_handler, kwds=kwargs)
+
+ def _remove_task(self, task_id):
+ return self._tasks.pop(task_id)
+
def _listener(self):
while not self._stopped:
try:
message = self._queue.get(timeout=1)
- if message.type == 'task_started':
+ if isinstance(message, _TaskStarted):
self._task_started(self._tasks[message.task_id])
- elif message.type == 'task_succeeded':
+ elif isinstance(message, _TaskSucceeded):
self._task_succeeded(self._remove_task(message.task_id))
- elif message.type == 'task_failed':
+ elif isinstance(message, _TaskFailed):
self._task_failed(self._remove_task(message.task_id),
exception=jsonpickle.loads(message.exception))
else:
- # TODO: something
- raise RuntimeError()
+ raise RuntimeError('Invalid state')
# Daemon threads
except BaseException:
pass
- def _remove_task(self, task_id):
- return self._tasks.pop(task_id)
-
-
-class _MultiprocessMessage(object):
-
- def __init__(self, type, task_id, exception=None):
- self.type = type
- self.task_id = task_id
- self.exception = exception
-
-def _multiprocess_handler(queue, ctx, task_id, operation_mapping, operation_inputs):
- queue.put(_MultiprocessMessage(type='task_started', task_id=task_id))
+def _handler(queue, ctx, task_id, operation_mapping, operation_inputs, plugin_prefix):
+ queue.put(_TaskStarted(task_id))
+ if plugin_prefix:
+ bin_dir = 'Scripts' if os.name == 'nt' else 'bin'
+ os.environ['PATH'] = '{0}{1}{2}'.format(
+ os.path.join(plugin_prefix, bin_dir),
+ os.pathsep,
+ os.environ.get('PATH', ''))
+ if os.name == 'nt':
+ pythonpath_dirs = [os.path.join(plugin_prefix, 'Lib', 'site-packages')]
+ else:
+ pythonpath_dirs = [os.path.join(
+ plugin_prefix, 'lib{0}'.format(b),
+ 'python{0}.{1}'.format(sys.version_info[0], sys.version_info[1]),
+ 'site-packages') for b in ['', '64']]
+ sys.path.extend(pythonpath_dirs)
try:
task_func = imports.load_attribute(operation_mapping)
task_func(ctx=ctx, **operation_inputs)
- queue.put(_MultiprocessMessage(type='task_succeeded', task_id=task_id))
+ queue.put(_TaskSucceeded(task_id))
except BaseException as e:
- queue.put(_MultiprocessMessage(type='task_failed', task_id=task_id,
- exception=jsonpickle.dumps(e)))
+ queue.put(_TaskFailed(task_id, exception=jsonpickle.dumps(e)))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/11072ee3/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 76ceefd..16a58fb 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -26,7 +26,8 @@ from .base import BaseExecutor
class ThreadExecutor(BaseExecutor):
"""
- Executor which runs tasks in a separate thread
+ Executor which runs tasks in a separate thread. It is easier to write tests
+ using this executor than with the full-blown multiprocessing executor.
"""
def __init__(self, pool_size=1, *args, **kwargs):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/11072ee3/aria/storage/models.py
----------------------------------------------------------------------
diff --git a/aria/storage/models.py b/aria/storage/models.py
index 6302e66..0a1027b 100644
--- a/aria/storage/models.py
+++ b/aria/storage/models.py
@@ -344,8 +344,7 @@ class Node(SQLModelBase):
min_number_of_instances = Column(Integer, nullable=False)
number_of_instances = Column(Integer, nullable=False)
planned_number_of_instances = Column(Integer, nullable=False)
- plugins = Column(Dict)
- plugins_to_install = Column(Dict)
+ plugins = Column(List)
properties = Column(Dict)
operations = Column(Dict)
type = Column(Text, nullable=False, index=True)
@@ -474,14 +473,13 @@ class Plugin(SQLModelBase):
distribution = Column(Text)
distribution_release = Column(Text)
distribution_version = Column(Text)
- excluded_wheels = Column(Dict)
package_name = Column(Text, nullable=False, index=True)
package_source = Column(Text)
package_version = Column(Text)
- supported_platform = Column(Dict)
- supported_py_versions = Column(Dict)
+ supported_platform = Column(Text)
+ supported_py_versions = Column(List)
uploaded_at = Column(DateTime, nullable=False, index=True)
- wheels = Column(Dict, nullable=False)
+ wheels = Column(List, nullable=False)
class Task(SQLModelBase):
@@ -550,6 +548,11 @@ class Task(SQLModelBase):
name = Column(String)
operation_mapping = Column(String)
inputs = Column(Dict)
+ plugin_id = foreign_key(Plugin.id, nullable=True)
+
+ @declared_attr
+ def plugin(cls):
+ return one_to_many_relationship(cls, Plugin, cls.plugin_id)
@declared_attr
def execution(cls):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/11072ee3/aria/utils/plugin.py
----------------------------------------------------------------------
diff --git a/aria/utils/plugin.py b/aria/utils/plugin.py
new file mode 100644
index 0000000..6934ed0
--- /dev/null
+++ b/aria/utils/plugin.py
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import tempfile
+import subprocess
+import sys
+from datetime import datetime
+
+import wagon
+
+
+def create(source, destination_dir):
+ return wagon.create(source=source, archive_destination_dir=destination_dir)
+
+
+class PluginManager(object):
+
+ def __init__(self, model, plugins_dir):
+ self._model = model
+ self._plugins_dir = plugins_dir
+
+ def install(self, source):
+ metadata = wagon.show(source)
+ cls = self._model.plugin.model_cls
+ plugin = cls(
+ archive_name=metadata['archive_name'],
+ supported_platform=metadata['supported_platform'],
+ supported_py_versions=metadata['supported_python_versions'],
+ # Remove suffix colon after upgrading wagon to > 0.5.0
+ distribution=metadata['build_server_os_properties']['distribution:'],
+ distribution_release=metadata['build_server_os_properties']['distribution_version'],
+ distribution_version=metadata['build_server_os_properties']['distribution_release'],
+ package_name=metadata['package_name'],
+ package_version=metadata['package_version'],
+ package_source=metadata['package_source'],
+ wheels=metadata['wheels'],
+ uploaded_at=datetime.now()
+ )
+ if len(self._model.plugin.list(filters={'package_name': plugin.package_name,
+ 'package_version': plugin.package_version})):
+ raise RuntimeError('Plugin {0}, version {1} already exists'.format(
+ plugin.package_name, plugin.package_version))
+ self._install_wagon(source=source, prefix=self.get_plugin_prefix(plugin))
+ self._model.plugin.put(plugin)
+ return plugin
+
+ def get_plugin_prefix(self, plugin):
+ return os.path.join(
+ self._plugins_dir,
+ '{0}-{1}'.format(plugin.package_name, plugin.package_version))
+
+ def _install_wagon(self, source, prefix):
+ with tempfile.NamedTemporaryFile() as constraint:
+ # TODO: test constraint
+ constraint.write(subprocess.check_output([sys.executable, '-m', 'pip', 'freeze']))
+ constraint.flush()
+ wagon.install(
+ source=source,
+ install_args='--prefix="{prefix}" --constraint="{constraint}"'.format(
+ prefix=prefix,
+ constraint=constraint.name))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/11072ee3/requirements.txt
----------------------------------------------------------------------
diff --git a/requirements.txt b/requirements.txt
index 31b0b79..0005a5e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -24,4 +24,5 @@ Jinja2==2.8
shortuuid==0.4.3
CacheControl[filecache]==0.11.6
clint==0.5.1
-SQLAlchemy==1.1.4
\ No newline at end of file
+SQLAlchemy==1.1.4
+wagon==0.5.0
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/11072ee3/tests/mock/models.py
----------------------------------------------------------------------
diff --git a/tests/mock/models.py b/tests/mock/models.py
index e2e3d2f..26088e0 100644
--- a/tests/mock/models.py
+++ b/tests/mock/models.py
@@ -152,3 +152,19 @@ def get_deployment(blueprint):
outputs={},
scaling_groups={},
)
+
+
+def get_plugin(package_name='package', package_version='0.1'):
+ return models.Plugin(
+ archive_name='archive_name',
+ distribution='distribution',
+ distribution_release='dist_release',
+ distribution_version='dist_version',
+ package_name=package_name,
+ package_source='source',
+ package_version=package_version,
+ supported_platform='any',
+ supported_py_versions=['python27'],
+ uploaded_at=datetime.now(),
+ wheels=[],
+ )
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/11072ee3/tests/orchestrator/workflows/api/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/api/test_task.py b/tests/orchestrator/workflows/api/test_task.py
index 1a90338..58e387f 100644
--- a/tests/orchestrator/workflows/api/test_task.py
+++ b/tests/orchestrator/workflows/api/test_task.py
@@ -39,9 +39,12 @@ class TestOperationTask(object):
def test_node_operation_task_creation(self, ctx):
operation_name = 'aria.interfaces.lifecycle.create'
- op_details = {'operation': True}
+ op_details = {'operation': True, 'plugin': 'plugin'}
node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
node.operations[operation_name] = op_details
+ node.plugins = [{'name': 'plugin',
+ 'package_name': 'package',
+ 'package_version': '0.1'}]
ctx.model.node.update(node)
node_instance = \
ctx.model.node_instance.get_by_name(mock.models.DEPENDENT_NODE_INSTANCE_NAME)
@@ -66,12 +69,18 @@ class TestOperationTask(object):
assert api_task.retry_interval == retry_interval
assert api_task.max_attempts == max_attempts
assert api_task.ignore_failure == ignore_failure
+ assert api_task.plugin == {'name': 'plugin',
+ 'package_name': 'package',
+ 'package_version': '0.1'}
- def test_relationship_operation_task_creation(self, ctx):
+ def test_source_relationship_operation_task_creation(self, ctx):
operation_name = 'aria.interfaces.relationship_lifecycle.preconfigure'
- op_details = {'operation': True}
+ op_details = {'operation': True, 'plugin': 'plugin'}
relationship = ctx.model.relationship.list()[0]
relationship.source_operations[operation_name] = op_details
+ relationship.source_node.plugins = [{'name': 'plugin',
+ 'package_name': 'package',
+ 'package_version': '0.1'}]
relationship_instance = ctx.model.relationship_instance.list()[0]
inputs = {'inputs': True}
max_attempts = 10
@@ -92,6 +101,41 @@ class TestOperationTask(object):
assert api_task.inputs == inputs
assert api_task.retry_interval == retry_interval
assert api_task.max_attempts == max_attempts
+ assert api_task.plugin == {'name': 'plugin',
+ 'package_name': 'package',
+ 'package_version': '0.1'}
+
+ def test_target_relationship_operation_task_creation(self, ctx):
+ operation_name = 'aria.interfaces.relationship_lifecycle.preconfigure'
+ op_details = {'operation': True, 'plugin': 'plugin'}
+ relationship = ctx.model.relationship.list()[0]
+ relationship.target_operations[operation_name] = op_details
+ relationship.target_node.plugins = [{'name': 'plugin',
+ 'package_name': 'package',
+ 'package_version': '0.1'}]
+ relationship_instance = ctx.model.relationship_instance.list()[0]
+ inputs = {'inputs': True}
+ max_attempts = 10
+ retry_interval = 10
+
+ with context.workflow.current.push(ctx):
+ api_task = api.task.OperationTask.relationship_instance(
+ name=operation_name,
+ instance=relationship_instance,
+ operation_end=api.task.OperationTask.TARGET_OPERATION,
+ inputs=inputs,
+ max_attempts=max_attempts,
+ retry_interval=retry_interval)
+
+ assert api_task.name == '{0}.{1}'.format(operation_name, relationship_instance.id)
+ assert api_task.operation_mapping is True
+ assert api_task.actor == relationship_instance
+ assert api_task.inputs == inputs
+ assert api_task.retry_interval == retry_interval
+ assert api_task.max_attempts == max_attempts
+ assert api_task.plugin == {'name': 'plugin',
+ 'package_name': 'package',
+ 'package_version': '0.1'}
def test_operation_task_default_values(self, ctx):
dependency_node_instance = ctx.model.node_instance.get_by_name(
@@ -106,6 +150,7 @@ class TestOperationTask(object):
assert task.retry_interval == ctx._task_retry_interval
assert task.max_attempts == ctx._task_max_attempts
assert task.ignore_failure == ctx._task_ignore_failure
+ assert task.plugin == {}
class TestWorkflowTask(object):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/11072ee3/tests/orchestrator/workflows/core/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py
index c572501..6c3825c 100644
--- a/tests/orchestrator/workflows/core/test_task.py
+++ b/tests/orchestrator/workflows/core/test_task.py
@@ -42,24 +42,30 @@ class TestOperationTask(object):
with workflow_context.current.push(ctx):
api_task = api.task.OperationTask.node_instance(
instance=node_instance,
- name='aria.interfaces.lifecycle.create',
- )
-
+ name='aria.interfaces.lifecycle.create')
core_task = core.task.OperationTask(api_task=api_task)
-
return api_task, core_task
def test_operation_task_creation(self, ctx):
+ storage_plugin = mock.models.get_plugin(package_name='p1', package_version='0.1')
+ storage_plugin_other = mock.models.get_plugin(package_name='p0', package_version='0.0')
+ ctx.model.plugin.put(storage_plugin_other)
+ ctx.model.plugin.put(storage_plugin)
node_instance = \
ctx.model.node_instance.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
+ node = node_instance.node
+ node.plugins = [{'name': 'plugin1',
+ 'package_name': 'p1',
+ 'package_version': '0.1'}]
+ node.operations['aria.interfaces.lifecycle.create'] = {'plugin': 'plugin1'}
api_task, core_task = self._create_operation_task(ctx, node_instance)
storage_task = ctx.model.task.get_by_name(core_task.name)
-
assert core_task.model_task == storage_task
assert core_task.name == api_task.name
assert core_task.operation_mapping == api_task.operation_mapping
assert core_task.actor == api_task.actor == node_instance
assert core_task.inputs == api_task.inputs == storage_task.inputs
+ assert core_task.plugin == storage_plugin
def test_operation_task_edit_locked_attribute(self, ctx):
node_instance = \
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/11072ee3/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 654542c..a587686 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -33,52 +33,28 @@ from aria.orchestrator import events
from aria.orchestrator.workflows.executor import (
thread,
multiprocess,
- blocking,
# celery
)
-class TestExecutor(object):
-
- @pytest.mark.parametrize('executor_cls,executor_kwargs', [
- (thread.ThreadExecutor, {'pool_size': 1}),
- (thread.ThreadExecutor, {'pool_size': 2}),
- (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
- (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
- (blocking.CurrentThreadBlockingExecutor, {}),
- # (celery.CeleryExecutor, {'app': app})
- ])
- def test_execute(self, executor_cls, executor_kwargs):
- self.executor = executor_cls(**executor_kwargs)
- expected_value = 'value'
- successful_task = MockTask(mock_successful_task)
- failing_task = MockTask(mock_failing_task)
- task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value})
-
- for task in [successful_task, failing_task, task_with_inputs]:
- self.executor.execute(task)
-
- @retrying.retry(stop_max_delay=10000, wait_fixed=100)
- def assertion():
- assert successful_task.states == ['start', 'success']
- assert failing_task.states == ['start', 'failure']
- assert task_with_inputs.states == ['start', 'failure']
- assert isinstance(failing_task.exception, MockException)
- assert isinstance(task_with_inputs.exception, MockException)
- assert task_with_inputs.exception.message == expected_value
- assertion()
-
- def setup_method(self):
- events.start_task_signal.connect(start_handler)
- events.on_success_task_signal.connect(success_handler)
- events.on_failure_task_signal.connect(failure_handler)
-
- def teardown_method(self):
- events.start_task_signal.disconnect(start_handler)
- events.on_success_task_signal.disconnect(success_handler)
- events.on_failure_task_signal.disconnect(failure_handler)
- if hasattr(self, 'executor'):
- self.executor.close()
+def test_execute(executor):
+ expected_value = 'value'
+ successful_task = MockTask(mock_successful_task)
+ failing_task = MockTask(mock_failing_task)
+ task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value})
+
+ for task in [successful_task, failing_task, task_with_inputs]:
+ executor.execute(task)
+
+ @retrying.retry(stop_max_delay=10000, wait_fixed=100)
+ def assertion():
+ assert successful_task.states == ['start', 'success']
+ assert failing_task.states == ['start', 'failure']
+ assert task_with_inputs.states == ['start', 'failure']
+ assert isinstance(failing_task.exception, MockException)
+ assert isinstance(task_with_inputs.exception, MockException)
+ assert task_with_inputs.exception.message == expected_value
+ assertion()
def mock_successful_task(**_):
@@ -119,6 +95,7 @@ class MockTask(object):
self.context = ctx or None
self.retry_count = 0
self.max_attempts = 1
+ self.plugin_id = None
for state in models.Task.STATES:
setattr(self, state.upper(), state)
@@ -128,14 +105,35 @@ class MockTask(object):
yield self
-def start_handler(task, *args, **kwargs):
- task.states.append('start')
-
-
-def success_handler(task, *args, **kwargs):
- task.states.append('success')
-
-
-def failure_handler(task, exception, *args, **kwargs):
- task.states.append('failure')
- task.exception = exception
+@pytest.fixture(params=[
+ (thread.ThreadExecutor, {'pool_size': 1}),
+ (thread.ThreadExecutor, {'pool_size': 2}),
+ (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
+ (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
+ # (celery.CeleryExecutor, {'app': app})
+])
+def executor(request):
+ executor_cls, executor_kwargs = request.param
+ result = executor_cls(**executor_kwargs)
+ yield result
+ result.close()
+
+
+@pytest.fixture(autouse=True)
+def register_signals():
+ def start_handler(task, *args, **kwargs):
+ task.states.append('start')
+
+ def success_handler(task, *args, **kwargs):
+ task.states.append('success')
+
+ def failure_handler(task, exception, *args, **kwargs):
+ task.states.append('failure')
+ task.exception = exception
+ events.start_task_signal.connect(start_handler)
+ events.on_success_task_signal.connect(success_handler)
+ events.on_failure_task_signal.connect(failure_handler)
+ yield
+ events.start_task_signal.disconnect(start_handler)
+ events.on_success_task_signal.disconnect(success_handler)
+ events.on_failure_task_signal.disconnect(failure_handler)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/11072ee3/tests/storage/test_models.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_models.py b/tests/storage/test_models.py
index 0ae5d1c..0651957 100644
--- a/tests/storage/test_models.py
+++ b/tests/storage/test_models.py
@@ -111,7 +111,9 @@ def _node_instances_storage():
def _execution_storage():
storage = _deployment_storage()
execution = mock.models.get_execution(storage.deployment.list()[0])
+ plugin = mock.models.get_plugin()
storage.execution.put(execution)
+ storage.plugin.put(plugin)
return storage
@@ -531,34 +533,31 @@ class TestNode(object):
@pytest.mark.parametrize(
'is_valid, name, deploy_number_of_instances, max_number_of_instances, '
'min_number_of_instances, number_of_instances, planned_number_of_instances, plugins, '
- 'plugins_to_install, properties, operations, type, type_hierarchy',
+ 'properties, operations, type, type_hierarchy',
[
- (False, m_cls, 1, 1, 1, 1, 1, {}, {}, {}, {}, 'type', []),
- (False, 'name', m_cls, 1, 1, 1, 1, {}, {}, {}, {}, 'type', []),
- (False, 'name', 1, m_cls, 1, 1, 1, {}, {}, {}, {}, 'type', []),
- (False, 'name', 1, 1, m_cls, 1, 1, {}, {}, {}, {}, 'type', []),
- (False, 'name', 1, 1, 1, m_cls, 1, {}, {}, {}, {}, 'type', []),
- (False, 'name', 1, 1, 1, 1, m_cls, {}, {}, {}, {}, 'type', []),
- (False, 'name', 1, 1, 1, 1, 1, m_cls, {}, {}, {}, 'type', []),
- (False, 'name', 1, 1, 1, 1, 1, {}, m_cls, {}, {}, 'type', []),
- (False, 'name', 1, 1, 1, 1, 1, {}, {}, m_cls, {}, 'type', []),
- (False, 'name', 1, 1, 1, 1, 1, {}, {}, {}, m_cls, 'type', []),
- (False, 'name', 1, 1, 1, 1, 1, {}, {}, {}, {}, m_cls, []),
- (False, 'name', 1, 1, 1, 1, 1, {}, {}, {}, {}, 'type', m_cls),
-
- (True, 'name', 1, 1, 1, 1, 1, {}, {}, {}, {}, 'type', []),
- (True, 'name', 1, 1, 1, 1, 1, None, {}, {}, {}, 'type', []),
- (True, 'name', 1, 1, 1, 1, 1, {}, None, {}, {}, 'type', []),
- (True, 'name', 1, 1, 1, 1, 1, {}, {}, None, {}, 'type', []),
- (True, 'name', 1, 1, 1, 1, 1, {}, {}, {}, None, 'type', []),
- (True, 'name', 1, 1, 1, 1, 1, {}, {}, {}, {}, 'type', []),
- (True, 'name', 1, 1, 1, 1, 1, {}, {}, {}, {}, 'type', None),
+ (False, m_cls, 1, 1, 1, 1, 1, [], {}, {}, 'type', []),
+ (False, 'name', m_cls, 1, 1, 1, 1, [], {}, {}, 'type', []),
+ (False, 'name', 1, m_cls, 1, 1, 1, [], {}, {}, 'type', []),
+ (False, 'name', 1, 1, m_cls, 1, 1, [], {}, {}, 'type', []),
+ (False, 'name', 1, 1, 1, m_cls, 1, [], {}, {}, 'type', []),
+ (False, 'name', 1, 1, 1, 1, m_cls, [], {}, {}, 'type', []),
+ (False, 'name', 1, 1, 1, 1, 1, m_cls, {}, {}, 'type', []),
+ (False, 'name', 1, 1, 1, 1, 1, [], m_cls, {}, 'type', []),
+ (False, 'name', 1, 1, 1, 1, 1, [], {}, m_cls, 'type', []),
+ (False, 'name', 1, 1, 1, 1, 1, [], {}, {}, m_cls, []),
+ (False, 'name', 1, 1, 1, 1, 1, [], {}, {}, 'type', m_cls),
+
+ (True, 'name', 1, 1, 1, 1, 1, [], {}, {}, 'type', []),
+ (True, 'name', 1, 1, 1, 1, 1, None, {}, {}, 'type', []),
+ (True, 'name', 1, 1, 1, 1, 1, [], None, {}, 'type', []),
+ (True, 'name', 1, 1, 1, 1, 1, [], {}, None, 'type', []),
+ (True, 'name', 1, 1, 1, 1, 1, [], {}, {}, 'type', None),
]
)
def test_node_model_creation(self, deployment_storage, is_valid, name,
deploy_number_of_instances, max_number_of_instances,
min_number_of_instances, number_of_instances,
- planned_number_of_instances, plugins, plugins_to_install,
+ planned_number_of_instances, plugins,
properties, operations, type, type_hierarchy):
node = _test_model(
is_valid=is_valid,
@@ -573,7 +572,6 @@ class TestNode(object):
number_of_instances=number_of_instances,
planned_number_of_instances=planned_number_of_instances,
plugins=plugins,
- plugins_to_install=plugins_to_install,
properties=properties,
operations=operations,
type=type,
@@ -713,58 +711,56 @@ class TestProviderContext(object):
class TestPlugin(object):
@pytest.mark.parametrize(
'is_valid, archive_name, distribution, distribution_release, '
- 'distribution_version, excluded_wheels, package_name, package_source, '
+ 'distribution_version, package_name, package_source, '
'package_version, supported_platform, supported_py_versions, uploaded_at, wheels',
[
- (False, m_cls, 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src', 'pak_ver',
- {}, {}, now, {}),
- (False, 'arc_name', m_cls, 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src', 'pak_ver',
- {}, {}, now, {}),
- (False, 'arc_name', 'dis_name', m_cls, 'dis_ver', {}, 'pak_name', 'pak_src', 'pak_ver',
- {}, {}, now, {}),
- (False, 'arc_name', 'dis_name', 'dis_rel', m_cls, {}, 'pak_name', 'pak_src', 'pak_ver',
- {}, {}, now, {}),
- (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', m_cls, 'pak_name', 'pak_src',
- 'pak_ver', {}, {}, now, {}),
- (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, m_cls, 'pak_src', 'pak_ver',
- {}, {}, now, {}),
- (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', m_cls, 'pak_ver',
- {}, {}, now, {}),
- (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src', m_cls,
- {}, {}, now, {}),
- (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src',
- 'pak_ver', m_cls, {}, now, {}),
- (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src',
- 'pak_ver', {}, m_cls, now, {}),
- (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src',
- 'pak_ver', {}, {}, m_cls, {}),
- (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src',
- 'pak_ver', {}, {}, now, m_cls),
-
- (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src',
- 'pak_ver', {}, {}, now, {}),
- (True, 'arc_name', None, 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src', 'pak_ver',
- {}, {}, now, {}),
- (True, 'arc_name', 'dis_name', None, 'dis_ver', {}, 'pak_name', 'pak_src', 'pak_ver',
- {}, {}, now, {}),
- (True, 'arc_name', 'dis_name', 'dis_rel', None, {}, 'pak_name', 'pak_src', 'pak_ver',
- {}, {}, now, {}),
- (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', None, 'pak_name', 'pak_src',
- 'pak_ver', {}, {}, now, {}),
- (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', None, 'pak_ver',
- {}, {}, now, {}),
- (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src', None,
- {}, {}, now, {}),
- (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src',
- 'pak_ver', None, {}, now, {}),
- (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src',
- 'pak_ver', {}, None, now, {}),
- (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src',
- 'pak_ver', {}, {}, now, {}),
+ (False, m_cls, 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', 'pak_ver',
+ 'sup_pla', [], now, []),
+ (False, 'arc_name', m_cls, 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', 'pak_ver',
+ 'sup_pla', [], now, []),
+ (False, 'arc_name', 'dis_name', m_cls, 'dis_ver', 'pak_name', 'pak_src', 'pak_ver',
+ 'sup_pla', [], now, []),
+ (False, 'arc_name', 'dis_name', 'dis_rel', m_cls, 'pak_name', 'pak_src', 'pak_ver',
+ 'sup_pla', [], now, []),
+ (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', m_cls, 'pak_src', 'pak_ver',
+ 'sup_pla', [], now, []),
+ (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', m_cls, 'pak_ver',
+ 'sup_pla', [], now, []),
+ (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', m_cls,
+ 'sup_pla', [], now, []),
+ (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src',
+ 'pak_ver', m_cls, [], now, []),
+ (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src',
+ 'pak_ver', 'sup_pla', m_cls, now, []),
+ (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src',
+ 'pak_ver', 'sup_pla', [], m_cls, []),
+ (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src',
+ 'pak_ver', 'sup_pla', [], now, m_cls),
+
+ (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src',
+ 'pak_ver', 'sup_pla', [], now, []),
+ (True, 'arc_name', None, 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', 'pak_ver',
+ 'sup_pla', [], now, []),
+ (True, 'arc_name', 'dis_name', None, 'dis_ver', 'pak_name', 'pak_src', 'pak_ver',
+ 'sup_pla', [], now, []),
+ (True, 'arc_name', 'dis_name', 'dis_rel', None, 'pak_name', 'pak_src', 'pak_ver',
+ 'sup_pla', [], now, []),
+ (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src',
+ 'pak_ver', 'sup_pla', [], now, []),
+ (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', None, 'pak_ver',
+ 'sup_pla', [], now, []),
+ (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', None,
+ 'sup_pla', [], now, []),
+ (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src',
+ 'pak_ver', None, [], now, []),
+ (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src',
+ 'pak_ver', 'sup_pla', None, now, []),
+ (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src',
+ 'pak_ver', 'sup_pla', [], now, []),
]
)
def test_plugin_model_creation(self, empty_storage, is_valid, archive_name, distribution,
- distribution_release, distribution_version, excluded_wheels,
+ distribution_release, distribution_version,
package_name, package_source, package_version,
supported_platform, supported_py_versions, uploaded_at, wheels):
_test_model(is_valid=is_valid,
@@ -776,7 +772,6 @@ class TestPlugin(object):
distribution=distribution,
distribution_release=distribution_release,
distribution_version=distribution_version,
- excluded_wheels=excluded_wheels,
package_name=package_name,
package_source=package_source,
package_version=package_version,
@@ -791,34 +786,36 @@ class TestTask(object):
@pytest.mark.parametrize(
'is_valid, status, due_at, started_at, ended_at, max_attempts, retry_count, '
- 'retry_interval, ignore_failure, name, operation_mapping, inputs',
+ 'retry_interval, ignore_failure, name, operation_mapping, inputs, plugin_id',
[
- (False, m_cls, now, now, now, 1, 1, 1, True, 'name', 'map', {}),
- (False, Task.STARTED, m_cls, now, now, 1, 1, 1, True, 'name', 'map', {}),
- (False, Task.STARTED, now, m_cls, now, 1, 1, 1, True, 'name', 'map', {}),
- (False, Task.STARTED, now, now, m_cls, 1, 1, 1, True, 'name', 'map', {}),
- (False, Task.STARTED, now, now, now, m_cls, 1, 1, True, 'name', 'map', {}),
- (False, Task.STARTED, now, now, now, 1, m_cls, 1, True, 'name', 'map', {}),
- (False, Task.STARTED, now, now, now, 1, 1, m_cls, True, 'name', 'map', {}),
- (False, Task.STARTED, now, now, now, 1, 1, 1, True, m_cls, 'map', {}),
- (False, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', m_cls, {}),
- (False, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', m_cls),
-
- (True, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', {}),
- (True, Task.STARTED, None, now, now, 1, 1, 1, True, 'name', 'map', {}),
- (True, Task.STARTED, now, None, now, 1, 1, 1, True, 'name', 'map', {}),
- (True, Task.STARTED, now, now, None, 1, 1, 1, True, 'name', 'map', {}),
- (True, Task.STARTED, now, now, now, 1, None, 1, True, 'name', 'map', {}),
- (True, Task.STARTED, now, now, now, 1, 1, None, True, 'name', 'map', {}),
- (True, Task.STARTED, now, now, now, 1, 1, 1, None, 'name', 'map', {}),
- (True, Task.STARTED, now, now, now, 1, 1, 1, True, None, 'map', {}),
- (True, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', None, {}),
- (True, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', None),
+ (False, m_cls, now, now, now, 1, 1, 1, True, 'name', 'map', {}, '1'),
+ (False, Task.STARTED, m_cls, now, now, 1, 1, 1, True, 'name', 'map', {}, '1'),
+ (False, Task.STARTED, now, m_cls, now, 1, 1, 1, True, 'name', 'map', {}, '1'),
+ (False, Task.STARTED, now, now, m_cls, 1, 1, 1, True, 'name', 'map', {}, '1'),
+ (False, Task.STARTED, now, now, now, m_cls, 1, 1, True, 'name', 'map', {}, '1'),
+ (False, Task.STARTED, now, now, now, 1, m_cls, 1, True, 'name', 'map', {}, '1'),
+ (False, Task.STARTED, now, now, now, 1, 1, m_cls, True, 'name', 'map', {}, '1'),
+ (False, Task.STARTED, now, now, now, 1, 1, 1, True, m_cls, 'map', {}, '1'),
+ (False, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', m_cls, {}, '1'),
+ (False, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', m_cls, '1'),
+ (False, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', {}, m_cls),
+
+ (True, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', {}, '1'),
+ (True, Task.STARTED, None, now, now, 1, 1, 1, True, 'name', 'map', {}, '1'),
+ (True, Task.STARTED, now, None, now, 1, 1, 1, True, 'name', 'map', {}, '1'),
+ (True, Task.STARTED, now, now, None, 1, 1, 1, True, 'name', 'map', {}, '1'),
+ (True, Task.STARTED, now, now, now, 1, None, 1, True, 'name', 'map', {}, '1'),
+ (True, Task.STARTED, now, now, now, 1, 1, None, True, 'name', 'map', {}, '1'),
+ (True, Task.STARTED, now, now, now, 1, 1, 1, None, 'name', 'map', {}, '1'),
+ (True, Task.STARTED, now, now, now, 1, 1, 1, True, None, 'map', {}, '1'),
+ (True, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', None, {}, '1'),
+ (True, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', None, '1'),
+ (True, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', {}, None),
]
)
def test_task_model_creation(self, execution_storage, is_valid, status, due_at, started_at,
ended_at, max_attempts, retry_count, retry_interval,
- ignore_failure, name, operation_mapping, inputs):
+ ignore_failure, name, operation_mapping, inputs, plugin_id):
task = _test_model(
is_valid=is_valid,
storage=execution_storage,
@@ -837,9 +834,12 @@ class TestTask(object):
name=name,
operation_mapping=operation_mapping,
inputs=inputs,
+ plugin_id=plugin_id,
))
if is_valid:
assert task.execution == execution_storage.execution.list()[0]
+ if task.plugin_id:
+ assert task.plugin == execution_storage.plugin.list()[0]
def test_task_max_attempts_validation(self):
def create_task(max_attempts):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/11072ee3/tests/utils/__init__.py
----------------------------------------------------------------------
diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py
new file mode 100644
index 0000000..ae1e83e
--- /dev/null
+++ b/tests/utils/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/11072ee3/tests/utils/test_plugin.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_plugin.py b/tests/utils/test_plugin.py
new file mode 100644
index 0000000..b9f1d59
--- /dev/null
+++ b/tests/utils/test_plugin.py
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+import pytest
+
+from aria import application_model_storage
+from aria.utils import plugin
+from aria.storage.sql_mapi import SQLAlchemyModelAPI
+
+from .. import storage
+
+
+PACKAGE_NAME = 'mock-plugin'
+PACKAGE_VERSION = '100'
+
+
+class TestPluginManager(object):
+    """Tests for installing plugins through ``plugin.PluginManager``."""
+
+    def test_install(self, plugin_manager, mock_plugin, model, plugins_dir):
+        """Check the installed plugin's name/version, model entry and prefix dir."""
+        # Named ``installed`` so it does not shadow the imported ``plugin`` module.
+        installed = plugin_manager.install(mock_plugin)
+        assert installed.package_name == PACKAGE_NAME
+        assert installed.package_version == PACKAGE_VERSION
+        assert installed == model.plugin.get(installed.id)
+        plugin_prefix = os.path.join(plugins_dir, '{0}-{1}'.format(PACKAGE_NAME, PACKAGE_VERSION))
+        assert os.path.isdir(plugin_prefix)
+        assert plugin_prefix == plugin_manager.get_plugin_prefix(installed)
+
+    def test_install_already_exits(self, plugin_manager, mock_plugin):
+        """Installing the same plugin a second time raises ``RuntimeError``."""
+        # NOTE(review): "exits" looks like a typo for "exists"; the name is
+        # kept so the test id does not change.
+        plugin_manager.install(mock_plugin)
+        with pytest.raises(RuntimeError):
+            plugin_manager.install(mock_plugin)
+
+
+@pytest.fixture
+def model():
+    """Yield an application model storage and release it after the test.
+
+    The storage is created with ``SQLAlchemyModelAPI`` and the kwargs returned
+    by ``storage.get_sqlite_api_kwargs()``.
+    """
+    sqlite_kwargs = storage.get_sqlite_api_kwargs()
+    model_storage = application_model_storage(SQLAlchemyModelAPI, api_kwargs=sqlite_kwargs)
+    yield model_storage
+    # Teardown: hand the same storage object back for release.
+    storage.release_sqlite_storage(model_storage)
+
+
+@pytest.fixture
+def plugins_dir(tmpdir):
+    """Create a ``plugins`` directory under ``tmpdir`` and return its path as a str."""
+    # ``mkdir(name)`` joins, creates and returns the new directory in one call.
+    return str(tmpdir.mkdir('plugins'))
+
+
+@pytest.fixture
+def plugin_manager(model, plugins_dir):
+    """Return a ``plugin.PluginManager`` built from the ``model`` and ``plugins_dir`` fixtures."""
+    manager_kwargs = {'model': model, 'plugins_dir': plugins_dir}
+    return plugin.PluginManager(**manager_kwargs)
+
+
+@pytest.fixture
+def mock_plugin(tmpdir):
+    """Create a minimal setuptools project under ``tmpdir`` and package it.
+
+    Returns whatever ``plugin.create`` returns for that source directory, with
+    ``tmpdir`` itself as the destination directory.
+    """
+    project_dir = tmpdir.mkdir('mock_plugin')
+    setup_script = 'from setuptools import setup; setup(name="{0}", version="{1}")'.format(
+        PACKAGE_NAME, PACKAGE_VERSION)
+    project_dir.join('setup.py').write(setup_script)
+    return plugin.create(source=str(project_dir), destination_dir=str(tmpdir))