You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by ra...@apache.org on 2017/04/19 13:37:32 UTC
[3/5] incubator-ariatosca git commit: ARIA-48 Revamped ARIA CLI
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/modeling/service_template.py
----------------------------------------------------------------------
diff --git a/aria/modeling/service_template.py b/aria/modeling/service_template.py
index 51fea2f..f1c2bcb 100644
--- a/aria/modeling/service_template.py
+++ b/aria/modeling/service_template.py
@@ -280,7 +280,7 @@ class ServiceTemplateBase(TemplateModelMixin):
('interface_types', formatting.as_raw(self.interface_types)),
('artifact_types', formatting.as_raw(self.artifact_types))))
- def instantiate(self, container):
+ def instantiate(self, container, model_storage, inputs=None): # pylint: disable=arguments-differ
from . import models
context = ConsumptionContext.get_thread_local()
now = datetime.now()
@@ -288,13 +288,14 @@ class ServiceTemplateBase(TemplateModelMixin):
updated_at=now,
description=deepcopy_with_locators(self.description),
service_template=self)
- #service.name = '{0}_{1}'.format(self.name, service.id)
-
context.modeling.instance = service
+ service.inputs = utils.create_inputs(inputs or {}, self.inputs)
+ # TODO: now that we have inputs, we should scan properties and inputs and evaluate functions
+
for plugin_specification in self.plugin_specifications.itervalues():
if plugin_specification.enabled:
- if plugin_specification.resolve():
+ if plugin_specification.resolve(model_storage):
plugin = plugin_specification.plugin
service.plugins[plugin.name] = plugin
else:
@@ -316,15 +317,8 @@ class ServiceTemplateBase(TemplateModelMixin):
if self.substitution_template is not None:
service.substitution = self.substitution_template.instantiate(container)
- utils.instantiate_dict(self, service.inputs, self.inputs)
utils.instantiate_dict(self, service.outputs, self.outputs)
- for name, the_input in context.modeling.inputs.iteritems():
- if name not in service.inputs:
- context.validation.report('input "{0}" is not supported'.format(name))
- else:
- service.inputs[name].value = the_input
-
return service
def validate(self):
@@ -448,8 +442,7 @@ class NodeTemplateBase(TemplateModelMixin):
__tablename__ = 'node_template'
__private_fields__ = ['type_fk',
- 'service_template_fk',
- 'service_template_name']
+ 'service_template_fk']
# region foreign_keys
@@ -472,6 +465,11 @@ class NodeTemplateBase(TemplateModelMixin):
"""Required for use by SQLAlchemy queries"""
return association_proxy('service_template', 'name')
+ @declared_attr
+ def type_name(cls):
+ """Required for use by SQLAlchemy queries"""
+ return association_proxy('type', 'name')
+
# endregion
# region one_to_one relationships
@@ -558,6 +556,7 @@ class NodeTemplateBase(TemplateModelMixin):
type=self.type,
description=deepcopy_with_locators(self.description),
state=models.Node.INITIAL,
+ runtime_properties={},
node_template=self)
utils.instantiate_dict(node, node.properties, self.properties)
utils.instantiate_dict(node, node.interfaces, self.interface_templates)
@@ -1238,7 +1237,8 @@ class RequirementTemplateBase(TemplateModelMixin):
# Find first node that matches the type
elif self.target_node_type is not None:
- for target_node_template in context.modeling.template.node_templates.itervalues():
+ for target_node_template in \
+ self.node_template.service_template.node_templates.values():
if self.target_node_type.get_descendant(target_node_template.type.name) is None:
continue
@@ -1865,16 +1865,22 @@ class OperationTemplateBase(TemplateModelMixin):
def instantiate(self, container):
from . import models
- if self.plugin_specification and self.plugin_specification.enabled:
- plugin = self.plugin_specification.plugin
- implementation = self.implementation if plugin is not None else None
- # "plugin" would be none if a match was not found. In that case, a validation error
- # should already have been reported in ServiceTemplateBase.instantiate, so we will
- # continue silently here
+ if self.plugin_specification:
+ if self.plugin_specification.enabled:
+ plugin = self.plugin_specification.plugin
+ implementation = self.implementation if plugin is not None else None
+ # "plugin" would be None if a match was not found. In that case, a validation error
+ # should already have been reported in ServiceTemplateBase.instantiate, so we will
+ # continue silently here
+ else:
+ # If the plugin is disabled, the operation should be disabled, too
+ plugin = None
+ implementation = None
else:
- # If the plugin is disabled, the operation should be disabled, too
+ # using the execution plugin
plugin = None
- implementation = None
+ implementation = self.implementation
+
operation = models.Operation(name=self.name,
description=deepcopy_with_locators(self.description),
relationship_edge=self.relationship_edge,
@@ -2120,25 +2126,16 @@ class PluginSpecificationBase(TemplateModelMixin):
def coerce_values(self, container, report_issues):
pass
- def resolve(self):
+ def resolve(self, model_storage):
# TODO: we are planning a separate "instantiation" module where this will be called or
- # moved to. There, we will probably have a context with a storage manager. Until then,
- # this is the only potentially available context, which of course will only be available
- # if we're in a workflow.
- from ..orchestrator import context
- try:
- workflow_context = context.workflow.current.get()
- plugins = workflow_context.model.plugin.list()
- except context.exceptions.ContextException:
- plugins = None
-
+ # moved to.
+ plugins = model_storage.plugin.list()
matching_plugins = []
- if plugins:
- for plugin in plugins:
- # TODO: we need to use a version comparator
- if (plugin.name == self.name) and \
+ for plugin in plugins:
+ # TODO: we need to use a version comparator
+ if (plugin.name == self.name) and \
((self.version is None) or (plugin.package_version >= self.version)):
- matching_plugins.append(plugin)
+ matching_plugins.append(plugin)
self.plugin = None
if matching_plugins:
# Return highest version of plugin
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/modeling/utils.py
----------------------------------------------------------------------
diff --git a/aria/modeling/utils.py b/aria/modeling/utils.py
index 0b4015c..91d7b9c 100644
--- a/aria/modeling/utils.py
+++ b/aria/modeling/utils.py
@@ -13,12 +13,100 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import os
+from json import JSONEncoder
+from StringIO import StringIO
+
+from . import exceptions
from ..parser.consumption import ConsumptionContext
from ..parser.exceptions import InvalidValueError
from ..parser.presentation import Value
from ..utils.collections import OrderedDict
from ..utils.console import puts
-from .exceptions import CannotEvaluateFunctionException
+from ..utils.type import validate_value_type
+
+
+class ModelJSONEncoder(JSONEncoder):
+ def default(self, o): # pylint: disable=method-hidden
+ from .mixins import ModelMixin
+ if isinstance(o, ModelMixin):
+ if hasattr(o, 'value'):
+ dict_to_return = o.to_dict(fields=('value',))
+ return dict_to_return['value']
+ else:
+ return o.to_dict()
+ else:
+ return JSONEncoder.default(self, o)
+
+
+def create_inputs(inputs, template_inputs):
+ """
+ :param inputs: key-value dict
+ :param template_inputs: parameter name to parameter object dict
+ :return: dict of parameter name to Parameter models
+ """
+ merged_inputs = _merge_and_validate_inputs(inputs, template_inputs)
+
+ from . import models
+ input_models = []
+ for input_name, input_val in merged_inputs.iteritems():
+ parameter = models.Parameter(
+ name=input_name,
+ type_name=template_inputs[input_name].type_name,
+ description=template_inputs[input_name].description,
+ value=input_val)
+ input_models.append(parameter)
+
+ return dict((inp.name, inp) for inp in input_models)
+
+
+def _merge_and_validate_inputs(inputs, template_inputs):
+ """
+ :param inputs: key-value dict
+ :param template_inputs: parameter name to parameter object dict
+ :return: dict of input name to value, including applied template default values
+ """
+ merged_inputs = inputs.copy()
+
+ missing_inputs = []
+ wrong_type_inputs = {}
+ for input_name, input_template in template_inputs.iteritems():
+ if input_name not in inputs:
+ if input_template.value is not None:
+ merged_inputs[input_name] = input_template.value # apply default value
+ else:
+ missing_inputs.append(input_name)
+ else:
+ # Validate input type
+ try:
+ validate_value_type(inputs[input_name], input_template.type_name)
+ except ValueError:
+ wrong_type_inputs[input_name] = input_template.type_name
+ except RuntimeError:
+ # TODO: This error shouldn't be raised (or caught), but right now we lack support
+ # for custom data_types, which will raise this error. Skipping their validation.
+ pass
+
+ if missing_inputs:
+ raise exceptions.MissingRequiredInputsException(
+ 'Required inputs {0} have not been specified - expected inputs: {1}'
+ .format(missing_inputs, template_inputs.keys()))
+
+ if wrong_type_inputs:
+ error_message = StringIO()
+ for param_name, param_type in wrong_type_inputs.iteritems():
+ error_message.write('Input "{0}" must be of type {1}{2}'
+ .format(param_name, param_type, os.linesep))
+ raise exceptions.InputsOfWrongTypeException(error_message.getvalue())
+
+ undeclared_inputs = [input_name for input_name in inputs.keys()
+ if input_name not in template_inputs]
+ if undeclared_inputs:
+ raise exceptions.UndeclaredInputsException(
+ 'Undeclared inputs have been specified: {0}; Expected inputs: {1}'
+ .format(undeclared_inputs, template_inputs.keys()))
+
+ return merged_inputs
def coerce_value(container, value, report_issues=False):
@@ -35,7 +123,7 @@ def coerce_value(container, value, report_issues=False):
try:
value = value._evaluate(context, container)
value = coerce_value(container, value, report_issues)
- except CannotEvaluateFunctionException:
+ except exceptions.CannotEvaluateFunctionException:
pass
except InvalidValueError as e:
if report_issues:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 127641f..15843db 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -19,7 +19,6 @@ A common context for both workflow and operation
import logging
from contextlib import contextmanager
-from datetime import datetime
from functools import partial
import jinja2
@@ -55,6 +54,7 @@ class BaseContext(object):
self,
name,
service_id,
+ execution_id,
model_storage,
resource_storage,
workdir=None,
@@ -65,27 +65,17 @@ class BaseContext(object):
self._model = model_storage
self._resource = resource_storage
self._service_id = service_id
+ self._execution_id = execution_id
self._workdir = workdir
self.logger = None
- def _create_execution(self):
- now = datetime.utcnow()
- execution = self.model.execution.model_cls(
- service_instance=self.service_instance,
- workflow_name=self._workflow_name,
- created_at=now,
- parameters=self.parameters,
- )
- self.model.execution.put(execution)
- return execution.id
-
- def _register_logger(self, logger_name=None, level=None, task_id=None):
- self.logger = self.PrefixedLogger(logging.getLogger(logger_name or self.__class__.__name__),
- self.logging_id,
- task_id=task_id)
- self.logger.addHandler(aria_logger.create_console_log_handler())
- self.logger.addHandler(self._get_sqla_handler())
+ def _register_logger(self, level=None, task_id=None):
+ self.logger = self.PrefixedLogger(
+ logging.getLogger(aria_logger.TASK_LOGGER_NAME), self.logging_id, task_id=task_id)
self.logger.setLevel(level or logging.DEBUG)
+ if not self.logger.handlers:
+ self.logger.addHandler(aria_logger.create_console_log_handler())
+ self.logger.addHandler(self._get_sqla_handler())
def _get_sqla_handler(self):
api_kwargs = {}
@@ -168,13 +158,13 @@ class BaseContext(object):
Download a blueprint resource from the resource storage
"""
try:
- self.resource.deployment.download(entry_id=str(self.service.id),
- destination=destination,
- path=path)
+ self.resource.service.download(entry_id=str(self.service.id),
+ destination=destination,
+ path=path)
except exceptions.StorageError:
- self.resource.blueprint.download(entry_id=str(self.service_template.id),
- destination=destination,
- path=path)
+ self.resource.service_template.download(entry_id=str(self.service_template.id),
+ destination=destination,
+ path=path)
def download_resource_and_render(self, destination, path=None, variables=None):
"""
@@ -193,9 +183,10 @@ class BaseContext(object):
Read a deployment resource as string from the resource storage
"""
try:
- return self.resource.deployment.read(entry_id=str(self.service.id), path=path)
+ return self.resource.service.read(entry_id=str(self.service.id), path=path)
except exceptions.StorageError:
- return self.resource.deployment.read(entry_id=str(self.service_template.id), path=path)
+ return self.resource.service_template.read(entry_id=str(self.service_template.id),
+ path=path)
def get_resource_and_render(self, path=None, variables=None):
"""
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index cbd186c..c7d8246 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -36,7 +36,6 @@ class BaseOperationContext(BaseContext):
service_id,
task_id,
actor_id,
- execution_id,
**kwargs):
super(BaseOperationContext, self).__init__(
name=name,
@@ -47,7 +46,6 @@ class BaseOperationContext(BaseContext):
self._task_id = task_id
self._actor_id = actor_id
self._thread_local = threading.local()
- self._execution_id = execution_id
self._register_logger(task_id=self.task.id)
def __repr__(self):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index 5f86d9d..667d22f 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -19,7 +19,6 @@ Workflow and operation contexts
import threading
from contextlib import contextmanager
-from datetime import datetime
from .exceptions import ContextException
from .common import BaseContext
@@ -35,36 +34,21 @@ class WorkflowContext(BaseContext):
task_max_attempts=1,
task_retry_interval=0,
task_ignore_failure=False,
- execution_id=None,
*args, **kwargs):
super(WorkflowContext, self).__init__(*args, **kwargs)
self._workflow_name = workflow_name
- self.parameters = parameters or {}
+ self._parameters = parameters or {}
self._task_max_attempts = task_max_attempts
self._task_retry_interval = task_retry_interval
self._task_ignore_failure = task_ignore_failure
- # TODO: execution creation should happen somewhere else
- # should be moved there, when such logical place exists
- self._execution_id = execution_id or self._create_execution()
self._register_logger()
def __repr__(self):
return (
'{name}(deployment_id={self._service_id}, '
- 'workflow_name={self._workflow_name}'.format(
+ 'workflow_name={self._workflow_name}, execution_id={self._execution_id})'.format(
name=self.__class__.__name__, self=self))
- def _create_execution(self):
- now = datetime.utcnow()
- execution = self.model.execution.model_cls(
- service=self.service,
- workflow_name=self._workflow_name,
- created_at=now,
- parameters=self.parameters,
- )
- self.model.execution.put(execution)
- return execution.id
-
@property
def logging_id(self):
return '{0}[{1}]'.format(self._workflow_name, self._execution_id)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/exceptions.py b/aria/orchestrator/exceptions.py
index c00b66b..8d3dcc6 100644
--- a/aria/orchestrator/exceptions.py
+++ b/aria/orchestrator/exceptions.py
@@ -25,6 +25,13 @@ class OrchestratorError(AriaError):
pass
+class InvalidPluginError(AriaError):
+ """
+ Raised when a plugin archive fails validation
+ """
+ pass
+
+
class PluginAlreadyExistsError(AriaError):
"""
Raised when a plugin with the same package name and package version already exists
@@ -46,3 +53,24 @@ class TaskAbortException(RuntimeError):
Used internally when ctx.task.abort is called
"""
pass
+
+
+class UndeclaredWorkflowError(AriaError):
+ """
+ Raised when attempting to execute an undeclared workflow
+ """
+ pass
+
+
+class ActiveExecutionsError(AriaError):
+ """
+ Raised when attempting to execute a workflow on a service which already has an active execution
+ """
+ pass
+
+
+class WorkflowImplementationNotFoundError(AriaError):
+ """
+ Raised when attempting to import a workflow's code but the implementation is not found
+ """
+ pass
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/execution_plugin/ctx_proxy/server.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
index 817d064..52a5312 100644
--- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py
+++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
@@ -24,6 +24,7 @@ import StringIO
import wsgiref.simple_server
import bottle
+from aria import modeling
from .. import exceptions
@@ -111,7 +112,7 @@ class CtxProxy(object):
result = json.dumps({
'type': result_type,
'payload': payload
- })
+ }, cls=modeling.utils.ModelJSONEncoder)
except Exception as e:
traceback_out = StringIO.StringIO()
traceback.print_exc(file=traceback_out)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/execution_plugin/instantiation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/instantiation.py b/aria/orchestrator/execution_plugin/instantiation.py
index 960835c..7627a38 100644
--- a/aria/orchestrator/execution_plugin/instantiation.py
+++ b/aria/orchestrator/execution_plugin/instantiation.py
@@ -27,7 +27,7 @@ def configure_operation(operation):
arguments = OrderedDict()
arguments['script_path'] = operation.implementation
arguments['process'] = _get_process(configuration.pop('process')) \
- if 'process' in configuration else None
+ if 'process' in configuration else dict()
host = None
interface = operation.interface
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/plugin.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/plugin.py b/aria/orchestrator/plugin.py
index d815754..f99666c 100644
--- a/aria/orchestrator/plugin.py
+++ b/aria/orchestrator/plugin.py
@@ -17,6 +17,7 @@ import os
import tempfile
import subprocess
import sys
+import zipfile
from datetime import datetime
import wagon
@@ -43,11 +44,11 @@ class PluginManager(object):
os_props = metadata['build_server_os_properties']
plugin = cls(
+ name=metadata['package_name'],
archive_name=metadata['archive_name'],
supported_platform=metadata['supported_platform'],
supported_py_versions=metadata['supported_python_versions'],
- # Remove suffix colon after upgrading wagon to > 0.5.0
- distribution=os_props.get('distribution:') or os_props.get('distribution'),
+ distribution=os_props.get('distribution'),
distribution_release=os_props['distribution_version'],
distribution_version=os_props['distribution_release'],
package_name=metadata['package_name'],
@@ -70,6 +71,28 @@ class PluginManager(object):
self._plugins_dir,
'{0}-{1}'.format(plugin.package_name, plugin.package_version))
+ @staticmethod
+ def validate_plugin(source):
+ """
+ Validate a plugin archive.
+ A valid plugin is a wagon (http://github.com/cloudify-cosmo/wagon)
+ in the zip format (suffix may also be .wgn).
+ """
+ if not zipfile.is_zipfile(source):
+ raise exceptions.InvalidPluginError(
+ 'Archive {0} is of an unsupported type. Only '
+ 'zip/wgn is allowed'.format(source))
+ with zipfile.ZipFile(source, 'r') as zip_file:
+ infos = zip_file.infolist()
+ try:
+ package_name = infos[0].filename[:infos[0].filename.index('/')]
+ package_json_path = "{0}/{1}".format(package_name, 'package.json')
+ zip_file.getinfo(package_json_path)
+ except (KeyError, ValueError, IndexError):
+ raise exceptions.InvalidPluginError(
+ 'Failed to validate plugin {0} '
+ '(package.json was not found in archive)'.format(source))
+
def _install_wagon(self, source, prefix):
pip_freeze_output = self._pip_freeze()
file_descriptor, constraint_path = tempfile.mkstemp(prefix='constraint-', suffix='.txt')
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/runner.py b/aria/orchestrator/runner.py
deleted file mode 100644
index f1633fa..0000000
--- a/aria/orchestrator/runner.py
+++ /dev/null
@@ -1,101 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Workflow runner
-"""
-
-import tempfile
-import os
-
-from .context.workflow import WorkflowContext
-from .workflows.core.engine import Engine
-from .workflows.executor.thread import ThreadExecutor
-from ..storage import (
- sql_mapi,
- filesystem_rapi,
-)
-from .. import (
- application_model_storage,
- application_resource_storage
-)
-
-
-class Runner(object):
- """
- Runs workflows on a deployment. By default uses temporary storage (either on disk or in memory)
- but can also be used with existing storage.
-
- Handles the initialization of the storage engine and provides convenience methods for
- sub-classes to create tasks.
-
- :param path: path to Sqlite database file; use '' (the default) to use a temporary file,
- and None to use an in-memory database
- :type path: string
- """
-
- def __init__(self, workflow_name, workflow_fn, inputs, initialize_model_storage_fn,
- service_id_fn, storage_path='', is_storage_temporary=True):
- if storage_path == '':
- # Temporary file storage
- the_file, storage_path = tempfile.mkstemp(suffix='.db', prefix='aria-')
- os.close(the_file)
-
- self._storage_path = storage_path
- self._storage_dir = os.path.dirname(storage_path)
- self._storage_name = os.path.basename(storage_path)
- self._is_storage_temporary = is_storage_temporary
-
- workflow_context = self.create_workflow_context(workflow_name, initialize_model_storage_fn,
- service_id_fn)
-
- tasks_graph = workflow_fn(ctx=workflow_context, **inputs)
-
- self._engine = Engine(
- executor=ThreadExecutor(),
- workflow_context=workflow_context,
- tasks_graph=tasks_graph)
-
- def run(self):
- try:
- self._engine.execute()
- finally:
- self.cleanup()
-
- def create_workflow_context(self,
- workflow_name,
- initialize_model_storage_fn,
- service_id_fn):
- self.cleanup()
- model_storage = application_model_storage(
- sql_mapi.SQLAlchemyModelAPI,
- initiator_kwargs=dict(base_dir=self._storage_dir, filename=self._storage_name))
- if initialize_model_storage_fn:
- initialize_model_storage_fn(model_storage)
- resource_storage = application_resource_storage(
- filesystem_rapi.FileSystemResourceAPI, api_kwargs=dict(directory='.'))
- return WorkflowContext(
- name=workflow_name,
- model_storage=model_storage,
- resource_storage=resource_storage,
- service_id=service_id_fn(),
- workflow_name=self.__class__.__name__,
- task_max_attempts=1,
- task_retry_interval=1)
-
- def cleanup(self):
- if (self._is_storage_temporary and (self._storage_path is not None) and
- os.path.isfile(self._storage_path)):
- os.remove(self._storage_path)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
new file mode 100644
index 0000000..1ea60a1
--- /dev/null
+++ b/aria/orchestrator/workflow_runner.py
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow runner
+"""
+
+import os
+import sys
+from datetime import datetime
+
+from . import exceptions
+from .context.workflow import WorkflowContext
+from .workflows import builtin
+from .workflows.core.engine import Engine
+from .workflows.executor.process import ProcessExecutor
+from ..modeling import models
+from ..modeling import utils as modeling_utils
+from ..utils.imports import import_fullname
+
+
+DEFAULT_TASK_MAX_ATTEMPTS = 30
+DEFAULT_TASK_RETRY_INTERVAL = 30
+
+
+class WorkflowRunner(object):
+
+ def __init__(self, workflow_name, service_id, inputs,
+ model_storage, resource_storage, plugin_manager,
+ executor=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
+ task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
+ """
+ Manages a single workflow execution on a given service
+ :param workflow_name: Workflow name
+ :param service_id: Service id
+ :param inputs: A key-value dict of inputs for the execution
+ :param model_storage: Model storage
+ :param resource_storage: Resource storage
+ :param plugin_manager: Plugin manager
+ :param executor: Executor for tasks. Defaults to a ProcessExecutor instance.
+ :param task_max_attempts: Maximum number of attempts for each failing task
+ :param task_retry_interval: Interval between retry attempts of a failing task
+ """
+
+ self._model_storage = model_storage
+ self._resource_storage = resource_storage
+ self._workflow_name = workflow_name
+
+ # the IDs are stored rather than the models themselves, so this module could be used
+ # by several threads without raising errors on model objects shared between threads
+ self._service_id = service_id
+
+ self._validate_workflow_exists_for_service()
+
+ workflow_fn = self._get_workflow_fn()
+
+ execution = self._create_execution_model(inputs)
+ self._execution_id = execution.id
+
+ workflow_context = WorkflowContext(
+ name=self.__class__.__name__,
+ model_storage=self._model_storage,
+ resource_storage=resource_storage,
+ service_id=service_id,
+ execution_id=execution.id,
+ workflow_name=workflow_name,
+ task_max_attempts=task_max_attempts,
+ task_retry_interval=task_retry_interval)
+
+ # transforming the execution inputs to dict, to pass them to the workflow function
+ execution_inputs_dict = dict(inp.unwrap() for inp in self.execution.inputs.values())
+ self._tasks_graph = workflow_fn(ctx=workflow_context, **execution_inputs_dict)
+
+ executor = executor or ProcessExecutor(plugin_manager=plugin_manager)
+ self._engine = Engine(
+ executor=executor,
+ workflow_context=workflow_context,
+ tasks_graph=self._tasks_graph)
+
+ @property
+ def execution(self):
+ return self._model_storage.execution.get(self._execution_id)
+
+ @property
+ def service(self):
+ return self._model_storage.service.get(self._service_id)
+
+ def execute(self):
+ self._engine.execute()
+
+ def cancel(self):
+ self._engine.cancel_execution()
+
+ def _create_execution_model(self, inputs):
+ execution = models.Execution(
+ created_at=datetime.utcnow(),
+ service=self.service,
+ workflow_name=self._workflow_name,
+ inputs={})
+
+ if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
+ workflow_inputs = dict() # built-in workflows don't have any inputs
+ else:
+ workflow_inputs = self.service.workflows[self._workflow_name].inputs
+
+ execution.inputs = modeling_utils.create_inputs(inputs, workflow_inputs)
+ # TODO: these two following calls should execute atomically
+ self._validate_no_active_executions(execution)
+ self._model_storage.execution.put(execution)
+ return execution
+
+ def _validate_workflow_exists_for_service(self):
+ if self._workflow_name not in self.service.workflows and \
+ self._workflow_name not in builtin.BUILTIN_WORKFLOWS:
+ raise exceptions.UndeclaredWorkflowError(
+ 'No workflow policy {0} declared in service {1}'
+ .format(self._workflow_name, self.service.name))
+
+ def _validate_no_active_executions(self, execution):
+ active_executions = [e for e in self.service.executions if e.is_active()]
+ if active_executions:
+ raise exceptions.ActiveExecutionsError(
+ "Can't start execution; Service {0} has an active execution with id {1}"
+ .format(self.service.name, active_executions[0].id))
+
+ def _get_workflow_fn(self):
+ if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
+ return import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX,
+ self._workflow_name))
+
+ workflow = self.service.workflows[self._workflow_name]
+
+ # TODO: Custom workflow support needs improvement, currently this code uses internal
+ # knowledge of the resource storage; Instead, workflows should probably be loaded
+ # in a similar manner to operation plugins. Also consider passing the paths to
+ # import_fullname instead of appending them to sys.path.
+ service_template_resources_path = os.path.join(
+ self._resource_storage.service_template.base_path,
+ str(self.service.service_template.id))
+ sys.path.append(service_template_resources_path)
+
+ try:
+ workflow_fn = import_fullname(workflow.implementation)
+ except ImportError:
+ raise exceptions.WorkflowImplementationNotFoundError(
+ 'Could not find workflow {0} implementation at {1}'.format(
+ self._workflow_name, workflow.implementation))
+
+ return workflow_fn
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index 49c584c..82c40c3 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -16,18 +16,16 @@
"""
Provides the tasks to be entered into the task graph
"""
-import copy
+from ... import context
from ....modeling import models
-from ....utils.collections import (OrderedDict, FrozenDict)
+from ....modeling import utils as modeling_utils
from ....utils.uuid import generate_uuid
-from ... import context
-from .. import exceptions
class BaseTask(object):
"""
- Abstract task_graph task
+ Abstract task graph task
"""
def __init__(self, ctx=None, **kwargs):
@@ -56,14 +54,13 @@ class BaseTask(object):
class OperationTask(BaseTask):
"""
- Represents an operation task in the task graph.
+ Represents an operation task in the task graph
"""
NAME_FORMAT = '{interface}:{operation}@{type}:{name}'
def __init__(self,
actor,
- actor_type,
interface_name,
operation_name,
inputs=None,
@@ -75,122 +72,101 @@ class OperationTask(BaseTask):
:meth:`for_relationship`.
"""
+ actor_type = type(actor).__name__.lower()
+ assert isinstance(actor, (models.Node, models.Relationship))
+ assert actor_type in ('node', 'relationship')
assert interface_name and operation_name
super(OperationTask, self).__init__()
- operation = None
- interface = actor.interfaces.get(interface_name)
- if interface is not None:
- operation = interface.operations.get(operation_name)
-
- if operation is None:
- raise exceptions.OperationNotFoundException(
- 'Could not find operation "{0}" on interface "{1}" for {2} "{3}"'
- .format(operation_name, interface_name, actor_type, actor.name))
-
- if operation.implementation is None:
- raise exceptions.OperationNotFoundException(
- 'Empty operation "{0}" on interface "{1}" for {2} "{3}"'
- .format(operation_name, interface_name, actor_type, actor.name))
-
self.actor = actor
- self.actor_type = actor_type
- self.interface_name = interface_name
- self.operation_name = operation_name
-
- self.name = OperationTask.NAME_FORMAT.format(type=actor_type,
- name=actor.name,
- interface=interface_name,
- operation=operation_name)
self.max_attempts = (self.workflow_context._task_max_attempts
if max_attempts is None else max_attempts)
self.retry_interval = (self.workflow_context._task_retry_interval
if retry_interval is None else retry_interval)
self.ignore_failure = (self.workflow_context._task_ignore_failure
if ignore_failure is None else ignore_failure)
- self.implementation = operation.implementation
- self.plugin = operation.plugin
+ self.interface_name = interface_name
+ self.operation_name = operation_name
- # Wrap inputs
- inputs = copy.deepcopy(inputs) if inputs else {}
- for k, v in inputs.iteritems():
- if not isinstance(v, models.Parameter):
- inputs[k] = models.Parameter.wrap(k, v)
+ operation = self.actor.interfaces[self.interface_name].operations[self.operation_name]
+ self.plugin = operation.plugin
+ self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs)
+ self.implementation = operation.implementation
+ self.name = OperationTask.NAME_FORMAT.format(type=actor_type,
+ name=actor.name,
+ interface=self.interface_name,
+ operation=self.operation_name)
- self.inputs = OrderedDict(operation.inputs)
- if inputs:
- self.inputs.update(inputs)
- self.inputs = FrozenDict(self.inputs)
+ def __repr__(self):
+ return self.name
@classmethod
def for_node(cls,
node,
interface_name,
operation_name,
- inputs=None,
max_attempts=None,
retry_interval=None,
- ignore_failure=None):
+ ignore_failure=None,
+ inputs=None):
"""
Creates an operation on a node.
:param node: The node on which to run the operation
:param interface_name: The interface name
:param operation_name: The operation name within the interface
- :param inputs: Override the operation's inputs
:param max_attempts: The maximum number of attempts in case the operation fails
- (if not specified the defaults is taken from the workflow context)
+ (if not specified the default is taken from the workflow context)
:param retry_interval: The interval in seconds between attempts when the operation fails
- (if not specified the defaults is taken from the workflow context)
+ (if not specified the default is taken from the workflow context)
:param ignore_failure: Whether to ignore failures
- (if not specified the defaults is taken from the workflow context)
+ (if not specified the default is taken from the workflow context)
+ :param inputs: Additional operation inputs
"""
assert isinstance(node, models.Node)
return cls(
actor=node,
- actor_type='node',
interface_name=interface_name,
operation_name=operation_name,
- inputs=inputs,
max_attempts=max_attempts,
retry_interval=retry_interval,
- ignore_failure=ignore_failure)
+ ignore_failure=ignore_failure,
+ inputs=inputs)
@classmethod
def for_relationship(cls,
relationship,
interface_name,
operation_name,
- inputs=None,
max_attempts=None,
retry_interval=None,
- ignore_failure=None):
+ ignore_failure=None,
+ inputs=None):
"""
- Creates an operation on a relationship.
+ Creates an operation on a relationship edge.
:param relationship: The relationship on which to run the operation
:param interface_name: The interface name
:param operation_name: The operation name within the interface
- :param inputs: Override the operation's inputs
:param max_attempts: The maximum number of attempts in case the operation fails
- (if not specified the defaults is taken from the workflow context)
+ (if not specified the default is taken from the workflow context)
:param retry_interval: The interval in seconds between attempts when the operation fails
- (if not specified the defaults is taken from the workflow context)
+ (if not specified the default is taken from the workflow context)
:param ignore_failure: Whether to ignore failures
- (if not specified the defaults is taken from the workflow context)
+ (if not specified the default is taken from the workflow context)
+ :param inputs: Additional operation inputs
"""
assert isinstance(relationship, models.Relationship)
return cls(
actor=relationship,
- actor_type='relationship',
interface_name=interface_name,
operation_name=operation_name,
- inputs=inputs,
max_attempts=max_attempts,
retry_interval=retry_interval,
- ignore_failure=ignore_failure)
+ ignore_failure=ignore_failure,
+ inputs=inputs)
class WorkflowTask(BaseTask):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/builtin/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/__init__.py b/aria/orchestrator/workflows/builtin/__init__.py
index d43a962..8b13c62 100644
--- a/aria/orchestrator/workflows/builtin/__init__.py
+++ b/aria/orchestrator/workflows/builtin/__init__.py
@@ -24,6 +24,7 @@ from .stop import stop
BUILTIN_WORKFLOWS = ('install', 'uninstall', 'start', 'stop')
+BUILTIN_WORKFLOWS_PATH_PREFIX = 'aria.orchestrator.workflows.builtin'
__all__ = [
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/builtin/execute_operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/execute_operation.py b/aria/orchestrator/workflows/builtin/execute_operation.py
index 348f47a..16504ec 100644
--- a/aria/orchestrator/workflows/builtin/execute_operation.py
+++ b/aria/orchestrator/workflows/builtin/execute_operation.py
@@ -17,7 +17,7 @@
Builtin execute_operation workflow
"""
-from ..api.task import OperationTask
+from . import utils
from ... import workflow
@@ -28,7 +28,6 @@ def execute_operation(
interface_name,
operation_name,
operation_kwargs,
- allow_kwargs_override,
run_by_dependency_order,
type_names,
node_template_ids,
@@ -41,7 +40,6 @@ def execute_operation(
:param TaskGraph graph: the graph which will describe the workflow.
:param basestring operation: the operation name to execute
:param dict operation_kwargs:
- :param bool allow_kwargs_override:
:param bool run_by_dependency_order:
:param type_names:
:param node_template_ids:
@@ -71,8 +69,7 @@ def execute_operation(
node=node,
interface_name=interface_name,
operation_name=operation_name,
- operation_kwargs=operation_kwargs,
- allow_kwargs_override=allow_kwargs_override
+ operation_kwargs=operation_kwargs
)
)
@@ -108,21 +105,16 @@ def _create_node_task(
node,
interface_name,
operation_name,
- operation_kwargs,
- allow_kwargs_override):
+ operation_kwargs):
"""
A workflow which executes a single operation
:param node: the node instance to install
:param basestring operation: the operation name
:param dict operation_kwargs:
- :param bool allow_kwargs_override:
:return:
"""
- if allow_kwargs_override is not None:
- operation_kwargs['allow_kwargs_override'] = allow_kwargs_override
-
- return OperationTask.for_node(
+ return utils.create_node_task(
node=node,
interface_name=interface_name,
operation_name=operation_name,
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/builtin/utils.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/utils.py b/aria/orchestrator/workflows/builtin/utils.py
index 752fe35..2254d13 100644
--- a/aria/orchestrator/workflows/builtin/utils.py
+++ b/aria/orchestrator/workflows/builtin/utils.py
@@ -12,26 +12,31 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from ..api.task import OperationTask
+
+from ..api.task import OperationTask, StubTask
from .. import exceptions
-def create_node_task(node, interface_name, operation_name):
+def create_node_task(node, interface_name, operation_name, **kwargs):
"""
Returns a new operation task if the operation exists in the node, otherwise returns None.
"""
try:
+ if _is_empty_task(node, interface_name, operation_name):
+ return StubTask()
+
return OperationTask.for_node(node=node,
interface_name=interface_name,
- operation_name=operation_name)
+ operation_name=operation_name,
+ **kwargs)
except exceptions.OperationNotFoundException:
# We will skip nodes which do not have the operation
return None
def create_relationships_tasks(
- node, interface_name, source_operation_name=None, target_operation_name=None):
+ node, interface_name, source_operation_name=None, target_operation_name=None, **kwargs):
"""
Creates a relationship task (source and target) for all of a node_instance relationships.
:param basestring source_operation_name: the relationship operation name.
@@ -43,21 +48,18 @@ def create_relationships_tasks(
"""
sub_tasks = []
for relationship in node.outbound_relationships:
- try:
- relationship_operations = relationship_tasks(
- relationship,
- interface_name,
- source_operation_name=source_operation_name,
- target_operation_name=target_operation_name)
- sub_tasks.append(relationship_operations)
- except exceptions.OperationNotFoundException:
- # We will skip relationships which do not have the operation
- pass
+ relationship_operations = relationship_tasks(
+ relationship,
+ interface_name,
+ source_operation_name=source_operation_name,
+ target_operation_name=target_operation_name,
+ **kwargs)
+ sub_tasks.append(relationship_operations)
return sub_tasks
-def relationship_tasks(
- relationship, interface_name, source_operation_name=None, target_operation_name=None):
+def relationship_tasks(relationship, interface_name, source_operation_name=None,
+ target_operation_name=None, **kwargs):
"""
Creates a relationship task source and target.
:param Relationship relationship: the relationship instance itself
@@ -68,17 +70,33 @@ def relationship_tasks(
"""
operations = []
if source_operation_name:
- operations.append(
- OperationTask.for_relationship(relationship=relationship,
- interface_name=interface_name,
- operation_name=source_operation_name)
- )
+ try:
+ if _is_empty_task(relationship, interface_name, source_operation_name):
+ operations.append(StubTask())
+ else:
+ operations.append(
+ OperationTask.for_relationship(relationship=relationship,
+ interface_name=interface_name,
+ operation_name=source_operation_name,
+ **kwargs)
+ )
+ except exceptions.OperationNotFoundException:
+ # We will skip relationships which do not have the operation
+ pass
if target_operation_name:
- operations.append(
- OperationTask.for_relationship(relationship=relationship,
- interface_name=interface_name,
- operation_name=target_operation_name)
- )
+ try:
+ if _is_empty_task(relationship, interface_name, target_operation_name):
+ operations.append(StubTask())
+ else:
+ operations.append(
+ OperationTask.for_relationship(relationship=relationship,
+ interface_name=interface_name,
+ operation_name=target_operation_name,
+ **kwargs)
+ )
+ except exceptions.OperationNotFoundException:
+ # We will skip relationships which do not have the operation
+ pass
return operations
@@ -106,3 +124,15 @@ def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False):
graph.add_dependency(dependency, task)
else:
graph.add_dependency(task, dependencies)
+
+
+def _is_empty_task(actor, interface_name, operation_name):
+ interface = actor.interfaces.get(interface_name)
+ if interface:
+ operation = interface.operations.get(operation_name)
+ if operation:
+ return operation.implementation is None
+
+ raise exceptions.OperationNotFoundException(
+ 'Could not find operation "{0}" on interface "{1}" for {2} "{3}"'
+ .format(operation_name, interface_name, type(actor).__name__.lower(), actor.name))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index f73cade..155d0ee 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -88,12 +88,12 @@ class Engine(logger.LoggerMixin):
def _executable_tasks(self):
now = datetime.utcnow()
return (task for task in self._tasks_iter()
- if task.is_waiting and
+ if task.is_waiting() and
task.due_at <= now and
not self._task_has_dependencies(task))
def _ended_tasks(self):
- return (task for task in self._tasks_iter() if task.has_ended)
+ return (task for task in self._tasks_iter() if task.has_ended())
def _task_has_dependencies(self, task):
return len(self._execution_graph.pred.get(task.id, {})) > 0
@@ -105,7 +105,7 @@ class Engine(logger.LoggerMixin):
for _, data in self._execution_graph.nodes_iter(data=True):
task = data['task']
if isinstance(task, engine_task.OperationTask):
- if not task.model_task.has_ended:
+ if not task.model_task.has_ended():
self._workflow_context.model.task.refresh(task.model_task)
yield task
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index ba93e21..2b26152 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -69,11 +69,9 @@ class StubTask(BaseTask):
self.status = models.Task.PENDING
self.due_at = datetime.utcnow()
- @property
def has_ended(self):
return self.status in (models.Task.SUCCESS, models.Task.FAILED)
- @property
def is_waiting(self):
return self.status in (models.Task.PENDING, models.Task.RETRYING)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/exceptions.py b/aria/orchestrator/workflows/exceptions.py
index 0ca263f..b5ae496 100644
--- a/aria/orchestrator/workflows/exceptions.py
+++ b/aria/orchestrator/workflows/exceptions.py
@@ -16,6 +16,8 @@
"""
Workflow related Exception classes
"""
+import os
+
from .. import exceptions
@@ -52,10 +54,10 @@ class ProcessException(ExecutorException):
Describes the error in detail
"""
return (
- 'Command "{error.command}" executed with an error.\n'
- 'code: {error.return_code}\n'
- 'error: {error.stderr}\n'
- 'output: {error.stdout}'.format(error=self))
+ 'Command "{error.command}" executed with an error.{0}'
+ 'code: {error.return_code}{0}'
+ 'error: {error.stderr}{0}'
+ 'output: {error.stdout}'.format(os.linesep, error=self))
class AriaEngineError(exceptions.AriaError):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/executor/celery.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/celery.py b/aria/orchestrator/workflows/executor/celery.py
index baa0375..7bd9b7c 100644
--- a/aria/orchestrator/workflows/executor/celery.py
+++ b/aria/orchestrator/workflows/executor/celery.py
@@ -44,7 +44,7 @@ class CeleryExecutor(BaseExecutor):
def execute(self, task):
self._tasks[task.id] = task
- inputs = dict((k, v.value) for k, v in task.inputs.iteritems())
+ inputs = dict(inp.unwrap() for inp in task.inputs.values())
inputs['ctx'] = task.context
self._results[task.id] = self._app.send_task(
task.operation_mapping,
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/executor/dry.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py
new file mode 100644
index 0000000..d894b25
--- /dev/null
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Dry executor
+"""
+
+from datetime import datetime
+
+from .base import BaseExecutor
+
+
+class DryExecutor(BaseExecutor):
+ """
+ Executor which dry runs tasks - prints task information without causing any side effects
+ """
+
+ def execute(self, task):
+ # updating the task manually instead of calling self._task_started(task),
+ # to avoid any side effects raising that event might cause
+ with task._update():
+ task.started_at = datetime.utcnow()
+ task.status = task.STARTED
+
+ actor_type = type(task.actor).__name__.lower()
+ implementation = '{0} > '.format(task.plugin) if task.plugin else ''
+ implementation += task.implementation
+ inputs = dict(inp.unwrap() for inp in task.inputs.values())
+
+ task.context.logger.info(
+ 'Executing {actor_type} {task.actor.name} operation {task.interface_name} '
+ '{task.operation_name}: {implementation} (Inputs: {inputs})'
+ .format(actor_type=actor_type, task=task, implementation=implementation, inputs=inputs))
+
+ # updating the task manually instead of calling self._task_succeeded(task),
+ # to avoid any side effects raising that event might cause
+ with task._update():
+ task.ended_at = datetime.utcnow()
+ task.status = task.SUCCESS
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index f814c4d..851d78e 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -148,7 +148,7 @@ class ProcessExecutor(base.BaseExecutor):
return {
'task_id': task.id,
'implementation': task.implementation,
- 'operation_inputs': dict((k, v.value) for k, v in task.inputs.iteritems()),
+ 'operation_inputs': dict(inp.unwrap() for inp in task.inputs.values()),
'port': self._server_port,
'context': task.context.serialization_dict,
}
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 1a49af5..f422592 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -21,6 +21,7 @@ import Queue
import threading
from aria.utils import imports
+
from .base import BaseExecutor
@@ -58,7 +59,7 @@ class ThreadExecutor(BaseExecutor):
self._task_started(task)
try:
task_func = imports.load_attribute(task.implementation)
- inputs = dict((k, v.value) for k, v in task.inputs.iteritems())
+ inputs = dict(inp.unwrap() for inp in task.inputs.values())
task_func(ctx=task.context, **inputs)
self._task_succeeded(task)
except BaseException as e:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/parser/consumption/__init__.py
----------------------------------------------------------------------
diff --git a/aria/parser/consumption/__init__.py b/aria/parser/consumption/__init__.py
index 7da8490..8f6d2b6 100644
--- a/aria/parser/consumption/__init__.py
+++ b/aria/parser/consumption/__init__.py
@@ -17,10 +17,21 @@
from .exceptions import ConsumerException
from .context import ConsumptionContext
from .style import Style
-from .consumer import Consumer, ConsumerChain
+from .consumer import (
+ Consumer,
+ ConsumerChain
+)
from .presentation import Read
from .validation import Validate
-from .modeling import ServiceTemplate, Types, ServiceInstance
+from .modeling import (
+ ServiceTemplate,
+ Types,
+ ServiceInstance,
+ FindHosts,
+ ConfigureOperations,
+ SatisfyRequirements,
+ ValidateCapabilities
+)
from .inputs import Inputs
__all__ = (
@@ -34,4 +45,7 @@ __all__ = (
'ServiceTemplate',
'Types',
'ServiceInstance',
- 'Inputs')
+ 'Inputs',
+ 'SatisfyRequirements',
+ 'ValidateCapabilities'
+)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/parser/consumption/modeling.py
----------------------------------------------------------------------
diff --git a/aria/parser/consumption/modeling.py b/aria/parser/consumption/modeling.py
index 6c616b4..771fd7f 100644
--- a/aria/parser/consumption/modeling.py
+++ b/aria/parser/consumption/modeling.py
@@ -106,7 +106,8 @@ class InstantiateServiceInstance(Consumer):
'template')
return
- self.context.modeling.template.instantiate(None)
+ self.context.modeling.template.instantiate(None, None,
+ inputs=dict(self.context.modeling.inputs))
class CoerceServiceInstanceValues(Consumer):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/storage/core.py
----------------------------------------------------------------------
diff --git a/aria/storage/core.py b/aria/storage/core.py
index 8302fc9..8caca66 100644
--- a/aria/storage/core.py
+++ b/aria/storage/core.py
@@ -38,7 +38,7 @@ API:
* StorageDriver - class, abstract model implementation.
"""
-from aria.logger import LoggerMixin
+from aria.logger import LoggerMixin, NullHandler
from . import sql_mapi
__all__ = (
@@ -71,6 +71,10 @@ class Storage(LoggerMixin):
:param kwargs:
"""
super(Storage, self).__init__(**kwargs)
+ # Set the logger handler of any storage object to NullHandler.
+ # This is because the absence of a handler shows up while using the CLI in the form of:
+ # `No handlers could be found for logger "aria.ResourceStorage"`.
+ self.logger.addHandler(NullHandler())
self.api = api_cls
self.registered = {}
self._initiator = initiator
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/storage/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/storage/exceptions.py b/aria/storage/exceptions.py
index f982f63..3f0ecec 100644
--- a/aria/storage/exceptions.py
+++ b/aria/storage/exceptions.py
@@ -23,3 +23,7 @@ class StorageError(exceptions.AriaError):
General storage exception
"""
pass
+
+
+class NotFoundError(StorageError):
+ pass
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
index 138432a..cf2a365 100644
--- a/aria/storage/instrumentation.py
+++ b/aria/storage/instrumentation.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import os
import copy
import json
@@ -189,9 +190,9 @@ def apply_tracked_changes(tracked_changes, model):
if not value:
del successfully_updated_changes[key]
model.logger.error(
- 'Registering all the changes to the storage has failed. \n'
- 'The successful updates were: \n '
- '{0}'.format(json.dumps(successfully_updated_changes, indent=4)))
+ 'Registering all the changes to the storage has failed. {0}'
+ 'The successful updates were: {0} '
+ '{1}'.format(os.linesep, json.dumps(successfully_updated_changes, indent=4)))
raise
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/storage/sql_mapi.py
----------------------------------------------------------------------
diff --git a/aria/storage/sql_mapi.py b/aria/storage/sql_mapi.py
index 8d34bb4..730d007 100644
--- a/aria/storage/sql_mapi.py
+++ b/aria/storage/sql_mapi.py
@@ -59,7 +59,7 @@ class SQLAlchemyModelAPI(api.ModelAPI):
result = query.first()
if not result:
- raise exceptions.StorageError(
+ raise exceptions.NotFoundError(
'Requested `{0}` with ID `{1}` was not found'
.format(self.model_cls.__name__, entry_id)
)
@@ -69,13 +69,13 @@ class SQLAlchemyModelAPI(api.ModelAPI):
assert hasattr(self.model_cls, 'name')
result = self.list(include=include, filters={'name': entry_name})
if not result:
- raise exceptions.StorageError(
- 'Requested {0} with NAME `{1}` was not found'
+ raise exceptions.NotFoundError(
+ 'Requested {0} with name `{1}` was not found'
.format(self.model_cls.__name__, entry_name)
)
elif len(result) > 1:
raise exceptions.StorageError(
- 'Requested {0} with NAME `{1}` returned more than 1 value'
+ 'Requested {0} with name `{1}` returned more than 1 value'
.format(self.model_cls.__name__, entry_name)
)
else:
@@ -92,10 +92,8 @@ class SQLAlchemyModelAPI(api.ModelAPI):
results, total, size, offset = self._paginate(query, pagination)
return ListResult(
- items=results,
- metadata=dict(total=total,
- size=size,
- offset=offset)
+ dict(total=total, size=size, offset=offset),
+ results
)
def iter(self,
@@ -406,19 +404,11 @@ def init_storage(base_dir, filename='db.sqlite'):
return dict(engine=engine, session=session)
-class ListResult(object):
+class ListResult(list):
"""
a ListResult contains results about the requested items.
"""
- def __init__(self, items, metadata):
- self.items = items
+ def __init__(self, metadata, *args, **qwargs):
+ super(ListResult, self).__init__(*args, **qwargs)
self.metadata = metadata
-
- def __len__(self):
- return len(self.items)
-
- def __iter__(self):
- return iter(self.items)
-
- def __getitem__(self, item):
- return self.items[item]
+ self.items = self
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/utils/application.py
----------------------------------------------------------------------
diff --git a/aria/utils/application.py b/aria/utils/application.py
deleted file mode 100644
index 2f40825..0000000
--- a/aria/utils/application.py
+++ /dev/null
@@ -1,294 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Convenience storage related tools.
-# TODO rename module name
-"""
-
-import json
-import os
-import shutil
-import tarfile
-import tempfile
-from datetime import datetime
-
-from aria.storage.exceptions import StorageError
-from aria.logger import LoggerMixin
-
-
-class StorageManager(LoggerMixin):
- """
- Convenience wrapper to simplify work with the lower level storage mechanism
- """
-
- def __init__(
- self,
- model_storage,
- resource_storage,
- blueprint_path,
- blueprint_id,
- blueprint_plan,
- deployment_id,
- deployment_plan,
- **kwargs):
- super(StorageManager, self).__init__(**kwargs)
- self.model_storage = model_storage
- self.resource_storage = resource_storage
- self.blueprint_path = blueprint_path
- self.blueprint_id = blueprint_id
- self.blueprint_plan = blueprint_plan
- self.deployment_id = deployment_id
- self.deployment_plan = deployment_plan
-
- @classmethod
- def from_deployment(
- cls,
- model_storage,
- resource_storage,
- deployment_id,
- deployment_plan):
- """
- Create a StorageManager from a deployment
- """
- return cls(
- model_storage=model_storage,
- resource_storage=resource_storage,
- deployment_id=deployment_id,
- deployment_plan=deployment_plan,
- blueprint_path=None,
- blueprint_plan=None,
- blueprint_id=None
- )
-
- @classmethod
- def from_blueprint(
- cls,
- model_storage,
- resource_storage,
- blueprint_path,
- blueprint_id,
- blueprint_plan):
- """
- Create a StorageManager from a blueprint
- """
- return cls(
- model_storage=model_storage,
- resource_storage=resource_storage,
- blueprint_path=blueprint_path,
- blueprint_plan=blueprint_plan,
- blueprint_id=blueprint_id,
- deployment_id=None,
- deployment_plan=None)
-
- def create_blueprint_storage(self, source, main_file_name=None):
- """
- create blueprint model & resource
- """
- assert self.blueprint_path and self.blueprint_id
- assert hasattr(self.resource_storage, 'blueprint')
- assert hasattr(self.model_storage, 'blueprint')
-
- self.logger.debug('creating blueprint resource storage entry')
- self.resource_storage.service_template.upload(
- entry_id=self.blueprint_id,
- source=os.path.dirname(source))
- self.logger.debug('created blueprint resource storage entry')
-
- self.logger.debug('creating blueprint model storage entry')
- now = datetime.utcnow()
- blueprint = self.model_storage.service_template.model_cls(
- plan=self.blueprint_plan,
- id=self.blueprint_id,
- description=self.blueprint_plan.get('description'),
- created_at=now,
- updated_at=now,
- main_file_name=main_file_name,
- )
- self.model_storage.service_template.put(blueprint)
- self.logger.debug('created blueprint model storage entry')
-
- def create_nodes_storage(self):
- """
- create nodes model
- """
- assert self.blueprint_path and self.blueprint_id
- assert hasattr(self.model_storage, 'node')
- assert hasattr(self.model_storage, 'relationship')
-
- for node in self.blueprint_plan['nodes']:
- node_copy = node.copy()
- for field in ('name',
- 'deployment_plugins_to_install',
- 'interfaces',
- 'instances'):
- node_copy.pop(field)
- scalable = node_copy.pop('capabilities')['scalable']['properties']
- for index, relationship in enumerate(node_copy['relationships']):
- relationship = self.model_storage.relationship.model_cls(**relationship)
- self.model_storage.relationship.put(relationship)
- node_copy['relationships'][index] = relationship
-
- node_copy = self.model_storage.node.model_cls(
- blueprint_id=self.blueprint_id,
- planned_number_of_instances=scalable['current_instances'],
- deploy_number_of_instances=scalable['default_instances'],
- min_number_of_instances=scalable['min_instances'],
- max_number_of_instances=scalable['max_instances'],
- number_of_instances=scalable['current_instances'],
- **node_copy)
- self.model_storage.node.put(node_copy)
-
- def create_deployment_storage(self):
- """
- create deployment model & resource
- """
- assert self.deployment_id and self.deployment_plan
-
- assert hasattr(self.resource_storage, 'blueprint')
- assert hasattr(self.resource_storage, 'deployment')
- assert hasattr(self.model_storage, 'deployment')
-
- self.logger.debug('creating deployment resource storage entry')
- temp_dir = tempfile.mkdtemp()
- try:
- self.resource_storage.service_template.download(
- entry_id=self.blueprint_id,
- destination=temp_dir)
- self.resource_storage.service_instance.upload(
- entry_id=self.deployment_id,
- source=temp_dir)
- finally:
- shutil.rmtree(temp_dir, ignore_errors=True)
- self.logger.debug('created deployment resource storage entry')
-
- self.logger.debug('creating deployment model storage entry')
- now = datetime.utcnow()
- deployment = self.model_storage.service_instance.model_cls(
- id=self.deployment_id,
- blueprint_id=self.blueprint_id,
- description=self.deployment_plan['description'],
- workflows=self.deployment_plan['workflows'],
- inputs=self.deployment_plan['inputs'],
- policy_types=self.deployment_plan['policy_types'],
- policy_triggers=self.deployment_plan['policy_triggers'],
- groups=self.deployment_plan['groups'],
- scaling_groups=self.deployment_plan['scaling_groups'],
- outputs=self.deployment_plan['outputs'],
- created_at=now,
- updated_at=now
- )
- self.model_storage.service_instance.put(deployment)
- self.logger.debug('created deployment model storage entry')
-
- def create_node_instances_storage(self):
- """
- create node_instances model
- """
- assert self.deployment_id and self.deployment_plan
- assert hasattr(self.model_storage, 'node_instance')
- assert hasattr(self.model_storage, 'relationship_instance')
-
- self.logger.debug('creating node-instances model storage entries')
- for node_instance in self.deployment_plan['node_instances']:
- node_model = self.model_storage.node.get(node_instance['node_id'])
- relationship_instances = []
-
- for index, relationship_instance in enumerate(node_instance['relationships']):
- relationship_instance_model = self.model_storage.relationship.model_cls(
- relationship=node_model.relationships[index],
- target_name=relationship_instance['target_name'],
- type=relationship_instance['type'],
- target_id=relationship_instance['target_id'])
- relationship_instances.append(relationship_instance_model)
- self.model_storage.relationship.put(relationship_instance_model)
-
- node_instance_model = self.model_storage.node.model_cls(
- node=node_model,
- id=node_instance['id'],
- runtime_properties={},
- state=self.model_storage.node.model_cls.UNINITIALIZED,
- deployment_id=self.deployment_id,
- version='1.0',
- relationship_instances=relationship_instances)
-
- self.model_storage.node.put(node_instance_model)
- self.logger.debug('created node-instances model storage entries')
-
- def create_plugin_storage(self, plugin_id, source):
- """
- create plugin model & resource
- """
- assert hasattr(self.model_storage, 'plugin')
- assert hasattr(self.resource_storage, 'plugin')
-
- self.logger.debug('creating plugin resource storage entry')
- self.resource_storage.plugin.upload(entry_id=plugin_id, source=source)
- self.logger.debug('created plugin resource storage entry')
-
- self.logger.debug('creating plugin model storage entry')
- plugin = _load_plugin_from_archive(source)
- build_props = plugin.get('build_server_os_properties')
- now = datetime.utcnow()
-
- plugin = self.model_storage.plugin.model_cls(
- id=plugin_id,
- package_name=plugin.get('package_name'),
- package_version=plugin.get('package_version'),
- archive_name=plugin.get('archive_name'),
- package_source=plugin.get('package_source'),
- supported_platform=plugin.get('supported_platform'),
- distribution=build_props.get('distribution'),
- distribution_version=build_props.get('distribution_version'),
- distribution_release=build_props.get('distribution_release'),
- wheels=plugin.get('wheels'),
- excluded_wheels=plugin.get('excluded_wheels'),
- supported_py_versions=plugin.get('supported_python_versions'),
- uploaded_at=now
- )
- self.model_storage.plugin.put(plugin)
- self.logger.debug('created plugin model storage entry')
-
-
-def _load_plugin_from_archive(tar_source):
- if not tarfile.is_tarfile(tar_source):
- # TODO: go over the exceptions
- raise StorageError(
- 'the provided tar archive can not be read.')
-
- with tarfile.open(tar_source) as tar:
- tar_members = tar.getmembers()
- # a wheel plugin will contain exactly one sub directory
- if not tar_members:
- raise StorageError(
- 'archive file structure malformed. expecting exactly one '
- 'sub directory; got none.')
- package_json_path = os.path.join(tar_members[0].name,
- 'package.json')
- try:
- package_member = tar.getmember(package_json_path)
- except KeyError:
- raise StorageError("'package.json' was not found under {0}"
- .format(package_json_path))
- try:
- package_json = tar.extractfile(package_member)
- except tarfile.ExtractError as e:
- raise StorageError(str(e))
- try:
- return json.load(package_json)
- except ValueError as e:
- raise StorageError("'package.json' is not a valid json: "
- "{json_str}. error is {error}"
- .format(json_str=package_json.read(), error=str(e)))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/utils/archive.py
----------------------------------------------------------------------
diff --git a/aria/utils/archive.py b/aria/utils/archive.py
new file mode 100644
index 0000000..63d9004
--- /dev/null
+++ b/aria/utils/archive.py
@@ -0,0 +1,63 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import tarfile
+import zipfile
+import tempfile
+from contextlib import closing
+
+
+def is_archive(source):
+    """
+    Tells whether the file at ``source`` is recognized as a tar or a zip archive.
+
+    The tar check runs first; the zip check is only attempted when the tar check fails.
+
+    :param source: path of the file to inspect
+    :return: ``True`` if either check recognizes the file, ``False`` otherwise
+    """
+    recognizers = (tarfile.is_tarfile, zipfile.is_zipfile)
+    return any(recognize(source) for recognize in recognizers)
+
+
+def extract_archive(source):
+    """
+    Extracts the tar or zip archive at ``source`` into a new temporary directory.
+
+    :param source: path of the archive
+    :return: whatever :code:`untar` / :code:`unzip` returns for the archive, i.e. the
+     extraction directory
+    :raises ValueError: if ``source`` is recognized neither as a tar nor as a zip archive
+    """
+    # tar is probed before zip, so a file both checks accept is handled as tar
+    if tarfile.is_tarfile(source):
+        extractor = untar
+    elif zipfile.is_zipfile(source):
+        extractor = unzip
+    else:
+        raise ValueError(
+            'Unsupported archive type provided or archive is not valid: {0}.'.format(source))
+    return extractor(source)
+
+
+def tar(source, destination):
+    """
+    Creates a gzip-compressed tar archive at ``destination`` containing ``source``.
+
+    ``source`` is stored under its base name (:code:`os.path.basename(source)`) rather than
+    under its full path.
+
+    :param source: path of the file or directory to add
+    :param destination: path of the archive to create
+    :return: ``destination``, mirroring what :code:`zip` returns
+    """
+    with closing(tarfile.open(destination, 'w:gz')) as tar_archive:
+        tar_archive.add(source, arcname=os.path.basename(source))
+    return destination
+
+
+def untar(archive, destination=None):
+    """
+    Extracts all members of the tar archive at ``archive``.
+
+    :param archive: path of the tar archive
+    :param destination: directory to extract into; when falsy, a fresh temporary directory is
+     created with :code:`tempfile.mkdtemp()` and used instead
+    :return: the directory the members were extracted into
+    """
+    target_dir = destination or tempfile.mkdtemp()
+    with closing(tarfile.open(name=archive)) as tar_archive:
+        # NOTE(review): member paths are extracted as-is, with no validation here; confirm
+        # that callers only hand in trusted archives
+        tar_archive.extractall(path=target_dir, members=tar_archive.getmembers())
+    return target_dir
+
+
+def zip(source, destination):
+    """
+    Writes every file found while walking the ``source`` directory into a new zip archive.
+
+    Entry names are the file paths relative to :code:`os.path.dirname(source)`. Only files are
+    written; directories that contain no files produce no entry.
+
+    :param source: path of the directory to walk
+    :param destination: path of the zip file to create
+    :return: ``destination``
+    """
+    base_dir = os.path.dirname(source)
+    with closing(zipfile.ZipFile(destination, 'w')) as zip_file:
+        for dirpath, _, filenames in os.walk(source):
+            for filename in filenames:
+                absolute_path = os.path.join(dirpath, filename)
+                zip_file.write(absolute_path, os.path.relpath(absolute_path, base_dir))
+    return destination
+
+
+def unzip(archive, destination=None):
+    """
+    Extracts the whole zip archive at ``archive``.
+
+    :param archive: path of the zip archive
+    :param destination: directory to extract into; when falsy, a fresh temporary directory is
+     created with :code:`tempfile.mkdtemp()` and used instead
+    :return: the directory the archive was extracted into
+    """
+    target_dir = destination or tempfile.mkdtemp()
+    with closing(zipfile.ZipFile(archive, 'r')) as zip_file:
+        zip_file.extractall(target_dir)
+    return target_dir
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/utils/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/utils/exceptions.py b/aria/utils/exceptions.py
index 9e3e80f..b60cee4 100644
--- a/aria/utils/exceptions.py
+++ b/aria/utils/exceptions.py
@@ -15,6 +15,7 @@
import sys
import linecache
+import StringIO
import traceback as tb
import jsonpickle
@@ -89,6 +90,16 @@ def _print_stack(frame):
puts(line)
+def get_exception_as_string(exc_type, exc_val, traceback):
+    """
+    Renders an exception together with its traceback as a single string.
+
+    The three arguments are the ones :code:`sys.exc_info()` returns, in the same order.
+
+    :param exc_type: exception class
+    :param exc_val: exception instance
+    :param traceback: traceback object
+    :return: the text :code:`traceback.print_exception` would print for these arguments
+    """
+    # The ``traceback`` parameter shadows the standard module, hence the ``tb`` alias.
+    # ``format_exception`` produces the very lines ``print_exception`` writes, so no file-like
+    # buffer is needed; passing the arguments positionally avoids depending on the ``etype``
+    # keyword name.
+    return ''.join(tb.format_exception(exc_type, exc_val, traceback))
+
+
class _WrappedException(Exception):
def __init__(self, exception_type, exception_str):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/utils/file.py
----------------------------------------------------------------------
diff --git a/aria/utils/file.py b/aria/utils/file.py
index b515f70..6d1aa16 100644
--- a/aria/utils/file.py
+++ b/aria/utils/file.py
@@ -15,6 +15,7 @@
import errno
import os
+import shutil
def makedirs(path):
@@ -26,3 +27,15 @@ def makedirs(path):
except IOError as e:
if e.errno != errno.EEXIST:
raise
+
+def remove_if_exists(path):
+    """
+    Removes the file, symbolic link or directory tree at ``path``; a missing path is not an
+    error.
+
+    :param path: filesystem path to remove
+    :raises OSError: if removal fails for any reason other than the path not existing
+    """
+    try:
+        # Links are checked first and unlinked rather than followed: ``shutil.rmtree`` refuses
+        # a symbolic link to a directory, and a dangling link is neither a file nor a directory
+        # for ``os.path``, so it would otherwise be left behind.
+        if os.path.islink(path) or os.path.isfile(path):
+            os.remove(path)
+        elif os.path.isdir(path):
+            shutil.rmtree(path)
+    except OSError as e:
+        # the path may disappear between the check and the removal, which is the wanted outcome
+        if e.errno != errno.ENOENT:  # errno.ENOENT = no such file or directory
+            raise  # re-raise exception if a different error occurred
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/utils/formatting.py
----------------------------------------------------------------------
diff --git a/aria/utils/formatting.py b/aria/utils/formatting.py
index 8a223e9..b5e141d 100644
--- a/aria/utils/formatting.py
+++ b/aria/utils/formatting.py
@@ -83,6 +83,34 @@ def full_type_name(value):
return name if module == '__builtin__' else '%s.%s' % (module, name)
+def decode_list(data):
+    """
+    Builds a copy of ``data`` in which every :code:`unicode` item is replaced by its UTF-8
+    encoded byte string.
+
+    Nested lists and dicts are converted recursively (dicts through :code:`decode_dict`); items
+    of any other type, tuples included, are copied over unchanged. Despite the name, the
+    conversion applied is :code:`encode('utf-8')`.
+
+    :param data: iterable of items to convert
+    :return: a new list; ``data`` itself is not modified
+    """
+    def _convert(element):
+        if isinstance(element, unicode):
+            return element.encode('utf-8')
+        if isinstance(element, list):
+            return decode_list(element)
+        if isinstance(element, dict):
+            return decode_dict(element)
+        return element
+
+    return [_convert(element) for element in data]
+
+
+def decode_dict(data):
+    """
+    Builds a copy of ``data`` in which :code:`unicode` keys and values are replaced by their
+    UTF-8 encoded byte strings.
+
+    List and dict values are converted recursively (lists through :code:`decode_list`); keys
+    and values of any other type are kept as they are. Despite the name, the conversion applied
+    is :code:`encode('utf-8')`.
+
+    :param data: dict to convert
+    :return: a new dict; ``data`` itself is not modified
+    """
+    def _convert_value(element):
+        if isinstance(element, unicode):
+            return element.encode('utf-8')
+        if isinstance(element, list):
+            return decode_list(element)
+        if isinstance(element, dict):
+            return decode_dict(element)
+        return element
+
+    converted = {}
+    for raw_key, raw_value in data.iteritems():
+        key = raw_key.encode('utf-8') if isinstance(raw_key, unicode) else raw_key
+        converted[key] = _convert_value(raw_value)
+    return converted
+
+
def safe_str(value):
"""
Like :code:`str` coercion, but makes sure that Unicode strings are properly