You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by ra...@apache.org on 2017/04/06 08:29:33 UTC
[07/32] incubator-ariatosca git commit: ARIA-48 cli
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 127641f..f631e79 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -68,17 +68,6 @@ class BaseContext(object):
self._workdir = workdir
self.logger = None
- def _create_execution(self):
- now = datetime.utcnow()
- execution = self.model.execution.model_cls(
- service_instance=self.service_instance,
- workflow_name=self._workflow_name,
- created_at=now,
- parameters=self.parameters,
- )
- self.model.execution.put(execution)
- return execution.id
-
def _register_logger(self, logger_name=None, level=None, task_id=None):
self.logger = self.PrefixedLogger(logging.getLogger(logger_name or self.__class__.__name__),
self.logging_id,
@@ -168,13 +157,13 @@ class BaseContext(object):
Download a blueprint resource from the resource storage
"""
try:
- self.resource.deployment.download(entry_id=str(self.service.id),
- destination=destination,
- path=path)
+ self.resource.service.download(entry_id=str(self.service.id),
+ destination=destination,
+ path=path)
except exceptions.StorageError:
- self.resource.blueprint.download(entry_id=str(self.service_template.id),
- destination=destination,
- path=path)
+ self.resource.service_template.download(entry_id=str(self.service_template.id),
+ destination=destination,
+ path=path)
def download_resource_and_render(self, destination, path=None, variables=None):
"""
@@ -193,9 +182,9 @@ class BaseContext(object):
Read a deployment resource as string from the resource storage
"""
try:
- return self.resource.deployment.read(entry_id=str(self.service.id), path=path)
+ return self.resource.service.read(entry_id=str(self.service.id), path=path)
except exceptions.StorageError:
- return self.resource.deployment.read(entry_id=str(self.service_template.id), path=path)
+ return self.resource.service_template.read(entry_id=str(self.service_template.id), path=path)
def get_resource_and_render(self, path=None, variables=None):
"""
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index 5f86d9d..bc9f653 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -31,11 +31,11 @@ class WorkflowContext(BaseContext):
"""
def __init__(self,
workflow_name,
+ execution_id,
parameters=None,
task_max_attempts=1,
task_retry_interval=0,
task_ignore_failure=False,
- execution_id=None,
*args, **kwargs):
super(WorkflowContext, self).__init__(*args, **kwargs)
self._workflow_name = workflow_name
@@ -43,28 +43,15 @@ class WorkflowContext(BaseContext):
self._task_max_attempts = task_max_attempts
self._task_retry_interval = task_retry_interval
self._task_ignore_failure = task_ignore_failure
- # TODO: execution creation should happen somewhere else
- # should be moved there, when such logical place exists
- self._execution_id = execution_id or self._create_execution()
+ self._execution_id = execution_id
self._register_logger()
def __repr__(self):
return (
'{name}(deployment_id={self._service_id}, '
- 'workflow_name={self._workflow_name}'.format(
+ 'workflow_name={self._workflow_name}, execution_id={self._execution_id})'.format(
name=self.__class__.__name__, self=self))
- def _create_execution(self):
- now = datetime.utcnow()
- execution = self.model.execution.model_cls(
- service=self.service,
- workflow_name=self._workflow_name,
- created_at=now,
- parameters=self.parameters,
- )
- self.model.execution.put(execution)
- return execution.id
-
@property
def logging_id(self):
return '{0}[{1}]'.format(self._workflow_name, self._execution_id)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/orchestrator/execution_plugin/ctx_proxy/server.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
index 817d064..52a5312 100644
--- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py
+++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
@@ -24,6 +24,7 @@ import StringIO
import wsgiref.simple_server
import bottle
+from aria import modeling
from .. import exceptions
@@ -111,7 +112,7 @@ class CtxProxy(object):
result = json.dumps({
'type': result_type,
'payload': payload
- })
+ }, cls=modeling.utils.ModelJSONEncoder)
except Exception as e:
traceback_out = StringIO.StringIO()
traceback.print_exc(file=traceback_out)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/orchestrator/runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/runner.py b/aria/orchestrator/runner.py
deleted file mode 100644
index f1633fa..0000000
--- a/aria/orchestrator/runner.py
+++ /dev/null
@@ -1,101 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Workflow runner
-"""
-
-import tempfile
-import os
-
-from .context.workflow import WorkflowContext
-from .workflows.core.engine import Engine
-from .workflows.executor.thread import ThreadExecutor
-from ..storage import (
- sql_mapi,
- filesystem_rapi,
-)
-from .. import (
- application_model_storage,
- application_resource_storage
-)
-
-
-class Runner(object):
- """
- Runs workflows on a deployment. By default uses temporary storage (either on disk or in memory)
- but can also be used with existing storage.
-
- Handles the initialization of the storage engine and provides convenience methods for
- sub-classes to create tasks.
-
- :param path: path to Sqlite database file; use '' (the default) to use a temporary file,
- and None to use an in-memory database
- :type path: string
- """
-
- def __init__(self, workflow_name, workflow_fn, inputs, initialize_model_storage_fn,
- service_id_fn, storage_path='', is_storage_temporary=True):
- if storage_path == '':
- # Temporary file storage
- the_file, storage_path = tempfile.mkstemp(suffix='.db', prefix='aria-')
- os.close(the_file)
-
- self._storage_path = storage_path
- self._storage_dir = os.path.dirname(storage_path)
- self._storage_name = os.path.basename(storage_path)
- self._is_storage_temporary = is_storage_temporary
-
- workflow_context = self.create_workflow_context(workflow_name, initialize_model_storage_fn,
- service_id_fn)
-
- tasks_graph = workflow_fn(ctx=workflow_context, **inputs)
-
- self._engine = Engine(
- executor=ThreadExecutor(),
- workflow_context=workflow_context,
- tasks_graph=tasks_graph)
-
- def run(self):
- try:
- self._engine.execute()
- finally:
- self.cleanup()
-
- def create_workflow_context(self,
- workflow_name,
- initialize_model_storage_fn,
- service_id_fn):
- self.cleanup()
- model_storage = application_model_storage(
- sql_mapi.SQLAlchemyModelAPI,
- initiator_kwargs=dict(base_dir=self._storage_dir, filename=self._storage_name))
- if initialize_model_storage_fn:
- initialize_model_storage_fn(model_storage)
- resource_storage = application_resource_storage(
- filesystem_rapi.FileSystemResourceAPI, api_kwargs=dict(directory='.'))
- return WorkflowContext(
- name=workflow_name,
- model_storage=model_storage,
- resource_storage=resource_storage,
- service_id=service_id_fn(),
- workflow_name=self.__class__.__name__,
- task_max_attempts=1,
- task_retry_interval=1)
-
- def cleanup(self):
- if (self._is_storage_temporary and (self._storage_path is not None) and
- os.path.isfile(self._storage_path)):
- os.remove(self._storage_path)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
new file mode 100644
index 0000000..8b6b431
--- /dev/null
+++ b/aria/orchestrator/workflow_runner.py
@@ -0,0 +1,147 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow runner
+"""
+
+import sys
+from datetime import datetime
+
+from .context.workflow import WorkflowContext
+from .workflows.builtin import BUILTIN_WORKFLOWS, BUILTIN_WORKFLOWS_PATH_PREFIX
+from .workflows.core.engine import Engine
+from .workflows.executor.process import ProcessExecutor
+from ..exceptions import AriaException
+from ..modeling import utils as modeling_utils
+from ..modeling import models
+from ..utils.imports import import_fullname
+
+
+# Defaults for the ``task_max_attempts`` / ``task_retry_interval`` arguments of WorkflowRunner
+DEFAULT_TASK_MAX_ATTEMPTS = 1
+DEFAULT_TASK_RETRY_INTERVAL = 1
+# TODO move this constant somewhere in the DSL parser?
+# Names that are filtered out of a workflow's declared inputs when an execution is created
+WORKFLOW_POLICY_INTERNAL_PROPERTIES = ('implementation', 'dependencies')
+
+
+class WorkflowRunner(object):
+    """
+    Creates the execution model for a workflow of a stored service and runs it through the
+    workflow engine.
+    """
+
+    def __init__(self, workflow_name, service_name, inputs,
+                 model_storage, resource_storage, plugin_manager,
+                 task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
+                 task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
+        """
+        Validates the workflow, stores a new execution and builds the tasks graph and the engine.
+
+        :param workflow_name: name of a built-in workflow or of a workflow declared by the service
+        :param service_name: name of the service on which the workflow is run
+        :param inputs: input values supplied for this execution
+        :param model_storage: model storage holding the service and execution models
+        :param resource_storage: resource storage handed to the workflow context
+        :param plugin_manager: plugin manager handed to the process executor
+        :param task_max_attempts: passed as-is to the workflow context
+        :param task_retry_interval: passed as-is to the workflow context
+        :raises AriaException: if the workflow is neither built-in nor declared by the service
+        """
+
+        self._model_storage = model_storage
+        self._workflow_name = workflow_name
+        service = model_storage.service.get_by_name(service_name)
+        # the IDs are stored rather than the models themselves, so this module could be used
+        # by several threads without raising errors on model objects shared between threads
+        self._service_id = service.id
+
+        self._validate_workflow_exists_for_service()
+
+        workflow_fn = self._get_workflow_fn()
+
+        execution = self._create_execution_models(inputs)
+        self._execution_id = execution.id
+
+        workflow_context = WorkflowContext(
+            name=self.__class__.__name__,
+            model_storage=self._model_storage,
+            resource_storage=resource_storage,
+            service_id=service.id,
+            execution_id=execution.id,
+            workflow_name=workflow_name,
+            task_max_attempts=task_max_attempts,
+            task_retry_interval=task_retry_interval)
+
+        # transforming the execution inputs to dict, to pass them to the workflow function
+        execution_inputs_dict = {execution_input.name: execution_input.value
+                                 for execution_input in self.execution.inputs.values()}
+        self._tasks_graph = workflow_fn(ctx=workflow_context, **execution_inputs_dict)
+
+        self._engine = Engine(
+            executor=ProcessExecutor(plugin_manager=plugin_manager),
+            workflow_context=workflow_context,
+            tasks_graph=self._tasks_graph)
+
+    @property
+    def execution(self):
+        """
+        The execution model, fetched from the model storage by its stored ID on every access.
+        """
+        return self._model_storage.execution.get(self._execution_id)
+
+    @property
+    def service(self):
+        """
+        The service model, fetched from the model storage by its stored ID on every access.
+        """
+        return self._model_storage.service.get(self._service_id)
+
+    def execute(self):
+        """
+        Runs the tasks graph through the engine.
+        """
+        #TODO uncomment, commented for testing purposes
+        # self._validate_no_active_executions()
+        self._engine.execute()
+
+    def cancel(self):
+        """
+        Asks the engine to cancel the execution.
+        """
+        self._engine.cancel_execution()
+
+    def _create_execution_models(self, inputs):
+        """
+        Creates the execution model for this run and stores it in the model storage.
+
+        :param inputs: input values supplied for this execution
+        :return: the stored execution model
+        """
+        execution = models.Execution(
+            created_at=datetime.utcnow(),
+            service=self.service,
+            workflow_name=self._workflow_name,
+            inputs={})
+
+        # built-in workflows don't have any inputs, and are also
+        # not a part of the service's workflows field
+        if self._workflow_name not in BUILTIN_WORKFLOWS:
+            declared_inputs = self.service.workflows[self._workflow_name].inputs
+            # Iterate over (name, input) pairs; iterating the mapping itself yields only the
+            # names, which cannot be unpacked into a key and a value
+            workflow_inputs = {k: v for k, v in declared_inputs.items()
+                               if k not in WORKFLOW_POLICY_INTERNAL_PROPERTIES}
+
+            execution.inputs = modeling_utils.create_inputs(inputs, workflow_inputs)
+
+        self._model_storage.execution.put(execution)
+        return execution
+
+    def _validate_workflow_exists_for_service(self):
+        """
+        :raises AriaException: if the workflow is neither declared by the service nor built-in
+        """
+        if self._workflow_name not in self.service.workflows and \
+                self._workflow_name not in BUILTIN_WORKFLOWS:
+            raise AriaException('No workflow policy {0} declared in service instance {1}'
+                                .format(self._workflow_name, self.service.name))
+
+    def _validate_no_active_executions(self):
+        """
+        :raises AriaException: if the model storage lists an execution of this service whose
+                               status is one of ``models.Execution.ACTIVE_STATES``
+        """
+        active_executions_filter = dict(service=self.service,
+                                        status=models.Execution.ACTIVE_STATES)
+        active_executions = self._model_storage.execution.list(filter=active_executions_filter)
+        if active_executions:
+            raise AriaException("Can't start execution; Service {0} has a running "
+                                "execution with id {1}"
+                                .format(self.service.name, active_executions[0].id))
+
+    def _get_workflow_fn(self):
+        """
+        Imports and returns the function implementing the workflow.
+
+        Built-in workflows are imported from ``BUILTIN_WORKFLOWS_PATH_PREFIX``; any other
+        workflow is imported by the value of its ``implementation`` property.
+        """
+        if self._workflow_name in BUILTIN_WORKFLOWS:
+            return import_fullname('{0}.{1}'.format(BUILTIN_WORKFLOWS_PATH_PREFIX,
+                                                    self._workflow_name))
+
+        workflow = self.service.workflows[self._workflow_name]
+
+        try:
+            # TODO: perhaps pass to import_fullname as paths instead of appending to sys path?
+            # TODO: revisit; workflow.implementation to be used instead?
+            sys.path.append(workflow.properties['implementation'].value)
+            # sys.path.append(os.path.dirname(str(context.presentation.location)))
+        except KeyError:
+            # no implementation field - a script has been provided directly
+            pass
+
+        # NOTE(review): the lookup below repeats the one guarded above, so a missing
+        # 'implementation' property still surfaces as a KeyError here - confirm what the
+        # "script provided directly" case is meant to import
+        workflow_fn = import_fullname(workflow.properties['implementation'].value)
+        return workflow_fn
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index f49ec2e..2ec85b9 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -19,7 +19,7 @@ Provides the tasks to be entered into the task graph
import copy
from ....modeling import models
-from ....utils.collections import OrderedDict
+from ....modeling import utils as modeling_utils
from ....utils.uuid import generate_uuid
from ... import context
from .. import exceptions
@@ -63,7 +63,6 @@ class OperationTask(BaseTask):
def __init__(self,
actor,
- actor_type,
interface_name,
operation_name,
runs_on=None,
@@ -76,6 +75,7 @@ class OperationTask(BaseTask):
:meth:`for_relationship`.
"""
+ actor_type = type(actor).__name__.lower()
assert isinstance(actor, (models.Node, models.Relationship))
assert actor_type in ('node', 'relationship')
assert interface_name and operation_name
@@ -93,22 +93,7 @@ class OperationTask(BaseTask):
self.interface_name = interface_name
self.operation_name = operation_name
- # Wrap inputs
- inputs = copy.deepcopy(inputs) if inputs else {}
- for k, v in inputs.iteritems():
- if not isinstance(v, models.Parameter):
- inputs[k] = models.Parameter.wrap(k, v)
-
- # TODO: Suggestion: these extra inputs could be stored as a separate entry in the task
- # model, because they are different from the operation inputs. If we do this, then the two
- # kinds of inputs should *not* be merged here.
-
- operation = self._get_operation()
- if operation is None:
- raise exceptions.OperationNotFoundException(
- 'Could not find operation "{0}" on interface "{1}" for {2} "{3}"'
- .format(self.operation_name, self.interface_name, actor_type, actor.name))
-
+ operation = self.actor.interfaces[self.interface_name].operations[self.operation_name]
self.plugin = None
if operation.plugin_specification:
self.plugin = OperationTask._find_plugin(operation.plugin_specification)
@@ -117,9 +102,8 @@ class OperationTask(BaseTask):
'Could not find plugin of operation "{0}" on interface "{1}" for {2} "{3}"'
.format(self.operation_name, self.interface_name, actor_type, actor.name))
+ self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs)
self.implementation = operation.implementation
- self.inputs = OperationTask._merge_inputs(operation.inputs, inputs)
-
self.name = OperationTask.NAME_FORMAT.format(type=actor_type,
name=actor.name,
interface=self.interface_name,
@@ -128,14 +112,6 @@ class OperationTask(BaseTask):
def __repr__(self):
return self.name
- def _get_operation(self):
- interface = self.actor.interfaces.get(self.interface_name)
- if interface:
- return interface.operations.get(self.operation_name)
- return None
-
-
-
@classmethod
def for_node(cls,
node,
@@ -163,7 +139,6 @@ class OperationTask(BaseTask):
assert isinstance(node, models.Node)
return cls(
actor=node,
- actor_type='node',
interface_name=interface_name,
operation_name=operation_name,
max_attempts=max_attempts,
@@ -202,7 +177,6 @@ class OperationTask(BaseTask):
assert runs_on in models.Task.RUNS_ON
return cls(
actor=relationship,
- actor_type='relationship',
interface_name=interface_name,
operation_name=operation_name,
runs_on=runs_on,
@@ -216,13 +190,6 @@ class OperationTask(BaseTask):
workflow_context = context.workflow.current.get()
return plugin_specification.find_plugin(workflow_context.model.plugin.list())
- @staticmethod
- def _merge_inputs(operation_inputs, override_inputs=None):
- final_inputs = OrderedDict(operation_inputs)
- if override_inputs:
- final_inputs.update(override_inputs)
- return final_inputs
-
class WorkflowTask(BaseTask):
"""
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/orchestrator/workflows/builtin/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/__init__.py b/aria/orchestrator/workflows/builtin/__init__.py
index d43a962..8b13c62 100644
--- a/aria/orchestrator/workflows/builtin/__init__.py
+++ b/aria/orchestrator/workflows/builtin/__init__.py
@@ -24,6 +24,7 @@ from .stop import stop
BUILTIN_WORKFLOWS = ('install', 'uninstall', 'start', 'stop')
+BUILTIN_WORKFLOWS_PATH_PREFIX = 'aria.orchestrator.workflows.builtin'
__all__ = [
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/orchestrator/workflows/builtin/execute_operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/execute_operation.py b/aria/orchestrator/workflows/builtin/execute_operation.py
index 348f47a..7ee135f 100644
--- a/aria/orchestrator/workflows/builtin/execute_operation.py
+++ b/aria/orchestrator/workflows/builtin/execute_operation.py
@@ -17,6 +17,7 @@
Builtin execute_operation workflow
"""
+from . import utils
from ..api.task import OperationTask
from ... import workflow
@@ -28,7 +29,6 @@ def execute_operation(
interface_name,
operation_name,
operation_kwargs,
- allow_kwargs_override,
run_by_dependency_order,
type_names,
node_template_ids,
@@ -41,7 +41,6 @@ def execute_operation(
:param TaskGraph graph: the graph which will describe the workflow.
:param basestring operation: the operation name to execute
:param dict operation_kwargs:
- :param bool allow_kwargs_override:
:param bool run_by_dependency_order:
:param type_names:
:param node_template_ids:
@@ -71,8 +70,7 @@ def execute_operation(
node=node,
interface_name=interface_name,
operation_name=operation_name,
- operation_kwargs=operation_kwargs,
- allow_kwargs_override=allow_kwargs_override
+ operation_kwargs=operation_kwargs
)
)
@@ -108,21 +106,16 @@ def _create_node_task(
node,
interface_name,
operation_name,
- operation_kwargs,
- allow_kwargs_override):
+ operation_kwargs):
"""
A workflow which executes a single operation
:param node: the node instance to install
:param basestring operation: the operation name
:param dict operation_kwargs:
- :param bool allow_kwargs_override:
:return:
"""
- if allow_kwargs_override is not None:
- operation_kwargs['allow_kwargs_override'] = allow_kwargs_override
-
- return OperationTask.for_node(
+ return utils.create_node_task(
node=node,
interface_name=interface_name,
operation_name=operation_name,
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/orchestrator/workflows/builtin/utils.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/utils.py b/aria/orchestrator/workflows/builtin/utils.py
index d79318f..8890084 100644
--- a/aria/orchestrator/workflows/builtin/utils.py
+++ b/aria/orchestrator/workflows/builtin/utils.py
@@ -12,26 +12,31 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from ..api.task import OperationTask
+
+from ..api.task import OperationTask, StubTask
from .. import exceptions
-def create_node_task(node, interface_name, operation_name):
+def create_node_task(node, interface_name, operation_name, **kwargs):
"""
Returns a new operation task if the operation exists in the node, otherwise returns None.
"""
try:
+ if _is_empty_task(node, interface_name, operation_name):
+ return StubTask()
+
return OperationTask.for_node(node=node,
interface_name=interface_name,
- operation_name=operation_name)
+ operation_name=operation_name,
+ **kwargs)
except exceptions.OperationNotFoundException:
# We will skip nodes which do not have the operation
return None
def create_relationships_tasks(
- node, interface_name, source_operation_name=None, target_operation_name=None):
+ node, interface_name, source_operation_name=None, target_operation_name=None, **kwargs):
"""
Creates a relationship task (source and target) for all of a node_instance relationships.
:param basestring source_operation_name: the relationship operation name.
@@ -43,21 +48,18 @@ def create_relationships_tasks(
"""
sub_tasks = []
for relationship in node.outbound_relationships:
- try:
- relationship_operations = relationship_tasks(
- relationship,
- interface_name,
- source_operation_name=source_operation_name,
- target_operation_name=target_operation_name)
- sub_tasks.append(relationship_operations)
- except exceptions.OperationNotFoundException:
- # We will skip relationships which do not have the operation
- pass
+ relationship_operations = relationship_tasks(
+ relationship,
+ interface_name,
+ source_operation_name=source_operation_name,
+ target_operation_name=target_operation_name,
+ **kwargs)
+ sub_tasks.append(relationship_operations)
return sub_tasks
-def relationship_tasks(
- relationship, interface_name, source_operation_name=None, target_operation_name=None):
+def relationship_tasks(relationship, interface_name, source_operation_name=None,
+ target_operation_name=None, **kwargs):
"""
Creates a relationship task source and target.
:param Relationship relationship: the relationship instance itself
@@ -68,19 +70,35 @@ def relationship_tasks(
"""
operations = []
if source_operation_name:
- operations.append(
- OperationTask.for_relationship(relationship=relationship,
- interface_name=interface_name,
- operation_name=source_operation_name,
- runs_on='source')
- )
+ try:
+ if _is_empty_task(relationship, interface_name, source_operation_name):
+ operations.append(StubTask())
+
+ operations.append(
+ OperationTask.for_relationship(relationship=relationship,
+ interface_name=interface_name,
+ operation_name=source_operation_name,
+ runs_on='source',
+ **kwargs)
+ )
+ except exceptions.OperationNotFoundException:
+ # We will skip relationships which do not have the operation
+ pass
if target_operation_name:
- operations.append(
- OperationTask.for_relationship(relationship=relationship,
- interface_name=interface_name,
- operation_name=target_operation_name,
- runs_on='target')
- )
+ try:
+ if _is_empty_task(relationship, interface_name, target_operation_name):
+ operations.append(StubTask())
+
+ operations.append(
+ OperationTask.for_relationship(relationship=relationship,
+ interface_name=interface_name,
+ operation_name=target_operation_name,
+ runs_on='target',
+ **kwargs)
+ )
+ except exceptions.OperationNotFoundException:
+ # We will skip relationships which do not have the operation
+ pass
return operations
@@ -108,3 +126,15 @@ def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False):
graph.add_dependency(dependency, task)
else:
graph.add_dependency(task, dependencies)
+
+
+def _is_empty_task(actor, interface_name, operation_name):
+    """
+    Tells whether the named operation of the actor has no implementation.
+
+    :raises OperationNotFoundException: if the interface or the operation is not found on the
+                                        actor
+    """
+    interface = actor.interfaces.get(interface_name)
+    operation = interface.operations.get(operation_name) if interface else None
+    if not operation:
+        raise exceptions.OperationNotFoundException(
+            'Could not find operation "{0}" on interface "{1}" for {2} "{3}"'
+            .format(operation_name, interface_name, type(actor).__name__.lower(), actor.name))
+    return operation.implementation is None
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/storage/core.py
----------------------------------------------------------------------
diff --git a/aria/storage/core.py b/aria/storage/core.py
index 8302fc9..0e900bb 100644
--- a/aria/storage/core.py
+++ b/aria/storage/core.py
@@ -37,6 +37,7 @@ API:
* drivers - module, a pool of ARIA standard drivers.
* StorageDriver - class, abstract model implementation.
"""
+import logging
from aria.logger import LoggerMixin
from . import sql_mapi
@@ -71,6 +72,10 @@ class Storage(LoggerMixin):
:param kwargs:
"""
super(Storage, self).__init__(**kwargs)
+ # Set the logger handler of any storage object to NullHandler.
+ # This is since the absence of a handler shows up while using the CLI in the form of:
+ # `No handlers could be found for logger "aria.ResourceStorage"`.
+ self.logger.addHandler(logging.NullHandler())
self.api = api_cls
self.registered = {}
self._initiator = initiator
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/utils/archive.py
----------------------------------------------------------------------
diff --git a/aria/utils/archive.py b/aria/utils/archive.py
new file mode 100644
index 0000000..5077dec
--- /dev/null
+++ b/aria/utils/archive.py
@@ -0,0 +1,63 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import tarfile
+import zipfile
+import tempfile
+from contextlib import closing
+
+
+def is_archive(source):
+    """
+    Tells whether ``source`` is recognized as a tar archive or as a zip archive.
+    """
+    if tarfile.is_tarfile(source):
+        return True
+    return zipfile.is_zipfile(source)
+
+
+def extract_archive(source):
+    """
+    Extracts a tar or zip archive and returns the directory it was extracted into.
+
+    :raises ValueError: if ``source`` is recognized neither as a tar nor as a zip archive
+    """
+    if tarfile.is_tarfile(source):
+        extractor = untar
+    elif zipfile.is_zipfile(source):
+        extractor = unzip
+    else:
+        raise ValueError(
+            'Unsupported archive type provided or archive is not valid: {0}.'.format(source))
+    return extractor(source)
+
+
+def tar(source, destination):
+    """
+    Writes ``source`` into a gzip-compressed tar archive at ``destination``, stored under the
+    base name of ``source``.
+    """
+    archive = tarfile.open(destination, 'w:gz')
+    with closing(archive):
+        archive.add(source, arcname=os.path.basename(source))
+
+
+def untar(archive, destination=None):
+    """
+    Extracts a tar archive into a directory.
+
+    :param archive: path of the tar archive
+    :param destination: directory to extract into; a new temporary directory is created when
+                        it is omitted or empty
+    :return: the directory the archive was extracted into
+    :raises ValueError: if a member's name resolves to a path outside of the destination
+    """
+    if not destination:
+        destination = tempfile.mkdtemp()
+    # Canonical form of the destination, with a trailing separator, for the containment check
+    root = os.path.realpath(destination)
+    root_prefix = os.path.join(root, '')
+    with closing(tarfile.open(name=archive)) as tar_file:
+        members = tar_file.getmembers()
+        # Member names such as '../x' or '/etc/x' would be written outside of the destination.
+        # NOTE(review): only member names are checked here, not link targets
+        for member in members:
+            target = os.path.realpath(os.path.join(root, member.name))
+            if target != root and not target.startswith(root_prefix):
+                raise ValueError(
+                    'Archive member {0} would be extracted outside of {1}'
+                    .format(member.name, destination))
+        tar_file.extractall(path=destination, members=members)
+    return destination
+
+
+def zip(source, destination):
+    """
+    Writes every file found under the ``source`` directory into a zip archive at
+    ``destination``, with names relative to the parent directory of ``source``.
+
+    :return: ``destination``
+    """
+    base_dir = os.path.dirname(source)
+    with closing(zipfile.ZipFile(destination, 'w')) as archive:
+        for current_dir, _, filenames in os.walk(source):
+            for name in filenames:
+                full_path = os.path.join(current_dir, name)
+                archive.write(full_path, os.path.relpath(full_path, base_dir))
+    return destination
+
+
+def unzip(archive, destination=None):
+    """
+    Extracts a zip archive into ``destination``, or into a new temporary directory when
+    ``destination`` is omitted or empty.
+
+    :return: the directory the archive was extracted into
+    """
+    target_dir = destination or tempfile.mkdtemp()
+    with closing(zipfile.ZipFile(archive, 'r')) as archive_file:
+        archive_file.extractall(target_dir)
+    return target_dir
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/utils/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/utils/exceptions.py b/aria/utils/exceptions.py
index 9e3e80f..b60cee4 100644
--- a/aria/utils/exceptions.py
+++ b/aria/utils/exceptions.py
@@ -15,6 +15,7 @@
import sys
import linecache
+import StringIO
import traceback as tb
import jsonpickle
@@ -89,6 +90,16 @@ def _print_stack(frame):
puts(line)
+def get_exception_as_string(exc_type, exc_val, traceback):
+    """
+    Returns the text that printing the given exception and its traceback would produce.
+    """
+    formatted_lines = tb.format_exception(exc_type, exc_val, traceback)
+    return ''.join(formatted_lines)
+
+
class _WrappedException(Exception):
def __init__(self, exception_type, exception_str):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/utils/file.py
----------------------------------------------------------------------
diff --git a/aria/utils/file.py b/aria/utils/file.py
index b515f70..6d1aa16 100644
--- a/aria/utils/file.py
+++ b/aria/utils/file.py
@@ -15,6 +15,7 @@
import errno
import os
+import shutil
def makedirs(path):
@@ -26,3 +27,15 @@ def makedirs(path):
except IOError as e:
if e.errno != errno.EEXIST:
raise
+
+def remove_if_exists(path):
+    """
+    Removes the file or the directory tree at ``path``; a path that does not exist is ignored.
+    """
+    try:
+        if os.path.isfile(path):
+            os.remove(path)
+        if os.path.isdir(path):
+            shutil.rmtree(path)
+    except OSError as error:
+        # errno.ENOENT = no such file or directory; anything else is a real failure
+        if error.errno == errno.ENOENT:
+            return
+        raise
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/utils/formatting.py
----------------------------------------------------------------------
diff --git a/aria/utils/formatting.py b/aria/utils/formatting.py
index 8a223e9..698393f 100644
--- a/aria/utils/formatting.py
+++ b/aria/utils/formatting.py
@@ -83,6 +83,49 @@ def full_type_name(value):
return name if module == '__builtin__' else '%s.%s' % (module, name)
+def decode_list(data):
+    """
+    Returns a new list in which unicode items are encoded as UTF-8 and nested lists and dicts
+    are decoded recursively; other items are kept as they are.
+    """
+    def _decode(item):
+        if isinstance(item, unicode):
+            return item.encode('utf-8')
+        if isinstance(item, list):
+            return decode_list(item)
+        if isinstance(item, dict):
+            return decode_dict(item)
+        return item
+
+    return [_decode(item) for item in data]
+
+
+def decode_dict(data):
+    """
+    Returns a new dict in which unicode keys and values are encoded as UTF-8 and nested lists
+    and dicts are decoded recursively; other keys and values are kept as they are.
+    """
+    decoded = {}
+    for raw_key, raw_value in data.iteritems():
+        key = raw_key.encode('utf-8') if isinstance(raw_key, unicode) else raw_key
+        if isinstance(raw_value, unicode):
+            value = raw_value.encode('utf-8')
+        elif isinstance(raw_value, list):
+            value = decode_list(raw_value)
+        elif isinstance(raw_value, dict):
+            value = decode_dict(raw_value)
+        else:
+            value = raw_value
+        decoded[key] = value
+    return decoded
+
+
+def try_convert_from_str(string, target_type):
+    """
+    Tries to convert ``string`` to ``target_type`` and returns ``string`` itself when it cannot.
+
+    For ``bool`` only ``'true'`` and ``'false'`` (in any letter case) are converted; for other
+    types the type is called with the string and a ``ValueError`` means "not convertible".
+    """
+    if target_type == basestring:
+        return string
+    if target_type == bool:
+        return {'true': True, 'false': False}.get(string.lower(), string)
+    try:
+        converted = target_type(string)
+    except ValueError:
+        return string
+    return converted
+
+
def safe_str(value):
"""
Like :code:`str` coercion, but makes sure that Unicode strings are properly
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/utils/threading.py
----------------------------------------------------------------------
diff --git a/aria/utils/threading.py b/aria/utils/threading.py
index b99250d..f4e9c0e 100644
--- a/aria/utils/threading.py
+++ b/aria/utils/threading.py
@@ -15,6 +15,7 @@
from __future__ import absolute_import # so we can import standard 'threading'
+import sys
import itertools
import multiprocessing
from threading import (Thread, Lock)
@@ -255,3 +256,26 @@ class LockedList(list):
def __exit__(self, the_type, value, traceback):
return self.lock.__exit__(the_type, value, traceback)
+
+
+class ExceptionThread(Thread):
+    """
+    A thread which captures any exception escaping its ``run`` method, so that the exception
+    can later be inspected or reraised by whoever owns the thread
+    """
+    def __init__(self, *args, **kwargs):
+        Thread.__init__(self, *args, **kwargs)
+        # holds the ``sys.exc_info()`` triple of the captured exception, if there was one
+        self.exception = None
+
+    def run(self):
+        """
+        Runs the thread's regular logic, storing the exception info of anything it raises
+        """
+        try:
+            super(ExceptionThread, self).run()
+        except BaseException:
+            self.exception = sys.exc_info()
+
+    def is_error(self):
+        """
+        Tells whether an exception has been captured
+        """
+        return not self.exception is None
+
+    def raise_error_if_exists(self):
+        """
+        Reraises the captured exception, with its original traceback; does nothing if no
+        exception has been captured
+        """
+        if not self.is_error():
+            return
+        exc_type, exc_value, exc_traceback = self.exception
+        raise exc_type, exc_value, exc_traceback
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/utils/type.py
----------------------------------------------------------------------
diff --git a/aria/utils/type.py b/aria/utils/type.py
new file mode 100644
index 0000000..e427be1
--- /dev/null
+++ b/aria/utils/type.py
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def validate_value_type(value, type_name):
+    """
+    Checks whether ``value`` can be coerced to the type called ``type_name``.
+
+    Supports both python and yaml type names
+
+    :param value: value to check
+    :param type_name: case-insensitive name of the type to check against
+    :return: ``True`` if the value can be coerced to the type, ``False`` otherwise
+    :raises ValueError: if ``type_name`` is not one of the supported type names
+    """
+    #TODO add timestamp type?
+
+    name_to_type = {
+        'list': list,
+        'dict': dict,
+        'str': str,
+        'unicode': str,
+        'string': str,
+        'int': int,
+        'integer': int,
+        'bool': bool,
+        'boolean': bool,
+        'float': float
+    }
+
+    target_type = name_to_type.get(type_name.lower())
+    if target_type is None:
+        raise ValueError('No supported type_name was provided')
+    try:
+        target_type(value)
+    except (TypeError, ValueError):
+        # coercion may also fail with TypeError, e.g. int(None) or dict(5)
+        return False
+    return True
+
+
+def convert_value_to_type(str_value, type_name):
+    """
+    Converts a string to the type called ``type_name``.
+
+    Supports both python and yaml type names: ``str``/``unicode``/``string``,
+    ``int``/``integer``, ``bool``/``boolean`` and ``float``.
+
+    :param str_value: string to convert
+    :param type_name: case-insensitive name of the type to convert to
+    :return: converted value; for the string types, byte strings are decoded as UTF-8 and
+             text is returned as is
+    :raises ValueError: if ``type_name`` is not supported, or if the string cannot be
+                        converted to the requested type
+    """
+    normalized_name = type_name.lower()
+    try:
+        if normalized_name in ('str', 'unicode', 'string'):
+            # only byte strings need decoding; text is already in its final form
+            if isinstance(str_value, bytes):
+                return str_value.decode('utf-8')
+            return str_value
+        elif normalized_name in ('int', 'integer'):
+            return int(str_value)
+        elif normalized_name in ('bool', 'boolean'):
+            # bool() is True for every non-empty string, 'false' included, so the literal
+            # has to be parsed explicitly
+            lowered = str_value.lower()
+            if lowered == 'true':
+                return True
+            if lowered == 'false':
+                return False
+            raise ValueError('Not a boolean literal')
+        elif normalized_name == 'float':
+            return float(str_value)
+    except ValueError:
+        raise ValueError('Trying to convert {0} to {1} failed'.format(str_value,
+                                                                      type_name))
+    # raised outside of the try block, so that it is not reported as a conversion failure
+    raise ValueError('No supported type_name was provided')
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/extensions/aria_extension_tosca/simple_v1_0/modeling/__init__.py
----------------------------------------------------------------------
diff --git a/extensions/aria_extension_tosca/simple_v1_0/modeling/__init__.py b/extensions/aria_extension_tosca/simple_v1_0/modeling/__init__.py
index 267f6de..e697bc2 100644
--- a/extensions/aria_extension_tosca/simple_v1_0/modeling/__init__.py
+++ b/extensions/aria_extension_tosca/simple_v1_0/modeling/__init__.py
@@ -19,6 +19,7 @@ Creates ARIA service template models based on the TOSCA presentation.
Relies on many helper methods in the presentation classes.
"""
+import os
import re
from types import FunctionType
from datetime import datetime
@@ -34,7 +35,7 @@ from ..data_types import coerce_value
def create_service_template_model(context): # pylint: disable=too-many-locals,too-many-branches
model = ServiceTemplate(created_at=datetime.now(),
- main_file_name=str(context.presentation.location))
+ main_file_name=os.path.basename(str(context.presentation.location)))
model.description = context.presentation.get('service_template', 'description', 'value')
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/requirements.in
----------------------------------------------------------------------
diff --git a/requirements.in b/requirements.in
index bc27479..2c7978f 100644
--- a/requirements.in
+++ b/requirements.in
@@ -25,6 +25,11 @@ SQLAlchemy>=1.1.0, <1.2 # version 1.2 dropped support of python 2.6
wagon==0.6.0
bottle>=0.12.0, <0.13
Fabric>=1.13.0, <1.14
+click==4.1
+colorama==0.3.3
+PrettyTable>=0.7,<0.8
+click_didyoumean==0.0.3
+backports.shutil_get_terminal_size==1.0.0
# Since the tool we are using to generate our requirements.txt, `pip-tools`,
# does not currently support conditional dependencies (;), we're adding our original
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 3d72ebc..b64453a 100644
--- a/setup.py
+++ b/setup.py
@@ -61,7 +61,7 @@ except IOError:
extras_require = {}
-console_scripts = ['aria = aria.cli.cli:main']
+console_scripts = ['aria = aria.cli.main:main']
def _generate_user_options(command):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/mock/context.py
----------------------------------------------------------------------
diff --git a/tests/mock/context.py b/tests/mock/context.py
index f943d7e..ac0a8a7 100644
--- a/tests/mock/context.py
+++ b/tests/mock/context.py
@@ -39,12 +39,17 @@ def simple(tmpdir, inmemory=False, context_kwargs=None, topology=None):
api_kwargs=dict(directory=os.path.join(tmpdir, 'resources'))
)
+ service_id = topology(model_storage)
+ execution = models.create_execution(model_storage.service.get(service_id))
+ model_storage.execution.put(execution)
+
final_kwargs = dict(
name='simple_context',
model_storage=model_storage,
resource_storage=resource_storage,
- service_id=topology(model_storage),
+ service_id=service_id,
workflow_name=models.WORKFLOW_NAME,
+ execution_id=execution.id,
task_max_attempts=models.TASK_MAX_ATTEMPTS,
task_retry_interval=models.TASK_RETRY_INTERVAL
)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/mock/models.py
----------------------------------------------------------------------
diff --git a/tests/mock/models.py b/tests/mock/models.py
index 1d29e2d..6b7f810 100644
--- a/tests/mock/models.py
+++ b/tests/mock/models.py
@@ -40,7 +40,6 @@ from aria.orchestrator.workflows.builtin.workflows import (
SERVICE_NAME = 'test_service_name'
SERVICE_TEMPLATE_NAME = 'test_service_template_name'
WORKFLOW_NAME = 'test_workflow_name'
-EXECUTION_NAME = 'test_execution_name'
TASK_RETRY_INTERVAL = 1
TASK_MAX_ATTEMPTS = 1
@@ -168,6 +167,13 @@ def create_interface_template(service_template, interface_name, operation_name,
def create_interface(service, interface_name, operation_name, operation_kwargs=None,
interface_kwargs=None):
the_type = service.service_template.interface_types.get_descendant('test_interface_type')
+
+ if operation_kwargs and operation_kwargs.get('inputs'):
+ wrapped_inputs = {}
+ for input_name, input_value in operation_kwargs['inputs'].iteritems():
+ wrapped_inputs[input_name] = models.Parameter.wrap(input_name, input_value)
+ operation_kwargs['inputs'] = wrapped_inputs
+
operation = models.Operation(
name=operation_name,
**(operation_kwargs or {})
@@ -183,10 +189,11 @@ def create_interface(service, interface_name, operation_name, operation_kwargs=N
def create_execution(service):
return models.Execution(
service=service,
- status=models.Execution.STARTED,
+ status=models.Execution.PENDING,
workflow_name=WORKFLOW_NAME,
+ created_at=datetime.utcnow(),
started_at=datetime.utcnow(),
- parameters=None
+ inputs={}
)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/modeling/test_models.py
----------------------------------------------------------------------
diff --git a/tests/modeling/test_models.py b/tests/modeling/test_models.py
index 8cd00f8..e459821 100644
--- a/tests/modeling/test_models.py
+++ b/tests/modeling/test_models.py
@@ -253,7 +253,7 @@ class TestService(object):
class TestExecution(object):
@pytest.mark.parametrize(
- 'is_valid, created_at, started_at, ended_at, error, is_system_workflow, parameters, '
+ 'is_valid, created_at, started_at, ended_at, error, is_system_workflow, inputs, '
'status, workflow_name',
[
(False, m_cls, now, now, 'error', False, {}, Execution.STARTED, 'wf_name'),
@@ -268,11 +268,11 @@ class TestExecution(object):
(True, now, None, now, 'error', False, {}, Execution.STARTED, 'wf_name'),
(True, now, now, None, 'error', False, {}, Execution.STARTED, 'wf_name'),
(True, now, now, now, None, False, {}, Execution.STARTED, 'wf_name'),
- (True, now, now, now, 'error', False, None, Execution.STARTED, 'wf_name'),
+ (True, now, now, now, 'error', False, {}, Execution.STARTED, 'wf_name'),
]
)
def test_execution_model_creation(self, service_storage, is_valid, created_at, started_at,
- ended_at, error, is_system_workflow, parameters, status,
+ ended_at, error, is_system_workflow, inputs, status,
workflow_name):
execution = _test_model(
is_valid=is_valid,
@@ -285,7 +285,7 @@ class TestExecution(object):
ended_at=ended_at,
error=error,
is_system_workflow=is_system_workflow,
- parameters=parameters,
+ inputs=inputs,
status=status,
workflow_name=workflow_name,
))
@@ -299,7 +299,7 @@ class TestExecution(object):
id='e_id',
workflow_name='w_name',
status=status,
- parameters={},
+ inputs={},
created_at=now,
)
return execution
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index f55b83e..47c82dc 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -69,16 +69,17 @@ def test_node_operation_task_execution(ctx, thread_executor):
interface_name = 'Standard'
operation_name = 'create'
+ inputs = {'putput': True}
node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
interface = mock.models.create_interface(
node.service,
interface_name,
operation_name,
- operation_kwargs=dict(implementation=op_path(basic_operation, module_path=__name__))
+ operation_kwargs=dict(implementation=op_path(basic_operation, module_path=__name__),
+ inputs=inputs)
)
node.interfaces[interface.name] = interface
ctx.model.node.update(node)
- inputs = {'putput': True}
@workflow
def basic_workflow(graph, **_):
@@ -124,17 +125,18 @@ def test_relationship_operation_task_execution(ctx, thread_executor):
interface_name = 'Configure'
operation_name = 'post_configure'
+ inputs = {'putput': True}
relationship = ctx.model.relationship.list()[0]
interface = mock.models.create_interface(
relationship.source_node.service,
interface_name,
operation_name,
- operation_kwargs=dict(implementation=op_path(basic_operation, module_path=__name__)),
+ operation_kwargs=dict(implementation=op_path(basic_operation, module_path=__name__),
+ inputs=inputs),
)
relationship.interfaces[interface.name] = interface
ctx.model.relationship.update(relationship)
- inputs = {'putput': True}
@workflow
def basic_workflow(graph, **_):
@@ -232,21 +234,21 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir):
ctx.model.plugin.put(plugin)
plugin_specification = mock.models.create_plugin_specification()
node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ filename = 'test_file'
+ content = 'file content'
+ inputs = {'filename': filename, 'content': content}
interface = mock.models.create_interface(
node.service,
interface_name,
operation_name,
operation_kwargs=dict(
implementation='{0}.{1}'.format(__name__, _test_plugin_workdir.__name__),
- plugin_specification=plugin_specification)
+ plugin_specification=plugin_specification,
+ inputs=inputs)
)
node.interfaces[interface.name] = interface
ctx.model.node.update(node)
- filename = 'test_file'
- content = 'file content'
- inputs = {'filename': filename, 'content': content}
-
@workflow
def basic_workflow(graph, **_):
graph.add_tasks(api.task.OperationTask.for_node(node=node,
@@ -278,21 +280,22 @@ def test_node_operation_logging(ctx, executor):
interface_name, operation_name = mock.operations.NODE_OPERATIONS_INSTALL[0]
node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+
+ inputs = {
+ 'op_start': 'op_start',
+ 'op_end': 'op_end',
+ }
interface = mock.models.create_interface(
node.service,
interface_name,
operation_name,
operation_kwargs=dict(
- implementation=op_path(logged_operation, module_path=__name__))
+ implementation=op_path(logged_operation, module_path=__name__),
+ inputs=inputs)
)
node.interfaces[interface.name] = interface
ctx.model.node.update(node)
- inputs = {
- 'op_start': 'op_start',
- 'op_end': 'op_end',
- }
-
@workflow
def basic_workflow(graph, **_):
graph.add_tasks(
@@ -312,20 +315,20 @@ def test_relationship_operation_logging(ctx, executor):
interface_name, operation_name = mock.operations.RELATIONSHIP_OPERATIONS_INSTALL[0]
relationship = ctx.model.relationship.list()[0]
+ inputs = {
+ 'op_start': 'op_start',
+ 'op_end': 'op_end',
+ }
interface = mock.models.create_interface(
relationship.source_node.service,
interface_name,
operation_name,
- operation_kwargs=dict(implementation=op_path(logged_operation, module_path=__name__))
+ operation_kwargs=dict(implementation=op_path(logged_operation, module_path=__name__),
+ inputs=inputs)
)
relationship.interfaces[interface.name] = interface
ctx.model.relationship.update(relationship)
- inputs = {
- 'op_start': 'op_start',
- 'op_end': 'op_end',
- }
-
@workflow
def basic_workflow(graph, **_):
graph.add_tasks(
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/orchestrator/context/test_resource_render.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_resource_render.py b/tests/orchestrator/context/test_resource_render.py
index 696e9b3..8113746 100644
--- a/tests/orchestrator/context/test_resource_render.py
+++ b/tests/orchestrator/context/test_resource_render.py
@@ -64,9 +64,9 @@ def resources(tmpdir, ctx):
implicit_ctx_template_path.write(_IMPLICIT_CTX_TEMPLATE)
variables_template_path = tmpdir.join(_VARIABLES_TEMPLATE_PATH)
variables_template_path.write(_VARIABLES_TEMPLATE)
- ctx.resource.deployment.upload(entry_id='1',
+ ctx.resource.service.upload(entry_id='1',
source=str(implicit_ctx_template_path),
path=_IMPLICIT_CTX_TEMPLATE_PATH)
- ctx.resource.deployment.upload(entry_id='1',
+ ctx.resource.service.upload(entry_id='1',
source=str(variables_template_path),
path=_VARIABLES_TEMPLATE_PATH)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index db45e8e..5fdb674 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -34,7 +34,7 @@ def test_serialize_operation_context(context, executor, tmpdir):
test_file = tmpdir.join(TEST_FILE_NAME)
test_file.write(TEST_FILE_CONTENT)
resource = context.resource
- resource.blueprint.upload(TEST_FILE_ENTRY_ID, str(test_file))
+ resource.service_template.upload(TEST_FILE_ENTRY_ID, str(test_file))
graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
eng.execute()
@@ -73,7 +73,7 @@ def _mock_operation(ctx):
# a correct ctx.deployment.name tells us we kept the correct deployment_id
assert ctx.service.name == mock.models.SERVICE_NAME
# Here we test that the resource storage was properly re-created
- test_file_content = ctx.resource.blueprint.read(TEST_FILE_ENTRY_ID, TEST_FILE_NAME)
+ test_file_content = ctx.resource.service_template.read(TEST_FILE_ENTRY_ID, TEST_FILE_NAME)
assert test_file_content == TEST_FILE_CONTENT
# a non empty plugin workdir tells us that we kept the correct base_workdir
assert ctx.plugin_workdir is not None
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/orchestrator/context/test_toolbelt.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_toolbelt.py b/tests/orchestrator/context/test_toolbelt.py
index cf82127..213d964 100644
--- a/tests/orchestrator/context/test_toolbelt.py
+++ b/tests/orchestrator/context/test_toolbelt.py
@@ -76,15 +76,16 @@ def test_host_ip(workflow_context, executor):
interface_name = 'Standard'
operation_name = 'create'
_, dependency_node, _, _, _ = _get_elements(workflow_context)
+ inputs = {'putput': True}
interface = mock.models.create_interface(
dependency_node.service,
interface_name=interface_name,
operation_name=operation_name,
- operation_kwargs=dict(implementation=op_path(host_ip, module_path=__name__))
+ operation_kwargs=dict(implementation=op_path(host_ip, module_path=__name__),
+ inputs=inputs)
)
dependency_node.interfaces[interface.name] = interface
workflow_context.model.node.update(dependency_node)
- inputs = {'putput': True}
@workflow
def basic_workflow(graph, **_):
@@ -106,17 +107,17 @@ def test_relationship_tool_belt(workflow_context, executor):
interface_name = 'Configure'
operation_name = 'post_configure'
_, _, _, _, relationship = _get_elements(workflow_context)
+ inputs = {'putput': True}
interface = mock.models.create_interface(
relationship.source_node.service,
interface_name=interface_name,
operation_name=operation_name,
- operation_kwargs=dict(implementation=op_path(relationship_operation, module_path=__name__))
+ operation_kwargs=dict(implementation=op_path(relationship_operation, module_path=__name__),
+ inputs=inputs)
)
relationship.interfaces[interface.name] = interface
workflow_context.model.relationship.update(relationship)
- inputs = {'putput': True}
-
@workflow
def basic_workflow(graph, **_):
graph.add_tasks(
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/orchestrator/context/test_workflow.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_workflow.py b/tests/orchestrator/context/test_workflow.py
index fa1f387..3c35435 100644
--- a/tests/orchestrator/context/test_workflow.py
+++ b/tests/orchestrator/context/test_workflow.py
@@ -35,7 +35,7 @@ class TestWorkflowContext(object):
assert execution.service_template == storage.service_template.get_by_name(
models.SERVICE_TEMPLATE_NAME)
assert execution.status == storage.execution.model_cls.PENDING
- assert execution.parameters == {}
+ assert execution.inputs == {}
assert execution.created_at <= datetime.utcnow()
def test_subsequent_workflow_context_creation_do_not_fail(self, storage):
@@ -49,11 +49,13 @@ class TestWorkflowContext(object):
:param storage:
:return WorkflowContext:
"""
+ service = storage.service.get_by_name(models.SERVICE_NAME)
return context.workflow.WorkflowContext(
name='simple_context',
model_storage=storage,
resource_storage=None,
- service_id=storage.service.get_by_name(models.SERVICE_NAME).id,
+ service_id=service,
+ execution_id=storage.execution.list(filters=dict(service=service))[0].id,
workflow_name=models.WORKFLOW_NAME,
task_max_attempts=models.TASK_MAX_ATTEMPTS,
task_retry_interval=models.TASK_RETRY_INTERVAL
@@ -66,6 +68,8 @@ def storage():
sql_mapi.SQLAlchemyModelAPI, initiator=test_storage.init_inmemory_model_storage)
workflow_storage.service_template.put(models.create_service_template())
service_template = workflow_storage.service_template.get_by_name(models.SERVICE_TEMPLATE_NAME)
- workflow_storage.service.put(models.create_service(service_template))
+ service = models.create_service(service_template)
+ workflow_storage.service.put(service)
+ workflow_storage.execution.put(models.create_execution(service))
yield workflow_storage
test_storage.release_sqlite_storage(workflow_storage)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index e3612cf..67d527c 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -460,14 +460,15 @@ if __name__ == '__main__':
env_var='value',
inputs=None):
local_script_path = script_path
- script_path = os.path.basename(local_script_path) if local_script_path else None
+ script_path = os.path.basename(local_script_path) if local_script_path else ''
+ inputs = inputs or {}
+ process = process or {}
if script_path:
- workflow_context.resource.deployment.upload(
+ workflow_context.resource.service.upload(
entry_id=str(workflow_context.service.id),
source=local_script_path,
path=script_path)
- inputs = inputs or {}
inputs.update({
'script_path': script_path,
'process': process,
@@ -483,7 +484,8 @@ if __name__ == '__main__':
'op',
operation_kwargs=dict(implementation='{0}.{1}'.format(
operations.__name__,
- operations.run_script_locally.__name__))
+ operations.run_script_locally.__name__),
+ inputs=inputs)
)
node.interfaces[interface.name] = interface
graph.add_tasks(api.task.OperationTask.for_node(
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index dd36466..d17def1 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -258,7 +258,7 @@ class TestWithActualSSHServer(object):
return collected[signal][0]['kwargs']['exception']
def _upload(self, source, path):
- self._workflow_context.resource.deployment.upload(
+ self._workflow_context.resource.service.upload(
entry_id=str(self._workflow_context.service.id),
source=source,
path=path)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/orchestrator/workflows/builtin/test_execute_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/builtin/test_execute_operation.py b/tests/orchestrator/workflows/builtin/test_execute_operation.py
index 360e17d..41e2b6b 100644
--- a/tests/orchestrator/workflows/builtin/test_execute_operation.py
+++ b/tests/orchestrator/workflows/builtin/test_execute_operation.py
@@ -34,7 +34,8 @@ def test_execute_operation(ctx):
interface = mock.models.create_interface(
ctx.service,
interface_name,
- operation_name
+ operation_name,
+ operation_kwargs={'implementation': 'stub-implementation'}
)
node.interfaces[interface.name] = interface
ctx.model.node.update(node)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index 6f97952..e793c49 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -61,12 +61,18 @@ class BaseTest(object):
retry_interval=None,
ignore_failure=None):
node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+
+ operation_kwargs = dict(implementation='{name}.{func.__name__}'.format(
+ name=__name__, func=func))
+ if inputs:
+ # the operation has to declare the inputs before those may be passed
+ operation_kwargs['inputs'] = inputs
+
interface = mock.models.create_interface(
node.service,
'aria.interfaces.lifecycle',
'create',
- operation_kwargs=dict(implementation='{name}.{func.__name__}'.format(name=__name__,
- func=func))
+ operation_kwargs=operation_kwargs
)
node.interfaces[interface.name] = interface
return api.task.OperationTask.for_node(
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/orchestrator/workflows/core/test_task_graph_into_exececution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_exececution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_exececution_graph.py
deleted file mode 100644
index 0a95d43..0000000
--- a/tests/orchestrator/workflows/core/test_task_graph_into_exececution_graph.py
+++ /dev/null
@@ -1,111 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from networkx import topological_sort, DiGraph
-
-from aria.orchestrator import context
-from aria.orchestrator.workflows import api, core
-
-from tests import mock
-from tests import storage
-
-
-def test_task_graph_into_execution_graph(tmpdir):
- interface_name = 'Standard'
- operation_name = 'create'
- task_context = mock.context.simple(str(tmpdir))
- node = task_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
- interface = mock.models.create_interface(
- node.service,
- interface_name,
- operation_name
- )
- node.interfaces[interface.name] = interface
- task_context.model.node.update(node)
-
- def sub_workflow(name, **_):
- return api.task_graph.TaskGraph(name)
-
- with context.workflow.current.push(task_context):
- test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph')
- simple_before_task = api.task.OperationTask.for_node(node=node,
- interface_name=interface_name,
- operation_name=operation_name)
- simple_after_task = api.task.OperationTask.for_node(node=node,
- interface_name=interface_name,
- operation_name=operation_name)
-
- inner_task_graph = api.task.WorkflowTask(sub_workflow, name='test_inner_task_graph')
- inner_task = api.task.OperationTask.for_node(node=node,
- interface_name=interface_name,
- operation_name=operation_name)
- inner_task_graph.add_tasks(inner_task)
-
- test_task_graph.add_tasks(simple_before_task)
- test_task_graph.add_tasks(simple_after_task)
- test_task_graph.add_tasks(inner_task_graph)
- test_task_graph.add_dependency(inner_task_graph, simple_before_task)
- test_task_graph.add_dependency(simple_after_task, inner_task_graph)
-
- # Direct check
- execution_graph = DiGraph()
- core.translation.build_execution_graph(task_graph=test_task_graph,
- execution_graph=execution_graph)
- execution_tasks = topological_sort(execution_graph)
-
- assert len(execution_tasks) == 7
-
- expected_tasks_names = [
- '{0}-Start'.format(test_task_graph.id),
- simple_before_task.id,
- '{0}-Start'.format(inner_task_graph.id),
- inner_task.id,
- '{0}-End'.format(inner_task_graph.id),
- simple_after_task.id,
- '{0}-End'.format(test_task_graph.id)
- ]
-
- assert expected_tasks_names == execution_tasks
-
- assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph),
- core.task.StartWorkflowTask)
-
- _assert_execution_is_api_task(_get_task_by_name(execution_tasks[1], execution_graph),
- simple_before_task)
- assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph),
- core.task.StartSubWorkflowTask)
-
- _assert_execution_is_api_task(_get_task_by_name(execution_tasks[3], execution_graph),
- inner_task)
- assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph),
- core.task.EndSubWorkflowTask)
-
- _assert_execution_is_api_task(_get_task_by_name(execution_tasks[5], execution_graph),
- simple_after_task)
- assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph),
- core.task.EndWorkflowTask)
- storage.release_sqlite_storage(task_context.model)
-
-
-def _assert_execution_is_api_task(execution_task, api_task):
- assert execution_task.id == api_task.id
- assert execution_task.name == api_task.name
- assert execution_task.implementation == api_task.implementation
- assert execution_task.actor == api_task.actor
- assert execution_task.inputs == api_task.inputs
-
-
-def _get_task_by_name(task_name, graph):
- return graph.node[task_name]['task']
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
new file mode 100644
index 0000000..0a95d43
--- /dev/null
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from networkx import topological_sort, DiGraph
+
+from aria.orchestrator import context
+from aria.orchestrator.workflows import api, core
+
+from tests import mock
+from tests import storage
+
+
+def test_task_graph_into_execution_graph(tmpdir):
+ interface_name = 'Standard'
+ operation_name = 'create'
+ task_context = mock.context.simple(str(tmpdir))
+ node = task_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ interface = mock.models.create_interface(
+ node.service,
+ interface_name,
+ operation_name
+ )
+ node.interfaces[interface.name] = interface
+ task_context.model.node.update(node)
+
+ def sub_workflow(name, **_):
+ return api.task_graph.TaskGraph(name)
+
+ with context.workflow.current.push(task_context):
+ test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph')
+ simple_before_task = api.task.OperationTask.for_node(node=node,
+ interface_name=interface_name,
+ operation_name=operation_name)
+ simple_after_task = api.task.OperationTask.for_node(node=node,
+ interface_name=interface_name,
+ operation_name=operation_name)
+
+ inner_task_graph = api.task.WorkflowTask(sub_workflow, name='test_inner_task_graph')
+ inner_task = api.task.OperationTask.for_node(node=node,
+ interface_name=interface_name,
+ operation_name=operation_name)
+ inner_task_graph.add_tasks(inner_task)
+
+ test_task_graph.add_tasks(simple_before_task)
+ test_task_graph.add_tasks(simple_after_task)
+ test_task_graph.add_tasks(inner_task_graph)
+ test_task_graph.add_dependency(inner_task_graph, simple_before_task)
+ test_task_graph.add_dependency(simple_after_task, inner_task_graph)
+
+ # Direct check
+ execution_graph = DiGraph()
+ core.translation.build_execution_graph(task_graph=test_task_graph,
+ execution_graph=execution_graph)
+ execution_tasks = topological_sort(execution_graph)
+
+ assert len(execution_tasks) == 7
+
+ expected_tasks_names = [
+ '{0}-Start'.format(test_task_graph.id),
+ simple_before_task.id,
+ '{0}-Start'.format(inner_task_graph.id),
+ inner_task.id,
+ '{0}-End'.format(inner_task_graph.id),
+ simple_after_task.id,
+ '{0}-End'.format(test_task_graph.id)
+ ]
+
+ assert expected_tasks_names == execution_tasks
+
+ assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph),
+ core.task.StartWorkflowTask)
+
+ _assert_execution_is_api_task(_get_task_by_name(execution_tasks[1], execution_graph),
+ simple_before_task)
+ assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph),
+ core.task.StartSubWorkflowTask)
+
+ _assert_execution_is_api_task(_get_task_by_name(execution_tasks[3], execution_graph),
+ inner_task)
+ assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph),
+ core.task.EndSubWorkflowTask)
+
+ _assert_execution_is_api_task(_get_task_by_name(execution_tasks[5], execution_graph),
+ simple_after_task)
+ assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph),
+ core.task.EndWorkflowTask)
+ storage.release_sqlite_storage(task_context.model)
+
+
+def _assert_execution_is_api_task(execution_task, api_task):
+ assert execution_task.id == api_task.id
+ assert execution_task.name == api_task.name
+ assert execution_task.implementation == api_task.implementation
+ assert execution_task.actor == api_task.actor
+ assert execution_task.inputs == api_task.inputs
+
+
+def _get_task_by_name(task_name, graph):
+ return graph.node[task_name]['task']
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/utils/test_threading.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_threading.py b/tests/utils/test_threading.py
new file mode 100644
index 0000000..39ce717
--- /dev/null
+++ b/tests/utils/test_threading.py
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import pytest
+
+from aria.utils import threading
+
+
+class TestPluginManager(object):
+
+ def test_exception_raised_from_thread(self):
+
+ def error_raising_func():
+ raise ValueError('This is an error')
+
+ thread = threading.ExceptionThread(target=error_raising_func)
+ thread.start()
+ thread.join()
+
+ assert thread.is_error()
+ with pytest.raises(ValueError):
+ thread.raise_error_if_exists()