You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2016/12/03 13:48:32 UTC
[1/2] incubator-ariatosca git commit: code review 1 [Forced Update!]
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-30-SQL-based-storage-implementation 2d8f13385 -> 30a439832 (forced update)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/aria/storage/sql_mapi.py
----------------------------------------------------------------------
diff --git a/aria/storage/sql_mapi.py b/aria/storage/sql_mapi.py
new file mode 100644
index 0000000..ab26dd7
--- /dev/null
+++ b/aria/storage/sql_mapi.py
@@ -0,0 +1,363 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+SQLAlchemy based MAPI
+"""
+
+from sqlalchemy.exc import SQLAlchemyError
+from sqlalchemy.sql.elements import Label
+
+from aria.utils.collections import OrderedDict
+
+from aria.storage import (
+ api,
+ exceptions
+)
+
+
+DEFAULT_SQL_DIALECT = 'sqlite'
+
+
+class SQLAlchemyModelAPI(api.ModelAPI):
+ """
+ SQL based MAPI.
+ """
+
+ def __init__(self,
+ engine,
+ session,
+ **kwargs):
+ super(SQLAlchemyModelAPI, self).__init__(**kwargs)
+ self._engine = engine
+ self._session = session
+
+ def get(self, entry_id, include=None, filters=None, locking=False, **kwargs):
+ """Return a single result based on the model class and element ID
+ """
+ filters = filters or {'id': entry_id}
+ query = self._get_query(include, filters)
+ if locking:
+ query = query.with_for_update()
+ result = query.first()
+
+ if not result:
+ raise exceptions.StorageError(
+ 'Requested {0} with ID `{1}` was not found'
+ .format(self.model_cls.__name__, entry_id)
+ )
+ return result
+
+ def iter(self,
+ include=None,
+ filters=None,
+ pagination=None,
+ sort=None,
+ **kwargs):
+ """Return a (possibly empty) list of `model_class` results
+ """
+ query = self._get_query(include, filters, sort)
+
+ results, _, _, _ = self._paginate(query, pagination)
+
+ for result in results:
+ yield result
+
+ def put(self, entry, **kwargs):
+ """Create a `model_class` instance from a serializable `model` object
+
+ :param entry: A dict with relevant kwargs, or an instance of a class
+ that has a `to_dict` method, and whose attributes match the columns
+ of `model_class` (might also my just an instance of `model_class`)
+ :return: An instance of `model_class`
+ """
+ self._session.add(entry)
+ self._safe_commit()
+ return entry
+
+ def delete(self, entry_id, filters=None, **kwargs):
+ """Delete a single result based on the model class and element ID
+ """
+ try:
+ instance = self.get(
+ entry_id,
+ filters=filters
+ )
+ except exceptions.StorageError:
+ raise exceptions.StorageError(
+ 'Could not delete {0} with ID `{1}` - element not found'
+ .format(
+ self.model_cls.__name__,
+ entry_id
+ )
+ )
+ self._load_properties(instance)
+ self._session.delete(instance)
+ self._safe_commit()
+ return instance
+
+ # TODO: this might need rework
+ def update(self, entry, **kwargs):
+ """Add `instance` to the DB session, and attempt to commit
+
+ :return: The updated instance
+ """
+ return self.put(entry)
+
+ def refresh(self, entry):
+ """Reload the instance with fresh information from the DB
+
+ :param entry: Instance to be re-loaded from the DB
+ :return: The refreshed instance
+ """
+ self._session.refresh(entry)
+ self._load_properties(entry)
+ return entry
+
+ def _destroy_connection(self):
+ pass
+
+ def _establish_connection(self):
+ pass
+
+ def create(self):
+ self.model_cls.__table__.create(self._engine)
+
+ def drop(self):
+ """
+ Drop the table from the storage.
+ :return:
+ """
+ self.model_cls.__table__.drop(self._engine)
+
+ def _safe_commit(self):
+ """Try to commit changes in the session. Roll back if exception raised
+ Excepts SQLAlchemy errors and rollbacks if they're caught
+ """
+ try:
+ self._session.commit()
+ except SQLAlchemyError as e:
+ self._session.rollback()
+ raise exceptions.StorageError(
+ 'SQL Storage error: {0}'.format(str(e))
+ )
+
+ def _get_base_query(self, include, joins):
+ """Create the initial query from the model class and included columns
+
+ :param include: A (possibly empty) list of columns to include in
+ the query
+ :param joins: A (possibly empty) list of models on which the query
+ should join
+ :return: An SQLAlchemy AppenderQuery object
+ """
+
+ # If only some columns are included, query through the session object
+ if include:
+ query = self._session.query(*include)
+ else:
+ # If all columns should be returned, query directly from the model
+ query = self._session.query(self.model_cls)
+
+ # Add any joins that might be necessary
+ for join_model in joins:
+ query = query.join(join_model)
+
+ return query
+
+ @staticmethod
+ def _sort_query(query, sort=None):
+ """Add sorting clauses to the query
+
+ :param query: Base SQL query
+ :param sort: An optional dictionary where keys are column names to
+ sort by, and values are the order (asc/desc)
+ :return: An SQLAlchemy AppenderQuery object
+ """
+ if sort:
+ for column, order in sort.items():
+ if order == 'desc':
+ column = column.desc()
+ query = query.order_by(column)
+ return query
+
+ @staticmethod
+ def _filter_query(query, filters):
+ """Add filter clauses to the query
+
+ :param query: Base SQL query
+ :param filters: An optional dictionary where keys are column names to
+ filter by, and values are values applicable for those columns (or lists
+ of such values)
+ :return: An SQLAlchemy AppenderQuery object
+ """
+ for column, value in filters.items():
+ # If there are multiple values, use `in_`, otherwise, use `eq`
+ if isinstance(value, (list, tuple)):
+ query = query.filter(column.in_(value))
+ else:
+ query = query.filter(column == value)
+
+ return query
+
+ def _get_query(self,
+ include=None,
+ filters=None,
+ sort=None):
+ """Get an SQL query object based on the params passed
+
+ :param include: An optional list of columns to include in the query
+ :param filters: An optional dictionary where keys are column names to
+ filter by, and values are values applicable for those columns (or lists
+ of such values)
+ :param sort: An optional dictionary where keys are column names to
+ sort by, and values are the order (asc/desc)
+ :return: A sorted and filtered query with only the relevant
+ columns
+ """
+
+ include = include or []
+ filters = filters or dict()
+ sort = sort or OrderedDict()
+
+ joins = self._get_join_models_list(include, filters, sort)
+ include, filters, sort = self._get_columns_from_field_names(
+ include, filters, sort
+ )
+
+ query = self._get_base_query(include, joins)
+ query = self._filter_query(query, filters)
+ query = self._sort_query(query, sort)
+ return query
+
+ def _get_columns_from_field_names(self,
+ include,
+ filters,
+ sort):
+ """Go over the optional parameters (include, filters, sort), and
+ replace column names with actual SQLA column objects
+ """
+ all_includes = [self._get_column(c) for c in include]
+ include = []
+ # Columns that are inferred from properties (Labels) should be included
+ # last for the following joins to work properly
+ for col in all_includes:
+ if isinstance(col, Label):
+ include.append(col)
+ else:
+ include.insert(0, col)
+
+ filters = dict((self._get_column(c), filters[c]) for c in filters)
+ sort = OrderedDict((self._get_column(c), sort[c]) for c in sort)
+
+ return include, filters, sort
+
+ def _get_join_models_list(self, include, filters, sort):
+ """Return a list of models on which the query should be joined, as
+ inferred from the include, filter and sort column names
+ """
+ if not self.model_cls.is_resource:
+ return []
+
+ all_column_names = include + filters.keys() + sort.keys()
+ join_columns = set(column_name for column_name in all_column_names
+ if self._is_join_column(column_name))
+
+ # If the only columns included are the columns on which we would
+ # normally join, there isn't actually a need to join, as the FROM
+ # clause in the query will be generated from the relevant models anyway
+ if include == list(join_columns):
+ return []
+
+ # Initializing a set, because the same model can appear in several
+ # join lists
+ join_models = set()
+ for column_name in join_columns:
+ join_models.update(
+ self.model_cls.join_properties[column_name]['models']
+ )
+ # Sort the models by their correct join order
+ join_models = sorted(join_models,
+ key=lambda model: model.join_order, reverse=True)
+
+ return join_models
+
+ def _is_join_column(self, column_name):
+ """Return False if the column name corresponds to a regular SQLA
+ column that `model_class` has.
+ Return True if the column that should be used is a join column (see
+ SQLModelBase for an explanation)
+ """
+ return self.model_cls.is_resource and \
+ column_name in self.model_cls.join_properties
+
+ def _get_column(self, column_name):
+ """Return the column on which an action (filtering, sorting, etc.)
+ would need to be performed. Can be either an attribute of the class,
+ or needs to be inferred from the class' `join_properties` property
+ """
+ if self._is_join_column(column_name):
+ return self.model_cls.join_properties[column_name]['column']
+ else:
+ return getattr(self.model_cls, column_name)
+
+ # TODO is this really needed in aria?
+ @staticmethod
+ def _paginate(query, pagination):
+ """Paginate the query by size and offset
+
+ :param query: Current SQLAlchemy query object
+ :param pagination: An optional dict with size and offset keys
+ :return: A tuple with four elements:
+ - results: `size` items starting from `offset`
+ - the total count of items
+ - `size` [default: 0]
+ - `offset` [default: 0]
+ """
+ if pagination:
+ size = pagination.get('size', 0)
+ offset = pagination.get('offset', 0)
+ total = query.order_by(None).count() # Fastest way to count
+ results = query.limit(size).offset(offset).all()
+ return results, total, size, offset
+ else:
+ results = query.all()
+ return results, len(results), 0, 0
+
+ @staticmethod
+ def _load_properties(instance):
+ """A helper method used to overcome a problem where the properties
+ that rely on joins aren't being loaded automatically
+ """
+ if instance.is_resource:
+ for prop in instance.join_properties:
+ getattr(instance, prop)
+
+
+class ListResult(object):
+ """
+ a ListResult contains results about the requested items.
+ """
+ def __init__(self, items, metadata):
+ self.items = items
+ self.metadata = metadata
+
+ def __len__(self):
+ return len(self.items)
+
+ def __iter__(self):
+ return iter(self.items)
+
+ def __getitem__(self, item):
+ return self.items[item]
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/aria/storage/structures.py
----------------------------------------------------------------------
diff --git a/aria/storage/structures.py b/aria/storage/structures.py
index 9b120de..b297789 100644
--- a/aria/storage/structures.py
+++ b/aria/storage/structures.py
@@ -28,11 +28,12 @@ classes:
"""
import json
-import jsonpickle
from sqlalchemy import VARCHAR
from sqlalchemy.ext.mutable import Mutable
+from sqlalchemy.orm import relationship, backref
from sqlalchemy.ext.declarative import declarative_base
# pylint: disable=unused-import
+from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy import (
schema,
Column,
@@ -49,82 +50,40 @@ from sqlalchemy import (
orm,
)
-
Model = declarative_base()
-def foreign_key(
- parent_table,
- id_col_name='storage_id',
- nullable=False,
- column_type=Integer
-):
+def foreign_key(foreign_key_column, nullable=False):
"""Return a ForeignKey object with the relevant
- :param parent_table: SQL name of the parent table
- :param id_col_name: Name of the parent table's ID column [default: `id`]
+ :param foreign_key_column: Unique id column in the parent table
:param nullable: Should the column be allowed to remain empty
- :param column_type: The type (integer/text/etc.) of the column
- :return:
"""
return Column(
- column_type,
- ForeignKey(
- '{0}.{1}'.format(parent_table.__tablename__, id_col_name),
- ondelete='CASCADE'
- ),
+ ForeignKey(foreign_key_column, ondelete='CASCADE'),
nullable=nullable
)
-def one_to_many_relationship(
- child_class_name,
- column_name,
- parent_class_name,
- back_reference_name,
- parent_id_name='storage_id',
-):
+def one_to_many_relationship(child_class,
+ parent_class,
+ foreign_key_column,
+ backreference=None):
"""Return a one-to-many SQL relationship object
Meant to be used from inside the *child* object
- :param child_class_name: Class name of the child table
- :param column_name: Name of the column pointing to the parent table
- :param parent_class_name: Class name of the parent table
- :param back_reference_name: The name to give to the reference to the child
- :param parent_id_name: Name of the parent table's ID column [default: `id`]
- :return:
+ :param parent_class: Class of the parent table
+ :param child_class: Class of the child table
+ :param foreign_key_column: The column of the foreign key
+ :param backreference: The name to give to the reference to the child
"""
- return orm.relationship(
- parent_class_name,
- primaryjoin='{0}.{1} == {2}.{3}'.format(
- child_class_name,
- column_name,
- parent_class_name,
- parent_id_name
- ),
+ backreference = backreference or child_class.__tablename__
+ return relationship(
+ parent_class,
+ primaryjoin=lambda: parent_class.storage_id == foreign_key_column,
# The following line make sure that when the *parent* is
# deleted, all its connected children are deleted as well
- backref=orm.backref(back_reference_name, cascade='all')
- )
-
-
-def many_to_many_relationship(
- other_table_class_name,
- connecting_table,
- back_reference_name
-):
- """Return a many-to-many SQL relationship object
-
- :param other_table_class_name: The name of the table we're connecting to
- :param connecting_table: The secondary table used in the relationship
- :param back_reference_name: The name to give to the reference to the
- current table from the other table
- :return:
- """
- return orm.relationship(
- other_table_class_name,
- secondary=connecting_table,
- backref=orm.backref(back_reference_name, lazy='dynamic')
+ backref=backref(backreference, cascade='all')
)
@@ -184,44 +143,59 @@ class MutableDict(Mutable, dict):
class SQLModelBase(Model):
- """Abstract base class for all SQL models that allows [de]serialization
+ """
+ Abstract base class for all SQL models that allows [de]serialization
"""
# SQLAlchemy syntax
__abstract__ = True
- # Indicates to the storage manager whether the table is a resource or not
+ # Does the class represent a resource (Blueprint, Deployment, etc.) or a
+ # management table (User, Tenant, etc.), as they are handled differently
is_resource = False
+ # This would be overridden once the models are created.
+ __table__ = None
+
_private_fields = []
# Indicates whether the `id` column in this class should be unique
is_id_unique = True
- def to_dict(self, **kwargs):
- """
- Convert the model into dict
- :return:
- """
- return dict((field, getattr(self, field)) for field in self.fields())
+ storage_id = Column(Integer, primary_key=True, autoincrement=True)
+ id = Column(Text, index=True)
- def to_json(self):
- """
- Convert the model into json.
- :return:
+ @classmethod
+ def unique_id(cls):
+ return 'id'
+
+ def to_dict(self, suppress_error=False):
+ """Return a dict representation of the model
+
+ :param suppress_error: If set to True, sets `None` to attributes that
+ it's unable to retrieve (e.g., if a relationship wasn't established
+ yet, and so it's impossible to access a property through it)
"""
- return jsonpickle.encode(self.to_dict(), unpicklable=False)
+ if suppress_error:
+ res = dict()
+ for field in self.fields():
+ try:
+ field_value = getattr(self, field)
+ except AttributeError:
+ field_value = None
+ res[field] = field_value
+ else:
+ # Can't simply call here `self.to_response()` because inheriting
+ # class might override it, but we always need the same code here
+ res = dict((f, getattr(self, f)) for f in self.fields())
+ return res
- def fields(self):
+ @classmethod
+ def fields(cls):
"""Return the list of field names for this table
Mostly for backwards compatibility in the code (that uses `fields`)
"""
- return self.__table__.columns.keys()
-
- def _get_unique_id(self):
- """A method to allow classes to override the default representation
- """
- return 'id', self.id
+ return set(cls.__table__.columns.keys()) - set(cls._private_fields)
def __str__(self):
id_name, id_value = self._get_unique_id()
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/aria/utils/application.py
----------------------------------------------------------------------
diff --git a/aria/utils/application.py b/aria/utils/application.py
index b1a7fcc..113e054 100644
--- a/aria/utils/application.py
+++ b/aria/utils/application.py
@@ -117,7 +117,7 @@ class StorageManager(LoggerMixin):
updated_at=now,
main_file_name=main_file_name,
)
- self.model_storage.blueprint.store(blueprint)
+ self.model_storage.blueprint.put(blueprint)
self.logger.debug('created blueprint model storage entry')
def create_nodes_storage(self):
@@ -138,7 +138,7 @@ class StorageManager(LoggerMixin):
scalable = node_copy.pop('capabilities')['scalable']['properties']
for index, relationship in enumerate(node_copy['relationships']):
relationship = self.model_storage.relationship.model_cls(**relationship)
- self.model_storage.relationship.store(relationship)
+ self.model_storage.relationship.put(relationship)
node_copy['relationships'][index] = relationship
node_copy = self.model_storage.node.model_cls(
@@ -149,7 +149,7 @@ class StorageManager(LoggerMixin):
max_number_of_instances=scalable['max_instances'],
number_of_instances=scalable['current_instances'],
**node_copy)
- self.model_storage.node.store(node_copy)
+ self.model_storage.node.put(node_copy)
def create_deployment_storage(self):
"""
@@ -190,7 +190,7 @@ class StorageManager(LoggerMixin):
created_at=now,
updated_at=now
)
- self.model_storage.deployment.store(deployment)
+ self.model_storage.deployment.put(deployment)
self.logger.debug('created deployment model storage entry')
def create_node_instances_storage(self):
@@ -213,7 +213,7 @@ class StorageManager(LoggerMixin):
type=relationship_instance['type'],
target_id=relationship_instance['target_id'])
relationship_instances.append(relationship_instance_model)
- self.model_storage.relationship_instance.store(relationship_instance_model)
+ self.model_storage.relationship_instance.put(relationship_instance_model)
node_instance_model = self.model_storage.node_instance.model_cls(
node=node_model,
@@ -224,7 +224,7 @@ class StorageManager(LoggerMixin):
version='1.0',
relationship_instances=relationship_instances)
- self.model_storage.node_instance.store(node_instance_model)
+ self.model_storage.node_instance.put(node_instance_model)
self.logger.debug('created node-instances model storage entries')
def create_plugin_storage(self, plugin_id, source):
@@ -258,7 +258,7 @@ class StorageManager(LoggerMixin):
supported_py_versions=plugin.get('supported_python_versions'),
uploaded_at=now
)
- self.model_storage.plugin.store(plugin)
+ self.model_storage.plugin.put(plugin)
self.logger.debug('created plugin model storage entry')
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/tests/mock/context.py
----------------------------------------------------------------------
diff --git a/tests/mock/context.py b/tests/mock/context.py
index 0d09bb1..0ab18bf 100644
--- a/tests/mock/context.py
+++ b/tests/mock/context.py
@@ -18,7 +18,7 @@ import pytest
from aria import application_model_storage
from aria.orchestrator import context
-from aria.storage.mapi import SQLAlchemyModelAPI
+from aria.storage.sql_mapi import SQLAlchemyModelAPI
from tests.storage import get_sqlite_api_params
@@ -29,39 +29,39 @@ from . import models
def simple(**kwargs):
api_params = get_sqlite_api_params()
model_storage = application_model_storage(SQLAlchemyModelAPI, api_params=api_params)
- model_storage.blueprint.store(models.get_blueprint())
+ model_storage.blueprint.put(models.get_blueprint())
blueprint = model_storage.blueprint.get(models.BLUEPRINT_ID)
deployment = models.get_deployment(blueprint)
- model_storage.deployment.store(deployment)
+ model_storage.deployment.put(deployment)
#################################################################################
# Creating a simple deployment with node -> node as a graph
dependency_node = models.get_dependency_node(deployment)
- model_storage.node.store(dependency_node)
+ model_storage.node.put(dependency_node)
storage_dependency_node = model_storage.node.get(dependency_node.id)
dependency_node_instance = models.get_dependency_node_instance(storage_dependency_node)
- model_storage.node_instance.store(dependency_node_instance)
+ model_storage.node_instance.put(dependency_node_instance)
storage_dependency_node_instance = model_storage.node_instance.get(dependency_node_instance.id)
dependent_node = models.get_dependent_node(deployment)
- model_storage.node.store(dependent_node)
+ model_storage.node.put(dependent_node)
storage_dependent_node = model_storage.node.get(dependent_node.id)
dependent_node_instance = models.get_dependent_node_instance(storage_dependent_node)
- model_storage.node_instance.store(dependent_node_instance)
+ model_storage.node_instance.put(dependent_node_instance)
storage_dependent_node_instance = model_storage.node_instance.get(dependent_node_instance.id)
relationship = models.get_relationship(storage_dependent_node, storage_dependency_node)
- model_storage.relationship.store(relationship)
+ model_storage.relationship.put(relationship)
storage_relationship = model_storage.relationship.get(relationship.id)
relationship_instance = models.get_relationship_instance(
relationship=storage_relationship,
target_instance=storage_dependency_node_instance,
source_instance=storage_dependent_node_instance
)
- model_storage.relationship_instance.store(relationship_instance)
+ model_storage.relationship_instance.put(relationship_instance)
final_kwargs = dict(
name='simple_context',
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/tests/mock/models.py
----------------------------------------------------------------------
diff --git a/tests/mock/models.py b/tests/mock/models.py
index bdcbed9..5de3380 100644
--- a/tests/mock/models.py
+++ b/tests/mock/models.py
@@ -46,7 +46,7 @@ def get_dependency_node(deployment):
operations=dict((key, {}) for key in operations.NODE_OPERATIONS),
min_number_of_instances=1,
max_number_of_instances=1,
- deployment_storage_id=deployment.storage_id
+ deployment_fk=deployment.storage_id
)
@@ -56,8 +56,8 @@ def get_dependency_node_instance(dependency_node):
host_id=DEPENDENCY_NODE_INSTANCE_ID,
runtime_properties={'ip': '1.1.1.1'},
version=None,
- node_storage_id=dependency_node.storage_id,
- deployment_storage_id=dependency_node.deployment.storage_id,
+ node_fk=dependency_node.storage_id,
+ deployment_fk=dependency_node.deployment.storage_id,
state='',
scaling_groups={}
)
@@ -66,8 +66,8 @@ def get_dependency_node_instance(dependency_node):
def get_relationship(source=None, target=None):
return models.Relationship(
id=RELATIONSHIP_ID,
- source_node_storage_id=source.storage_id,
- target_node_storage_id=target.storage_id,
+ source_node_fk=source.storage_id,
+ target_node_fk=target.storage_id,
source_interfaces={},
source_operations=dict((key, {}) for key in operations.RELATIONSHIP_OPERATIONS),
target_interfaces={},
@@ -81,17 +81,16 @@ def get_relationship(source=None, target=None):
def get_relationship_instance(source_instance, target_instance, relationship):
return models.RelationshipInstance(
id=RELATIONSHIP_INSTANCE_ID,
- type='some_type',
- relationship_storage_id=relationship.storage_id,
- target_node_instance_storage_id=target_instance.storage_id,
- source_node_instance_storage_id=source_instance.storage_id,
+ relationship_fk=relationship.storage_id,
+ target_node_instance_fk=target_instance.storage_id,
+ source_node_instance_fk=source_instance.storage_id,
)
def get_dependent_node(deployment):
return models.Node(
id=DEPENDENT_NODE_ID,
- deployment_storage_id=deployment.storage_id,
+ deployment_fk=deployment.storage_id,
host_id=DEPENDENT_NODE_ID,
type='test_node_type',
type_hierarchy=[],
@@ -111,8 +110,8 @@ def get_dependent_node_instance(dependent_node):
host_id=DEPENDENT_NODE_INSTANCE_ID,
runtime_properties={},
version=None,
- node_storage_id=dependent_node.storage_id,
- deployment_storage_id=dependent_node.deployment.storage_id,
+ node_fk=dependent_node.storage_id,
+ deployment_fk=dependent_node.deployment.storage_id,
state='',
scaling_groups={}
)
@@ -133,7 +132,7 @@ def get_blueprint():
def get_execution(deployment):
return models.Execution(
id=EXECUTION_ID,
- deployment_storage_id=deployment.storage_id,
+ deployment_fk=deployment.storage_id,
status=models.Execution.STARTED,
workflow_id=WORKFLOW_ID,
started_at=datetime.utcnow(),
@@ -145,7 +144,7 @@ def get_deployment(blueprint):
now = datetime.utcnow()
return models.Deployment(
id=DEPLOYMENT_ID,
- blueprint_storage_id=blueprint.storage_id,
+ blueprint_fk=blueprint.storage_id,
description='',
created_at=now,
updated_at=now,
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/tests/orchestrator/context/test_toolbelt.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_toolbelt.py b/tests/orchestrator/context/test_toolbelt.py
index 480f289..5cad219 100644
--- a/tests/orchestrator/context/test_toolbelt.py
+++ b/tests/orchestrator/context/test_toolbelt.py
@@ -68,7 +68,7 @@ def test_host_ip(workflow_context, executor):
'operation': op_path(host_ip, module_path=__name__)
}
- workflow_context.model.node.store(dependency_node)
+ workflow_context.model.node.put(dependency_node)
inputs = {'putput': True}
@workflow
@@ -95,7 +95,7 @@ def test_dependent_node_instances(workflow_context, executor):
'operation': op_path(dependent_nodes, module_path=__name__)
}
- workflow_context.model.node.store(dependency_node)
+ workflow_context.model.node.put(dependency_node)
inputs = {'putput': True}
@workflow
@@ -121,7 +121,7 @@ def test_relationship_tool_belt(workflow_context, executor):
relationship.source_operations[operation_name] = {
'operation': op_path(relationship_operation, module_path=__name__)
}
- workflow_context.model.relationship.store(relationship)
+ workflow_context.model.relationship.put(relationship)
inputs = {'putput': True}
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/tests/orchestrator/context/test_workflow.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_workflow.py b/tests/orchestrator/context/test_workflow.py
index 4c4979f..fbe5d75 100644
--- a/tests/orchestrator/context/test_workflow.py
+++ b/tests/orchestrator/context/test_workflow.py
@@ -19,10 +19,9 @@ import pytest
from aria import application_model_storage
from aria.orchestrator import context
-from aria.storage.mapi.sql import SQLAlchemyModelAPI
-
-from tests.mock import models
+from aria.storage.sql_mapi import SQLAlchemyModelAPI
from tests import storage as test_storage
+from tests.mock import models
class TestWorkflowContext(object):
@@ -60,7 +59,7 @@ class TestWorkflowContext(object):
def storage():
api_params = test_storage.get_sqlite_api_params()
result = application_model_storage(SQLAlchemyModelAPI, api_params=api_params)
- result.blueprint.store(models.get_blueprint())
+ result.blueprint.put(models.get_blueprint())
blueprint = result.blueprint.get(models.BLUEPRINT_ID)
- result.deployment.store(models.get_deployment(blueprint))
+ result.deployment.put(models.get_deployment(blueprint))
return result
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/tests/orchestrator/workflows/api/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/api/test_task.py b/tests/orchestrator/workflows/api/test_task.py
index 4da42c1..3ae700e 100644
--- a/tests/orchestrator/workflows/api/test_task.py
+++ b/tests/orchestrator/workflows/api/test_task.py
@@ -30,7 +30,7 @@ def ctx():
:return:
"""
simple_context = mock.context.simple()
- simple_context.model.execution.store(mock.models.get_execution(simple_context.deployment))
+ simple_context.model.execution.put(mock.models.get_execution(simple_context.deployment))
return simple_context
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index b58460a..45eaa27 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import time
# TODO: fix together with the test
# import threading
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/tests/storage/test_model_storage.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_model_storage.py b/tests/storage/test_model_storage.py
index 8fdf870..4d610f3 100644
--- a/tests/storage/test_model_storage.py
+++ b/tests/storage/test_model_storage.py
@@ -23,17 +23,17 @@ from aria.storage import (
ModelStorage,
models,
exceptions,
- mapi as storage_api,
+ sql_mapi,
)
-
-from tests import storage
+from aria import application_model_storage
+from tests.storage import get_sqlite_api_params
temp_dir = tempfile.mkdtemp()
-APIs = [
- ModelStorage(storage_api.SQLAlchemyModelAPI, api_params=storage.get_sqlite_api_params()),
- # ModelStorage(storage_api.FileSystemModelAPI, api_params=dict(directory=temp_dir)),
-]
+
+@pytest.fixture
+def storage():
+ return ModelStorage(sql_mapi.SQLAlchemyModelAPI, api_params=get_sqlite_api_params())
@pytest.fixture(autouse=True)
@@ -45,18 +45,16 @@ def cleanup():
pass
-@pytest.mark.parametrize('storage', APIs)
def test_storage_base(storage):
with pytest.raises(AttributeError):
storage.non_existent_attribute()
-@pytest.mark.parametrize('storage', APIs)
def test_model_storage(storage):
storage.register(models.ProviderContext)
pc = models.ProviderContext(context={}, name='context_name', id='id1')
- storage.provider_context.store(pc)
+ storage.provider_context.put(pc)
assert storage.provider_context.get('id1') == pc
@@ -73,12 +71,11 @@ def test_model_storage(storage):
storage.provider_context.get('id1')
-@pytest.mark.parametrize('storage', APIs)
def test_storage_driver(storage):
storage.register(models.ProviderContext)
pc = models.ProviderContext(context={}, name='context_name', id='id2')
- storage.registered['provider_context'].store(entry=pc)
+ storage.registered['provider_context'].put(entry=pc)
assert storage.registered['provider_context'].get(entry_id='id2') == pc
@@ -91,17 +88,16 @@ def test_storage_driver(storage):
storage.registered['provider_context'].get('id2')
-# @pytest.mark.parametrize('storage', APIs)
-# def test_application_storage_factory(storage):
-# storage = application_model_storage(api, api_params=api_params)
-# assert storage.node
-# assert storage.node_instance
-# assert storage.plugin
-# assert storage.blueprint
-# assert storage.snapshot
-# assert storage.deployment
-# assert storage.deployment_update
-# assert storage.deployment_update_step
-# assert storage.deployment_modification
-# assert storage.execution
-# assert storage.provider_context
+def test_application_storage_factory():
+ storage = application_model_storage(sql_mapi.SQLAlchemyModelAPI,
+ api_params=get_sqlite_api_params())
+ assert storage.node
+ assert storage.node_instance
+ assert storage.plugin
+ assert storage.blueprint
+ assert storage.deployment
+ assert storage.deployment_update
+ assert storage.deployment_update_step
+ assert storage.deployment_modification
+ assert storage.execution
+ assert storage.provider_context
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/tests/storage/test_resource_storage.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_resource_storage.py b/tests/storage/test_resource_storage.py
index 452867e..4347512 100644
--- a/tests/storage/test_resource_storage.py
+++ b/tests/storage/test_resource_storage.py
@@ -18,8 +18,8 @@ import tempfile
import pytest
+from aria.storage.filesystem_rapi import FileSystemResourceAPI
from aria.storage import (
- rapi,
exceptions,
ResourceStorage
)
@@ -44,12 +44,12 @@ class TestResourceStorage(TestFileSystem):
storage.blueprint.upload(entry_id=id, source=tmp_dir)
def _create_storage(self):
- return ResourceStorage(rapi.FileSystemResourceAPI,
+ return ResourceStorage(FileSystemResourceAPI,
api_params=dict(directory=self.path))
def test_name(self):
- api = rapi.FileSystemResourceAPI
- storage = ResourceStorage(rapi.FileSystemResourceAPI,
+ api = FileSystemResourceAPI
+ storage = ResourceStorage(FileSystemResourceAPI,
items=['blueprint'],
api_params=dict(directory=self.path))
assert repr(storage) == 'ResourceStorage(api={api})'.format(api=api)
@@ -62,8 +62,7 @@ class TestResourceStorage(TestFileSystem):
assert os.path.exists(os.path.join(self.path, 'blueprint'))
def test_upload_file(self):
- storage = ResourceStorage(rapi.FileSystemResourceAPI,
- api_params=dict(directory=self.path))
+ storage = ResourceStorage(FileSystemResourceAPI, api_params=dict(directory=self.path))
self._create(storage)
tmpfile_path = tempfile.mkstemp(suffix=self.__class__.__name__, dir=self.path)[1]
self._upload(storage, tmpfile_path, id='blueprint_id')
@@ -104,7 +103,7 @@ class TestResourceStorage(TestFileSystem):
storage = self._create_storage()
self._create(storage)
with pytest.raises(exceptions.StorageError):
- storage.blueprint.data(entry_id='blueprint_id', path='fake_path')
+ storage.blueprint.read(entry_id='blueprint_id', path='fake_path')
def test_data_file(self):
storage = self._create_storage()
@@ -112,7 +111,7 @@ class TestResourceStorage(TestFileSystem):
tmpfile_path = tempfile.mkstemp(suffix=self.__class__.__name__, dir=self.path)[1]
self._upload(storage, tmpfile_path, 'blueprint_id')
- assert storage.blueprint.data(entry_id='blueprint_id') == 'fake context'
+ assert storage.blueprint.read(entry_id='blueprint_id') == 'fake context'
def test_upload_dir(self):
storage = self._create_storage()
@@ -189,4 +188,4 @@ class TestResourceStorage(TestFileSystem):
storage.blueprint.upload(entry_id='blueprint_id', source=tmp_dir)
with pytest.raises(exceptions.StorageError):
- storage.blueprint.data(entry_id='blueprint_id')
+ storage.blueprint.read(entry_id='blueprint_id')
[2/2] incubator-ariatosca git commit: code review 1
Posted by mx...@apache.org.
code review 1
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/30a43983
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/30a43983
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/30a43983
Branch: refs/heads/ARIA-30-SQL-based-storage-implementation
Commit: 30a439832be94b4b013103aedb9020e08b3fdbb0
Parents: c9df5b1
Author: mxmrlv <mx...@gmail.com>
Authored: Sat Dec 3 13:09:14 2016 +0200
Committer: mxmrlv <mx...@gmail.com>
Committed: Sat Dec 3 15:48:20 2016 +0200
----------------------------------------------------------------------
aria/__init__.py | 5 +-
aria/orchestrator/context/common.py | 6 +-
aria/orchestrator/context/toolbelt.py | 2 +-
aria/orchestrator/context/workflow.py | 8 +-
aria/orchestrator/workflows/api/task.py | 10 +-
aria/orchestrator/workflows/core/task.py | 4 +-
aria/storage/__init__.py | 101 +---
aria/storage/api.py | 12 +-
aria/storage/core.py | 125 +++++
aria/storage/filesystem_api.py | 39 --
aria/storage/filesystem_rapi.py | 126 +++++
aria/storage/mapi/__init__.py | 20 -
aria/storage/mapi/filesystem.py | 118 ----
aria/storage/mapi/inmemory.py | 148 -----
aria/storage/mapi/sql.py | 368 ------------
aria/storage/models.py | 556 ++++++-------------
aria/storage/rapi/__init__.py | 18 -
aria/storage/rapi/filesystem.py | 119 ----
aria/storage/sql_mapi.py | 363 ++++++++++++
aria/storage/structures.py | 132 ++---
aria/utils/application.py | 14 +-
tests/mock/context.py | 18 +-
tests/mock/models.py | 27 +-
tests/orchestrator/context/test_toolbelt.py | 6 +-
tests/orchestrator/context/test_workflow.py | 9 +-
tests/orchestrator/workflows/api/test_task.py | 2 +-
.../orchestrator/workflows/core/test_engine.py | 1 -
tests/storage/test_model_storage.py | 48 +-
tests/storage/test_resource_storage.py | 17 +-
29 files changed, 942 insertions(+), 1480 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/aria/__init__.py
----------------------------------------------------------------------
diff --git a/aria/__init__.py b/aria/__init__.py
index 6e810f0..5317afa 100644
--- a/aria/__init__.py
+++ b/aria/__init__.py
@@ -36,7 +36,6 @@ __all__ = (
'operation',
)
-_model_storage = {}
_resource_storage = {}
@@ -69,7 +68,6 @@ def application_model_storage(api, api_params=None):
storage.models.Relationship,
storage.models.RelationshipInstance,
storage.models.Plugin,
- storage.models.Snapshot,
storage.models.DeploymentUpdate,
storage.models.DeploymentUpdateStep,
storage.models.DeploymentModification,
@@ -78,8 +76,7 @@ def application_model_storage(api, api_params=None):
storage.models.Task,
]
# if api not in _model_storage:
- _model_storage[api] = storage.ModelStorage(api, items=models, api_params=api_params or {})
- return _model_storage[api]
+ return storage.ModelStorage(api, items=models, api_params=api_params or {})
def application_resource_storage(driver):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 7b65e2b..a4cc4a4 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -100,7 +100,7 @@ class BaseContext(logger.LoggerMixin):
"""
Store the execution in the model storage
"""
- self.model.execution.store(value)
+ self.model.execution.put(value)
@property
def name(self):
@@ -136,6 +136,6 @@ class BaseContext(logger.LoggerMixin):
Read a deployment resource as string from the resource storage
"""
try:
- return self.resource.deployment.data(entry_id=self.deployment.id, path=path)
+ return self.resource.deployment.read(entry_id=self.deployment.id, path=path)
except exceptions.StorageError:
- return self.resource.blueprint.data(entry_id=self.blueprint.id, path=path)
+ return self.resource.blueprint.read(entry_id=self.blueprint.id, path=path)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/aria/orchestrator/context/toolbelt.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/toolbelt.py b/aria/orchestrator/context/toolbelt.py
index ae0e1ff..ceaeb72 100644
--- a/aria/orchestrator/context/toolbelt.py
+++ b/aria/orchestrator/context/toolbelt.py
@@ -33,7 +33,7 @@ class NodeToolBelt(object):
:return:
"""
assert isinstance(self._op_context, operation.NodeOperationContext)
- filters = {'target_node_instance_storage_id': self._op_context.node_instance.storage_id}
+ filters = {'target_node_instance_fk': self._op_context.node_instance.storage_id}
for relationship_instance in \
self._op_context.model.relationship_instance.iter(filters=filters):
yield relationship_instance.source_node_instance
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index 8797271..7d86bf3 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -57,9 +57,9 @@ class WorkflowContext(BaseContext):
created_at=now,
status=execution_cls.PENDING,
parameters=self.parameters,
- deployment_storage_id=self.deployment.storage_id
+ deployment_fk=self.deployment.storage_id
)
- self.model.execution.store(execution)
+ self.model.execution.put(execution)
@property
def nodes(self):
@@ -68,7 +68,7 @@ class WorkflowContext(BaseContext):
"""
return self.model.node.iter(
filters={
- 'deployment_storage_id': self.deployment.storage_id
+ 'deployment_fk': self.deployment.storage_id
}
)
@@ -79,7 +79,7 @@ class WorkflowContext(BaseContext):
"""
return self.model.node_instance.iter(
filters={
- 'deployment_storage_id': self.deployment.storage_id
+ 'deployment_fk': self.deployment.storage_id
}
)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index 358315c..1c12407 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -18,7 +18,7 @@ Provides the tasks to be entered into the task graph
"""
from uuid import uuid4
-from aria import storage
+from aria.storage import models
from ... import context
from .. import exceptions
@@ -75,8 +75,8 @@ class OperationTask(BaseTask):
:param actor: the operation host on which this operation is registered.
:param inputs: operation inputs.
"""
- assert isinstance(actor, (storage.models.NodeInstance,
- storage.models.RelationshipInstance))
+ assert isinstance(actor, (models.NodeInstance,
+ models.RelationshipInstance))
super(OperationTask, self).__init__()
self.actor = actor
self.name = '{name}.{actor.id}'.format(name=name, actor=actor)
@@ -97,7 +97,7 @@ class OperationTask(BaseTask):
:param instance: the node of which this operation belongs to.
:param name: the name of the operation.
"""
- assert isinstance(instance, storage.models.NodeInstance)
+ assert isinstance(instance, models.NodeInstance)
operation_details = instance.node.operations[name]
operation_inputs = operation_details.get('inputs', {})
operation_inputs.update(inputs or {})
@@ -119,7 +119,7 @@ class OperationTask(BaseTask):
with 'source_operations' and 'target_operations'
:param inputs any additional inputs to the operation
"""
- assert isinstance(instance, storage.models.RelationshipInstance)
+ assert isinstance(instance, models.RelationshipInstance)
if operation_end not in [cls.TARGET_OPERATION, cls.SOURCE_OPERATION]:
raise exceptions.TaskException('The operation end should be {0} or {1}'.format(
cls.TARGET_OPERATION, cls.SOURCE_OPERATION
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index fd00307..3b18965 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -128,7 +128,7 @@ class OperationTask(BaseTask):
retry_interval=api_task.retry_interval,
ignore_failure=api_task.ignore_failure,
)
- self._workflow_context.model.task.store(operation_task)
+ self._workflow_context.model.task.put(operation_task)
self._ctx = context_class(name=api_task.name,
workflow_context=self._workflow_context,
@@ -162,7 +162,7 @@ class OperationTask(BaseTask):
@model_task.setter
def model_task(self, value):
- self._workflow_context.model.task.store(value)
+ self._workflow_context.model.task.put(value)
@property
def context(self):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/aria/storage/__init__.py
----------------------------------------------------------------------
diff --git a/aria/storage/__init__.py b/aria/storage/__init__.py
index 6740cd0..fd69d47 100644
--- a/aria/storage/__init__.py
+++ b/aria/storage/__init__.py
@@ -37,93 +37,28 @@ API:
* drivers - module, a pool of Aria standard drivers.
* StorageDriver - class, abstract model implementation.
"""
-
-from aria.logger import LoggerMixin
+from .core import (
+ Storage,
+ ModelStorage,
+ ResourceStorage,
+)
from . import (
- models,
exceptions,
- api as storage_api,
- structures
+ api,
+ structures,
+ core,
+ filesystem_rapi,
+ sql_mapi,
+ models
)
-
__all__ = (
- 'ModelStorage',
- 'models',
+ 'exceptions',
'structures',
+ # 'Storage',
+ # 'ModelStorage',
+ # 'ResourceStorage',
+ 'filesystem_rapi',
+ 'sql_mapi',
+ 'api'
)
-
-
-class Storage(LoggerMixin):
- """
- Represents the storage
- """
- def __init__(self, api, items=(), api_params=None, **kwargs):
- self._api_params = api_params or {}
- super(Storage, self).__init__(**kwargs)
- self.api = api
- self.registered = {}
- for item in items:
- self.register(item)
- self.logger.debug('{name} object is ready: {0!r}'.format(
- self, name=self.__class__.__name__))
-
- def __repr__(self):
- return '{name}(api={self.api})'.format(name=self.__class__.__name__, self=self)
-
- def __getattr__(self, item):
- try:
- return self.registered[item]
- except KeyError:
- return super(Storage, self).__getattribute__(item)
-
- def register(self, entry):
- """
- Register the entry to the storage
- :param name:
- :return:
- """
- raise NotImplementedError('Subclass must implement abstract register method')
-
-
-class ResourceStorage(Storage):
- """
- Represents resource storage.
- """
- def register(self, name):
- """
- Register the resource type to resource storage.
- :param name:
- :return:
- """
- self.registered[name] = self.api(name=name, **self._api_params)
- self.registered[name].create()
- self.logger.debug('setup {name} in storage {self!r}'.format(name=name, self=self))
-
-
-class ModelStorage(Storage):
- """
- Represents model storage.
- """
- def register(self, model):
- """
- Register the model into the model storage.
- :param model: the model to register.
- :return:
- """
- model_name = storage_api.generate_lower_name(model)
- if model_name in self.registered:
- self.logger.debug('{name} in already storage {self!r}'.format(name=model_name,
- self=self))
- return
- self.registered[model_name] = self.api(name=model_name, model_cls=model, **self._api_params)
- self.registered[model_name].create()
- self.logger.debug('setup {name} in storage {self!r}'.format(name=model_name, self=self))
-
- def drop(self):
- """
- Drop all the tables from the model.
- :return:
- """
- for mapi in self.registered.values():
- mapi.drop()
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/aria/storage/api.py
----------------------------------------------------------------------
diff --git a/aria/storage/api.py b/aria/storage/api.py
index 7bdbd5d..1a7ca45 100644
--- a/aria/storage/api.py
+++ b/aria/storage/api.py
@@ -60,12 +60,6 @@ class StorageAPI(object):
"""
pass
- def __getattr__(self, item):
- try:
- return self.registered[item]
- except KeyError:
- return super(StorageAPI, self).__getattribute__(item)
-
class ModelAPI(StorageAPI):
"""
@@ -109,7 +103,7 @@ class ModelAPI(StorageAPI):
"""
raise NotImplementedError('Subclass must implement abstract get method')
- def store(self, entry, **kwargs):
+ def put(self, entry, **kwargs):
"""
Store entry in storage
@@ -171,9 +165,9 @@ class ResourceAPI(StorageAPI):
"""
return self._name
- def data(self, entry_id, path=None, **kwargs):
+ def read(self, entry_id, path=None, **kwargs):
"""
- Get a bytesteam from the storagee.
+ Get a bytesteam from the storage.
:param entry_id:
:param path:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/aria/storage/core.py
----------------------------------------------------------------------
diff --git a/aria/storage/core.py b/aria/storage/core.py
new file mode 100644
index 0000000..e452698
--- /dev/null
+++ b/aria/storage/core.py
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Aria's storage Sub-Package
+Path: aria.storage
+
+Storage package is a generic abstraction over different storage types.
+We define this abstraction with the following components:
+
+1. storage: simple mapi to use
+2. driver: implementation of the database client mapi.
+3. model: defines the structure of the table/document.
+4. field: defines a field/item in the model.
+
+API:
+ * application_storage_factory - function, default Aria storage factory.
+ * Storage - class, simple storage mapi.
+ * models - module, default Aria standard models.
+ * structures - module, default Aria structures - holds the base model,
+ and different fields types.
+ * Model - class, abstract model implementation.
+ * Field - class, base field implementation.
+ * IterField - class, base iterable field implementation.
+ * drivers - module, a pool of Aria standard drivers.
+ * StorageDriver - class, abstract model implementation.
+"""
+
+from aria.logger import LoggerMixin
+from . import api as storage_api
+
+__all__ = (
+ 'Storage',
+ 'ModelStorage',
+ 'ResourceStorage'
+)
+
+
+class Storage(LoggerMixin):
+ """
+ Represents the storage
+ """
+ def __init__(self, api_cls, api_params=None, items=(), **kwargs):
+ self._api_params = api_params or {}
+ super(Storage, self).__init__(**kwargs)
+ self.api = api_cls
+ self.registered = {}
+ for item in items:
+ self.register(item)
+ self.logger.debug('{name} object is ready: {0!r}'.format(
+ self, name=self.__class__.__name__))
+
+ def __repr__(self):
+ return '{name}(api={self.api})'.format(name=self.__class__.__name__, self=self)
+
+ def __getattr__(self, item):
+ try:
+ return self.registered[item]
+ except KeyError:
+ return super(Storage, self).__getattribute__(item)
+
+ def register(self, entry):
+ """
+ Register the entry to the storage
+ :param name:
+ :return:
+ """
+ raise NotImplementedError('Subclass must implement abstract register method')
+
+
+class ResourceStorage(Storage):
+ """
+ Represents resource storage.
+ """
+ def register(self, name):
+ """
+ Register the resource type to resource storage.
+ :param name:
+ :return:
+ """
+ self.registered[name] = self.api(name=name, **self._api_params)
+ self.registered[name].create()
+ self.logger.debug('setup {name} in storage {self!r}'.format(name=name, self=self))
+
+
+class ModelStorage(Storage):
+ """
+ Represents model storage.
+ """
+ def register(self, model_cls):
+ """
+ Register the model into the model storage.
+ :param model_cls: the model to register.
+ :return:
+ """
+ model_name = storage_api.generate_lower_name(model_cls)
+ if model_name in self.registered:
+ self.logger.debug('{name} in already storage {self!r}'.format(name=model_name,
+ self=self))
+ return
+ self.registered[model_name] = self.api(name=model_name,
+ model_cls=model_cls,
+ **self._api_params)
+ self.registered[model_name].create()
+ self.logger.debug('setup {name} in storage {self!r}'.format(name=model_name, self=self))
+
+ def drop(self):
+ """
+ Drop all the tables from the model.
+ :return:
+ """
+ for mapi in self.registered.values():
+ mapi.drop()
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/aria/storage/filesystem_api.py
----------------------------------------------------------------------
diff --git a/aria/storage/filesystem_api.py b/aria/storage/filesystem_api.py
deleted file mode 100644
index f28d1f6..0000000
--- a/aria/storage/filesystem_api.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""
-Filesystem based API Base
-"""
-from multiprocessing import RLock
-
-from . import api
-
-
-class BaseFileSystemAPI(api.StorageAPI):
- """
- Base class which handles storage on the file system.
- """
-
- def create(self, **kwargs):
- super(BaseFileSystemAPI, self).create(**kwargs)
-
- def __init__(self, *args, **kwargs):
- super(BaseFileSystemAPI, self).__init__(*args, **kwargs)
- self._lock = RLock()
-
- def _establish_connection(self):
- self._lock.acquire()
-
- def _destroy_connection(self):
- self._lock.release()
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/aria/storage/filesystem_rapi.py
----------------------------------------------------------------------
diff --git a/aria/storage/filesystem_rapi.py b/aria/storage/filesystem_rapi.py
new file mode 100644
index 0000000..af30e1d
--- /dev/null
+++ b/aria/storage/filesystem_rapi.py
@@ -0,0 +1,126 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+SQLalchemy based RAPI
+"""
+import os
+import shutil
+from functools import partial
+from distutils import dir_util # https://github.com/PyCQA/pylint/issues/73; pylint: disable=no-name-in-module
+from multiprocessing import RLock
+
+from aria.storage import (
+ api,
+ exceptions
+)
+
+
+class FileSystemResourceAPI(api.ResourceAPI):
+ """
+ File system resource storage.
+ """
+
+ def __init__(self, directory, **kwargs):
+ """
+ File system implementation for storage api.
+ :param str directory: root dir for storage.
+ """
+ super(FileSystemResourceAPI, self).__init__(**kwargs)
+ self.directory = directory
+ self.base_path = os.path.join(self.directory, self.name)
+ self._join_path = partial(os.path.join, self.base_path)
+ self._lock = RLock()
+
+ def _establish_connection(self):
+ self._lock.acquire()
+
+ def _destroy_connection(self):
+ self._lock.release()
+
+ def __repr__(self):
+ return '{cls.__name__}(directory={self.directory})'.format(
+ cls=self.__class__, self=self)
+
+ def create(self, **kwargs):
+ """
+ Create directory in storage by path.
+ tries to create the root directory as well.
+ :param str name: path of file in storage.
+ """
+ try:
+ os.makedirs(self.directory)
+ except (OSError, IOError):
+ pass
+ os.makedirs(self.base_path)
+
+ def read(self, entry_id, path=None, **_):
+ """
+ Retrieve the content of a file system storage resource.
+
+ :param str entry_type: the type of the entry.
+ :param str entry_id: the id of the entry.
+ :param str path: a path to a specific resource.
+ :return: the content of the file
+ :rtype: bytes
+ """
+ resource_relative_path = os.path.join(self.name, entry_id, path or '')
+ resource = os.path.join(self.directory, resource_relative_path)
+ if not os.path.exists(resource):
+ raise exceptions.StorageError("Resource {0} does not exist".
+ format(resource_relative_path))
+ if not os.path.isfile(resource):
+ resources = os.listdir(resource)
+ if len(resources) != 1:
+ raise exceptions.StorageError('No resource in path: {0}'.format(resource))
+ resource = os.path.join(resource, resources[0])
+ with open(resource, 'rb') as resource_file:
+ return resource_file.read()
+
+ def download(self, entry_id, destination, path=None, **_):
+ """
+ Download a specific file or dir from the file system resource storage.
+
+ :param str entry_type: the name of the entry.
+ :param str entry_id: the id of the entry
+ :param str destination: the destination of the files.
+ :param str path: a path on the remote machine relative to the root of the entry.
+ """
+ resource_relative_path = os.path.join(self.name, entry_id, path or '')
+ resource = os.path.join(self.directory, resource_relative_path)
+ if not os.path.exists(resource):
+ raise exceptions.StorageError("Resource {0} does not exist".
+ format(resource_relative_path))
+ if os.path.isfile(resource):
+ shutil.copy2(resource, destination)
+ else:
+ dir_util.copy_tree(resource, destination) # pylint: disable=no-member
+
+ def upload(self, entry_id, source, path=None, **_):
+ """
+ Uploads a specific file or dir to the file system resource storage.
+
+ :param str entry_type: the name of the entry.
+ :param str entry_id: the id of the entry
+ :param source: the source of the files to upload.
+ :param path: the destination of the file/s relative to the entry root dir.
+ """
+ resource_directory = os.path.join(self.directory, self.name, entry_id)
+ if not os.path.exists(resource_directory):
+ os.makedirs(resource_directory)
+ destination = os.path.join(resource_directory, path or '')
+ if os.path.isfile(source):
+ shutil.copy2(source, destination)
+ else:
+ dir_util.copy_tree(source, destination) # pylint: disable=no-member
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/aria/storage/mapi/__init__.py
----------------------------------------------------------------------
diff --git a/aria/storage/mapi/__init__.py b/aria/storage/mapi/__init__.py
deleted file mode 100644
index d4a8c6e..0000000
--- a/aria/storage/mapi/__init__.py
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""
-A collection of MAPIs
-"""
-from .filesystem import FileSystemModelAPI
-from .inmemory import InMemoryModelAPI
-from .sql import SQLAlchemyModelAPI
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/aria/storage/mapi/filesystem.py
----------------------------------------------------------------------
diff --git a/aria/storage/mapi/filesystem.py b/aria/storage/mapi/filesystem.py
deleted file mode 100644
index fa24869..0000000
--- a/aria/storage/mapi/filesystem.py
+++ /dev/null
@@ -1,118 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""
-SQLalchemy based MAPI
-"""
-import os
-from functools import partial
-
-import jsonpickle
-
-from .. import (
- api,
- filesystem_api
-)
-
-
-class FileSystemModelAPI(api.ModelAPI, filesystem_api.BaseFileSystemAPI):
- """
- File system model storage.
- """
-
- def __init__(self, directory, **kwargs):
- """
- File system implementation for storage api.
- :param str directory: root dir for storage.
- """
- super(FileSystemModelAPI, self).__init__(**kwargs)
- self.directory = directory
- self.base_path = os.path.join(self.directory, self.name)
- self._join_path = partial(os.path.join, self.base_path)
-
- def __repr__(self):
- return '{cls.__name__}(directory={self.directory})'.format(
- cls=self.__class__, self=self)
-
- def create(self, **kwargs):
- """
- Create directory in storage by path.
- tries to create the root directory as well.
- :param str name: path of file in storage.
- """
- with self.connect():
- try:
- os.makedirs(self.directory)
- except (OSError, IOError):
- pass
- os.makedirs(self.base_path)
-
- def get(self, entry_id, **kwargs):
- """
- Getter from storage.
- :param str entry_id: id of the file to get from storage.
- :return: value of file from storage.
- :rtype: dict
- """
- with self.connect():
- with open(self._join_path(entry_id)) as file_obj:
- return jsonpickle.loads(file_obj.read())
-
- def store(self, entry, **kwargs):
- """
- Delete from storage.
- :param Model entry: name of directory in storage.
- """
- with self.connect():
- with open(self._join_path(entry.id), 'w') as file_obj:
- file_obj.write(jsonpickle.dumps(entry))
-
- def delete(self, entry_id, **kwargs):
- """
- Delete from storage.
- :param str name: name of directory in storage.
- :param str entry_id: id of the file to delete from storage.
- """
- with self.connect():
- os.remove(self._join_path(entry_id))
-
- def iter(self, filters=None, **kwargs):
- """
- Generator over the entries of directory in storage.
- :param dict filters: filters for query
- """
- filters = filters or {}
- with self.connect():
- for entry_id in os.listdir(self.base_path):
- value = self.get(entry_id=entry_id)
- for filter_name, filter_value in filters.items():
- if value.get(filter_name) != filter_value:
- break
- else:
- yield value
-
- def update(self, entry_id, **kwargs):
- """
- Updates and entry in storage.
-
- :param str name: name of table/document in storage.
- :param str entry_id: id of the document to get from storage.
- :param kwargs: the fields to update.
- :return:
- """
- with self.connect():
- entry = self.get(entry_id)
- for key, value in kwargs.items():
- setattr(entry, key, value)
- self.store(entry)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/aria/storage/mapi/inmemory.py
----------------------------------------------------------------------
diff --git a/aria/storage/mapi/inmemory.py b/aria/storage/mapi/inmemory.py
deleted file mode 100644
index 09dbcfc..0000000
--- a/aria/storage/mapi/inmemory.py
+++ /dev/null
@@ -1,148 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-# DEPRECATED
-#pylint: skip-file
-
-from collections import namedtuple
-
-
-from .. import api
-from ..structures import orm
-
-
-_Pointer = namedtuple('_Pointer', 'name, is_iter')
-
-storage = {}
-
-
-class InMemoryModelAPI(api.ModelAPI):
- def __init__(self, *args, **kwargs):
- """
- Managing the model in the storage, using the driver.
-
- :param basestring name: the name of the model.
- :param ModelDriver driver: the driver which supports this model in the storage.
- :param Model model_cls: table/document class model.
- """
- super(InMemoryModelAPI, self).__init__(*args, **kwargs)
- self.pointer_mapping = {}
-
- def create(self):
- """
- Creates the model in the storage.
- """
- with self.connect():
- storage[self.name] = {}
- self._setup_pointers_mapping()
-
- def _setup_pointers_mapping(self):
- for field_name, field_cls in vars(self.model_cls).items():
- if not (getattr(field_cls, 'impl', None) is not None and
- isinstance(field_cls.impl.parent_token, orm.RelationshipProperty)):
- continue
- pointer_key = _Pointer(field_name, is_iter=False)
- self.pointer_mapping[pointer_key] = self.__class__(
- name=api.generate_lower_name(field_cls.class_),
- model_cls=field_cls.class_)
-
- def get(self, entry_id, **kwargs):
- """
- Getter for the model from the storage.
-
- :param basestring entry_id: the id of the table/document.
- :return: model instance
- :rtype: Model
- """
- with self.connect():
- data = storage[self.name][entry_id]
- data.update(self._get_pointers(data, **kwargs))
- return self.model_cls(**data)
-
- def store(self, entry, **kwargs):
- """
- Setter for the model in the storage.
-
- :param Model entry: the table/document to store.
- """
- with self.connect():
- data = entry.to_dict
- data.update(self._store_pointers(data, **kwargs))
- storage[self.name][entry.id] = data
-
- def delete(self, entry_id, **kwargs):
- """
- Delete the model from storage.
-
- :param basestring entry_id: id of the entity to delete from storage.
- """
- entry = self.get(entry_id)
- with self.connect():
- self._delete_pointers(entry, **kwargs)
- storage[self.name].pop(entry_id)
-
- def iter(self, **kwargs):
- """
- Generator over the entries of model in storage.
- """
- with self.connect():
- for data in storage[self.name].values():
- data.update(self._get_pointers(data, **kwargs))
- yield self.model_cls(**data)
-
- def update(self, entry_id, **kwargs):
- """
- Updates and entry in storage.
-
- :param str entry_id: the id of the table/document.
- :param kwargs: the fields to update.
- :return:
- """
- with self.connect():
- storage[self.name][entry_id].update(**kwargs)
-
- def _get_pointers(self, data, **kwargs):
- pointers = {}
- for field, schema in self.pointer_mapping.items():
- if field.is_iter:
- pointers[field.name] = [
- schema.get(entry_id=pointer_id, **kwargs)
- for pointer_id in data[field.name]
- if pointer_id]
- elif data[field.name]:
- pointers[field.name] = schema.get(entry_id=data[field.name], **kwargs)
- return pointers
-
- def _store_pointers(self, data, **kwargs):
- pointers = {}
- for field, model_api in self.pointer_mapping.items():
- if field.is_iter:
- pointers[field.name] = []
- for iter_entity in data[field.name]:
- pointers[field.name].append(iter_entity.id)
- model_api.store(iter_entity, **kwargs)
- else:
- pointers[field.name] = data[field.name].id
- model_api.store(data[field.name], **kwargs)
- return pointers
-
- def _delete_pointers(self, entry, **kwargs):
- for field, schema in self.pointer_mapping.items():
- if field.is_iter:
- for iter_entry in getattr(entry, field.name):
- schema.delete(iter_entry.id, **kwargs)
- else:
- schema.delete(getattr(entry, field.name).id, **kwargs)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/aria/storage/mapi/sql.py
----------------------------------------------------------------------
diff --git a/aria/storage/mapi/sql.py b/aria/storage/mapi/sql.py
deleted file mode 100644
index 652dc9f..0000000
--- a/aria/storage/mapi/sql.py
+++ /dev/null
@@ -1,368 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""
-SQLAlchemy based MAPI
-"""
-
-from sqlite3 import DatabaseError as SQLiteDBError
-from sqlalchemy.exc import SQLAlchemyError
-from sqlalchemy.sql.elements import Label
-
-try:
- from psycopg2 import DatabaseError as Psycopg2DBError
- sql_errors = (SQLAlchemyError, SQLiteDBError, Psycopg2DBError)
-except ImportError:
- sql_errors = (SQLAlchemyError, SQLiteDBError)
- Psycopg2DBError = None
-
-from aria.utils.collections import OrderedDict
-
-from ... import storage
-
-
-DEFAULT_SQL_DIALECT = 'sqlite'
-
-
-class SQLAlchemyModelAPI(storage.api.ModelAPI):
- """
- SQL based MAPI.
- """
-
- def __init__(self,
- engine,
- session,
- **kwargs):
- super(SQLAlchemyModelAPI, self).__init__(**kwargs)
- self._engine = engine
- self._session = session
-
- def get(self, entry_id, include=None, filters=None, locking=False, **kwargs):
- """Return a single result based on the model class and element ID
- """
- filters = filters or {'id': entry_id}
- query = self._get_query(include, filters)
- if locking:
- query = query.with_for_update()
- result = query.first()
-
- if not result:
- raise storage.exceptions.StorageError(
- 'Requested {0} with ID `{1}` was not found'
- .format(self.model_cls.__name__, entry_id)
- )
- return result
-
- def iter(self,
- include=None,
- filters=None,
- pagination=None,
- sort=None,
- **kwargs):
- """Return a (possibly empty) list of `model_class` results
- """
- query = self._get_query(include, filters, sort)
-
- results, _, _, _ = self._paginate(query, pagination)
-
- for result in results:
- yield result
-
- def store(self, entry, **kwargs):
- """Create a `model_class` instance from a serializable `model` object
-
- :param entry: A dict with relevant kwargs, or an instance of a class
- that has a `to_dict` method, and whose attributes match the columns
- of `model_class` (might also my just an instance of `model_class`)
- :return: An instance of `model_class`
- """
- self._session.add(entry)
- self._safe_commit()
- return entry
-
- def delete(self, entry_id, filters=None, **kwargs):
- """Delete a single result based on the model class and element ID
- """
- try:
- instance = self.get(
- entry_id,
- filters=filters
- )
- except storage.exceptions.StorageError:
- raise storage.exceptions.StorageError(
- 'Could not delete {0} with ID `{1}` - element not found'
- .format(
- self.model_cls.__name__,
- entry_id
- )
- )
- self._load_properties(instance)
- self._session.delete(instance)
- self._safe_commit()
- return instance
-
- # TODO: this might need rework
- def update(self, entry, **kwargs):
- """Add `instance` to the DB session, and attempt to commit
-
- :return: The updated instance
- """
- return self.store(entry)
-
- def refresh(self, entry):
- """Reload the instance with fresh information from the DB
-
- :param entry: Instance to be re-loaded from the DB
- :return: The refreshed instance
- """
- self._session.refresh(entry)
- self._load_properties(entry)
- return entry
-
- def _destroy_connection(self):
- pass
-
- def _establish_connection(self):
- pass
-
- def create(self):
- self.model_cls.__table__.create(self._engine)
-
- def drop(self):
- """
- Drop the table from the storage.
- :return:
- """
- self.model_cls.__table__.drop(self._engine)
-
- def _safe_commit(self):
- """Try to commit changes in the session. Roll back if exception raised
- Excepts SQLAlchemy errors and rollbacks if they're caught
- """
- try:
- self._session.commit()
- except sql_errors as e:
- self._session.rollback()
- raise storage.exceptions.StorageError(
- 'SQL Storage error: {0}'.format(str(e))
- )
-
- def _get_base_query(self, include, joins):
- """Create the initial query from the model class and included columns
-
- :param include: A (possibly empty) list of columns to include in
- the query
- :param joins: A (possibly empty) list of models on which the query
- should join
- :return: An SQLAlchemy AppenderQuery object
- """
-
- # If only some columns are included, query through the session object
- if include:
- query = self._session.query(*include)
- else:
- # If all columns should be returned, query directly from the model
- query = self._session.query(self.model_cls)
-
- # Add any joins that might be necessary
- for join_model in joins:
- query = query.join(join_model)
-
- return query
-
- @staticmethod
- def _sort_query(query, sort=None):
- """Add sorting clauses to the query
-
- :param query: Base SQL query
- :param sort: An optional dictionary where keys are column names to
- sort by, and values are the order (asc/desc)
- :return: An SQLAlchemy AppenderQuery object
- """
- if sort:
- for column, order in sort.items():
- if order == 'desc':
- column = column.desc()
- query = query.order_by(column)
- return query
-
- @staticmethod
- def _filter_query(query, filters):
- """Add filter clauses to the query
-
- :param query: Base SQL query
- :param filters: An optional dictionary where keys are column names to
- filter by, and values are values applicable for those columns (or lists
- of such values)
- :return: An SQLAlchemy AppenderQuery object
- """
- for column, value in filters.items():
- # If there are multiple values, use `in_`, otherwise, use `eq`
- if isinstance(value, (list, tuple)):
- query = query.filter(column.in_(value))
- else:
- query = query.filter(column == value)
-
- return query
-
- def _get_query(self,
- include=None,
- filters=None,
- sort=None):
- """Get an SQL query object based on the params passed
-
- :param include: An optional list of columns to include in the query
- :param filters: An optional dictionary where keys are column names to
- filter by, and values are values applicable for those columns (or lists
- of such values)
- :param sort: An optional dictionary where keys are column names to
- sort by, and values are the order (asc/desc)
- :return: A sorted and filtered query with only the relevant
- columns
- """
-
- include = include or []
- filters = filters or dict()
- sort = sort or OrderedDict()
-
- joins = self._get_join_models_list(include, filters, sort)
- include, filters, sort = self._get_columns_from_field_names(
- include, filters, sort
- )
-
- query = self._get_base_query(include, joins)
- query = self._filter_query(query, filters)
- query = self._sort_query(query, sort)
- return query
-
- def _get_columns_from_field_names(self,
- include,
- filters,
- sort):
- """Go over the optional parameters (include, filters, sort), and
- replace column names with actual SQLA column objects
- """
- all_includes = [self._get_column(c) for c in include]
- include = []
- # Columns that are inferred from properties (Labels) should be included
- # last for the following joins to work properly
- for col in all_includes:
- if isinstance(col, Label):
- include.append(col)
- else:
- include.insert(0, col)
-
- filters = dict((self._get_column(c), filters[c]) for c in filters)
- sort = OrderedDict((self._get_column(c), sort[c]) for c in sort)
-
- return include, filters, sort
-
- def _get_join_models_list(self, include, filters, sort):
- """Return a list of models on which the query should be joined, as
- inferred from the include, filter and sort column names
- """
- if not self.model_cls.is_resource:
- return []
-
- all_column_names = include + filters.keys() + sort.keys()
- join_columns = set(column_name for column_name in all_column_names
- if self._is_join_column(column_name))
-
- # If the only columns included are the columns on which we would
- # normally join, there isn't actually a need to join, as the FROM
- # clause in the query will be generated from the relevant models anyway
- if include == list(join_columns):
- return []
-
- # Initializing a set, because the same model can appear in several
- # join lists
- join_models = set()
- for column_name in join_columns:
- join_models.update(
- self.model_cls.join_properties[column_name]['models']
- )
- # Sort the models by their correct join order
- join_models = sorted(join_models,
- key=lambda model: model.join_order, reverse=True)
-
- return join_models
-
- def _is_join_column(self, column_name):
- """Return False if the column name corresponds to a regular SQLA
- column that `model_class` has.
- Return True if the column that should be used is a join column (see
- SQLModelBase for an explanation)
- """
- return self.model_cls.is_resource and \
- column_name in self.model_cls.join_properties
-
- def _get_column(self, column_name):
- """Return the column on which an action (filtering, sorting, etc.)
- would need to be performed. Can be either an attribute of the class,
- or needs to be inferred from the class' `join_properties` property
- """
- if self._is_join_column(column_name):
- return self.model_cls.join_properties[column_name]['column']
- else:
- return getattr(self.model_cls, column_name)
-
- # TODO is this really needed in aria?
- @staticmethod
- def _paginate(query, pagination):
- """Paginate the query by size and offset
-
- :param query: Current SQLAlchemy query object
- :param pagination: An optional dict with size and offset keys
- :return: A tuple with four elements:
- - results: `size` items starting from `offset`
- - the total count of items
- - `size` [default: 0]
- - `offset` [default: 0]
- """
- if pagination:
- size = pagination.get('size', 0)
- offset = pagination.get('offset', 0)
- total = query.order_by(None).count() # Fastest way to count
- results = query.limit(size).offset(offset).all()
- return results, total, size, offset
- else:
- results = query.all()
- return results, len(results), 0, 0
-
- @staticmethod
- def _load_properties(instance):
- """A helper method used to overcome a problem where the properties
- that rely on joins aren't being loaded automatically
- """
- if instance.is_resource:
- for prop in instance.join_properties:
- getattr(instance, prop)
-
-
-class ListResult(object):
- """
- a ListResult contains results about the requested items.
- """
- def __init__(self, items, metadata):
- self.items = items
- self.metadata = metadata
-
- def __len__(self):
- return len(self.items)
-
- def __iter__(self):
- return iter(self.items)
-
- def __getitem__(self, item):
- return self.items[item]
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/aria/storage/models.py
----------------------------------------------------------------------
diff --git a/aria/storage/models.py b/aria/storage/models.py
index b013a51..0047ab5 100644
--- a/aria/storage/models.py
+++ b/aria/storage/models.py
@@ -36,9 +36,13 @@ classes:
* ProviderContext - provider context implementation model.
* Plugin - plugin implementation model.
"""
+from collections import namedtuple
from datetime import datetime
from uuid import uuid4
+from sqlalchemy.ext.declarative.base import declared_attr
+from sqlalchemy.orm import validates
+
from .structures import (
SQLModelBase,
Column,
@@ -53,12 +57,12 @@ from .structures import (
MutableDict,
Dict,
foreign_key,
- one_to_many_relationship
+ one_to_many_relationship,
+ association_proxy
)
__all__ = (
'Blueprint',
- 'Snapshot',
'Deployment',
'DeploymentUpdateStep',
'DeploymentUpdate',
@@ -72,28 +76,20 @@ __all__ = (
'Plugin',
)
-# todo: sort this, maybe move from mgr or move from aria???
-# TODO: this must change
-ACTION_TYPES = ('a')
-ENTITY_TYPES = ('b')
-
def uuid_generator():
- """
- wrapper function which generates ids
- """
return str(uuid4())
+#pylint: disable=no-self-argument
+
+
class Blueprint(SQLModelBase):
"""
Blueprint model representation.
"""
__tablename__ = 'blueprints'
- storage_id = Column(Integer, primary_key=True, autoincrement=True)
- id = Column(Text, index=True)
-
created_at = Column(DateTime, nullable=False, index=True)
main_file_name = Column(Text, nullable=False)
plan = Column(MutableDict.as_mutable(Dict), nullable=False)
@@ -101,48 +97,15 @@ class Blueprint(SQLModelBase):
description = Column(Text)
-class Snapshot(SQLModelBase):
- """
- Snapshot model representation.
- """
- __tablename__ = 'snapshots'
-
- CREATED = 'created'
- FAILED = 'failed'
- CREATING = 'creating'
- UPLOADED = 'uploaded'
-
- STATES = [CREATED, FAILED, CREATING, UPLOADED]
- END_STATES = [CREATED, FAILED, UPLOADED]
-
- storage_id = Column(Integer, primary_key=True, autoincrement=True)
- id = Column(Text, index=True)
-
- created_at = Column(DateTime, nullable=False, index=True)
- status = Column(Enum(*STATES, name='snapshot_status'))
- error = Column(Text)
-
-
class Deployment(SQLModelBase):
"""
Deployment model representation.
"""
__tablename__ = 'deployments'
- # See base class for an explanation on these properties
- join_properties = {
- 'blueprint_id': {
- # No need to provide the Blueprint table, as it's already joined
- 'models': [Blueprint],
- 'column': Blueprint.id.label('blueprint_id')
- },
- }
- join_order = 2
+ _private_fields = ['blueprint_fk']
- _private_fields = ['blueprint_storage_id']
-
- storage_id = Column(Integer, primary_key=True, autoincrement=True)
- id = Column(Text, index=True)
+ blueprint_fk = foreign_key(Blueprint.storage_id)
created_at = Column(DateTime, nullable=False, index=True)
description = Column(Text)
@@ -156,21 +119,9 @@ class Deployment(SQLModelBase):
updated_at = Column(DateTime)
workflows = Column(MutableDict.as_mutable(Dict))
- blueprint_storage_id = foreign_key(Blueprint)
- blueprint = one_to_many_relationship(
- child_class_name='Deployment',
- column_name='blueprint_storage_id',
- parent_class_name='Blueprint',
- back_reference_name='deployments'
- )
-
- @property
- def blueprint_id(self):
- """
- Returns the blueprint is
- :return:
- """
- return self.blueprint.id
+ @declared_attr
+ def blueprint(cls):
+ return one_to_many_relationship(cls, Blueprint, cls.blueprint_fk)
class Execution(SQLModelBase):
@@ -187,28 +138,37 @@ class Execution(SQLModelBase):
CANCELLING = 'cancelling'
FORCE_CANCELLING = 'force_cancelling'
- STATES = [TERMINATED, FAILED, CANCELLED, PENDING, STARTED,
- CANCELLING, FORCE_CANCELLING]
+ STATES = [TERMINATED, FAILED, CANCELLED, PENDING, STARTED, CANCELLING, FORCE_CANCELLING]
END_STATES = [TERMINATED, FAILED, CANCELLED]
ACTIVE_STATES = [state for state in STATES if state not in END_STATES]
- # See base class for an explanation on these properties
- join_properties = {
- 'blueprint_id': {
- 'models': [Deployment, Blueprint],
- 'column': Blueprint.id.label('blueprint_id')
- },
- 'deployment_id': {
- 'models': [Deployment],
- 'column': Deployment.id.label('deployment_id')
- }
+ VALID_TRANSITIONS = {
+ PENDING: [STARTED, CANCELLED],
+ STARTED: END_STATES + [CANCELLING],
+ CANCELLING: END_STATES
}
- join_order = 3
- _private_fields = ['deployment_storage_id']
-
- storage_id = Column(Integer, primary_key=True, autoincrement=True)
- id = Column(Text, index=True)
+ @validates('status')
+ def validate_status(self, key, value):
+ """Validation function that verifies execution status transitions are OK"""
+ try:
+ current_status = getattr(self, key)
+ except AttributeError:
+ return
+ valid_transitions = Execution.VALID_TRANSITIONS.get(current_status, [])
+ if all([current_status is not None,
+ current_status != value,
+ value not in valid_transitions]):
+ raise ValueError('Cannot change execution status from {current} to {new}'.format(
+ current=current_status,
+ new=value))
+ return value
+
+ deployment_id = association_proxy('deployment', 'id')
+ blueprint_id = association_proxy('deployment', 'blueprint_id')
+
+ deployment_fk = foreign_key(Deployment.storage_id, nullable=True)
+ _private_fields = ['deployment_fk']
created_at = Column(DateTime, index=True)
started_at = Column(DateTime, nullable=True, index=True)
@@ -216,32 +176,12 @@ class Execution(SQLModelBase):
error = Column(Text, nullable=True)
is_system_workflow = Column(Boolean, nullable=False, default=False)
parameters = Column(MutableDict.as_mutable(Dict))
- status = Column(Enum(*STATES, name='execution_status'))
+ status = Column(Enum(*STATES, name='execution_status'), default=PENDING)
workflow_id = Column(Text, nullable=False)
- deployment_storage_id = foreign_key(Deployment, nullable=True)
- deployment = one_to_many_relationship(
- child_class_name='Execution',
- column_name='deployment_storage_id',
- parent_class_name='Deployment',
- back_reference_name='executions'
- )
-
- @property
- def deployment_id(self):
- """
- Returns the deployment id
- :return:
- """
- return self.deployment.id if self.deployment else None
-
- @property
- def blueprint_id(self):
- """
- Returns the blueprint id
- :return:
- """
- return self.deployment.blueprint_id if self.deployment else None
+ @declared_attr
+ def deployment(cls):
+ return one_to_many_relationship(cls, Deployment, cls.deployment_fk)
def __str__(self):
id_name, id_value = self._get_unique_id()
@@ -259,64 +199,28 @@ class DeploymentUpdate(SQLModelBase):
"""
__tablename__ = 'deployment_updates'
- # See base class for an explanation on these properties
- join_properties = {
- 'execution_id': {
- 'models': [Execution],
- 'column': Execution.id.label('execution_id')
- },
- 'deployment_id': {
- 'models': [Deployment],
- 'column': Deployment.id.label('deployment_id')
- },
- }
- join_order = 4
-
- _private_fields = ['execution_storage_id']
+ deployment_id = association_proxy('deployment', 'id')
+ execution_id = association_proxy('execution', 'id')
- storage_id = Column(Integer, primary_key=True, autoincrement=True)
- id = Column(Text, index=True)
+ deployment_fk = foreign_key(Deployment.storage_id)
+ execution_fk = foreign_key(Execution.storage_id, nullable=True)
+ _private_fields = ['execution_fk', 'deployment_fk']
created_at = Column(DateTime, nullable=False, index=True)
deployment_plan = Column(MutableDict.as_mutable(Dict))
- deployment_update_node_instances = Column(MutableDict.as_mutable(
- Dict))
+ deployment_update_node_instances = Column(MutableDict.as_mutable(Dict))
deployment_update_deployment = Column(MutableDict.as_mutable(Dict))
deployment_update_nodes = Column(MutableDict.as_mutable(Dict))
modified_entity_ids = Column(MutableDict.as_mutable(Dict))
state = Column(Text)
- execution_storage_id = foreign_key(Execution, nullable=True)
- execution = one_to_many_relationship(
- child_class_name='DeploymentUpdate',
- column_name='execution_storage_id',
- parent_class_name='Execution',
- back_reference_name='deployment_updates'
- )
+ @declared_attr
+ def execution(cls):
+ return one_to_many_relationship(cls, Execution, cls.execution_fk)
- deployment_storage_id = foreign_key(Deployment)
- deployment = one_to_many_relationship(
- child_class_name='DeploymentUpdate',
- column_name='deployment_storage_id',
- parent_class_name='Deployment',
- back_reference_name='deployment_updates'
- )
-
- @property
- def execution_id(self):
- """
- Returns the execution id
- :return:
- """
- return self.execution.id if self.execution else None
-
- @property
- def deployment_id(self):
- """
- Rerturns the deployment id
- :return:
- """
- return self.deployment.id
+ @declared_attr
+ def deployment(cls):
+ return one_to_many_relationship(cls, Deployment, cls.deployment_fk)
def to_dict(self, suppress_error=False, **kwargs):
dep_update_dict = super(DeploymentUpdate, self).to_dict(suppress_error)
@@ -330,39 +234,40 @@ class DeploymentUpdateStep(SQLModelBase):
Deployment update step model representation.
"""
__tablename__ = 'deployment_update_steps'
+ _action_types = namedtuple('ACTION_TYPES', 'ADD, REMOVE, MODIFY')
+ ACTION_TYPES = _action_types(ADD='add', REMOVE='remove', MODIFY='modify')
+ _entity_types = namedtuple(
+ 'ENTITY_TYPES',
+ 'NODE, RELATIONSHIP, PROPERTY, OPERATION, WORKFLOW, OUTPUT, DESCRIPTION, GROUP, '
+ 'POLICY_TYPE, POLICY_TRIGGER, PLUGIN')
+ ENTITY_TYPES = _entity_types(
+ NODE='node',
+ RELATIONSHIP='relationship',
+ PROPERTY='property',
+ OPERATION='operation',
+ WORKFLOW='workflow',
+ OUTPUT='output',
+ DESCRIPTION='description',
+ GROUP='group',
+ POLICY_TYPE='policy_type',
+ POLICY_TRIGGER='policy_trigger',
+ PLUGIN='plugin'
+ )
- # See base class for an explanation on these properties
- join_properties = {
- 'deployment_update_id': {
- 'models': [DeploymentUpdate],
- 'column': DeploymentUpdate.id.label('deployment_update_id')
- },
- }
- join_order = 5
-
- _private_fields = ['deployment_update_storage_id']
-
- id = Column(Integer, primary_key=True, autoincrement=True)
+ deployment_update_id = association_proxy('deployment_update', 'id')
+ deployment_update_fk = foreign_key(DeploymentUpdate.storage_id)
+ _private_fields = ['deployment_update_fk']
action = Column(Enum(*ACTION_TYPES, name='action_type'))
entity_id = Column(Text, nullable=False)
entity_type = Column(Enum(*ENTITY_TYPES, name='entity_type'))
- deployment_update_storage_id = foreign_key(DeploymentUpdate)
- deployment_update = one_to_many_relationship(
- child_class_name='DeploymentUpdateStep',
- column_name='deployment_update_storage_id',
- parent_class_name='DeploymentUpdate',
- back_reference_name='steps'
- )
-
- @property
- def deployment_update_id(self):
- """
- Returns the deployment update id
- :return:
- """
- return self.deployment_update.id
+ @declared_attr
+ def deployment_update(cls):
+ return one_to_many_relationship(cls,
+ DeploymentUpdate,
+ cls.deployment_update_fk,
+ backreference='steps')
class DeploymentModification(SQLModelBase):
@@ -378,43 +283,23 @@ class DeploymentModification(SQLModelBase):
STATES = [STARTED, FINISHED, ROLLEDBACK]
END_STATES = [FINISHED, ROLLEDBACK]
- # See base class for an explanation on these properties
- join_properties = {
- 'deployment_id': {
- 'models': [Deployment],
- 'column': Deployment.id.label('deployment_id')
- },
- }
- join_order = 3
-
- _private_fields = ['deployment_storage_id']
-
- storage_id = Column(Integer, primary_key=True, autoincrement=True)
- id = Column(Text, index=True)
+ deployment_fk = foreign_key(Deployment.storage_id)
+ _private_fields = ['deployment_fk']
+ deployment_id = association_proxy('deployment', 'id')
context = Column(MutableDict.as_mutable(Dict))
created_at = Column(DateTime, nullable=False, index=True)
ended_at = Column(DateTime, index=True)
modified_nodes = Column(MutableDict.as_mutable(Dict))
node_instances = Column(MutableDict.as_mutable(Dict))
- status = Column(
- Enum(*STATES, name='deployment_modification_status'))
-
- deployment_storage_id = foreign_key(Deployment)
- deployment = one_to_many_relationship(
- child_class_name='DeploymentModification',
- column_name='deployment_storage_id',
- parent_class_name='Deployment',
- back_reference_name='modifications'
- )
+ status = Column(Enum(*STATES, name='deployment_modification_status'))
- @property
- def deployment_id(self):
- """
- Returns the deployment id
- :return:
- """
- return self.deployment.id
+ @declared_attr
+ def deployment(cls):
+ return one_to_many_relationship(cls,
+ Deployment,
+ cls.deployment_fk,
+ backreference='modifications')
class Node(SQLModelBase):
@@ -425,22 +310,16 @@ class Node(SQLModelBase):
# See base class for an explanation on these properties
is_id_unique = False
- join_properties = {
- 'blueprint_id': {
- 'models': [Deployment, Blueprint],
- 'column': Blueprint.id.label('blueprint_id')
- },
- 'deployment_id': {
- 'models': [Deployment],
- 'column': Deployment.id.label('deployment_id')
- },
- }
- join_order = 3
- _private_fields = ['deployment_storage_id']
+ _private_fields = ['deployment_fk']
+ deployment_fk = foreign_key(Deployment.storage_id)
- storage_id = Column(Integer, primary_key=True, autoincrement=True)
- id = Column(Text, index=True)
+ deployment_id = association_proxy('deployment', 'id')
+ blueprint_id = association_proxy('deployment', 'id')
+
+ @declared_attr
+ def deployment(cls):
+ return one_to_many_relationship(cls, Deployment, cls.deployment_fk)
deploy_number_of_instances = Column(Integer, nullable=False)
# TODO: This probably should be a foreign key, but there's no guarantee
@@ -457,30 +336,6 @@ class Node(SQLModelBase):
type = Column(Text, nullable=False, index=True)
type_hierarchy = Column(PickleType)
- deployment_storage_id = foreign_key(Deployment)
- deployment = one_to_many_relationship(
- child_class_name='Node',
- column_name='deployment_storage_id',
- parent_class_name='Deployment',
- back_reference_name='nodes'
- )
-
- @property
- def deployment_id(self):
- """
- Returns the deployment id
- :return:
- """
- return self.deployment.id
-
- @property
- def blueprint_id(self):
- """
- Returns the blueprint id
- :return:
- """
- return self.deployment.blueprint_id
-
class Relationship(SQLModelBase):
"""
@@ -488,47 +343,30 @@ class Relationship(SQLModelBase):
"""
__tablename__ = 'relationships'
- join_properties = {
- 'blueprint_id': {
- 'models': [Node, Deployment, Blueprint],
- 'column': Blueprint.id.label('blueprint_id')
- },
- 'deployment_id': {
- 'models': [Node, Deployment],
- 'column': Deployment.id.label('deployment_id')
- }
- }
- join_order = 4
- _private_fields = ['relationship_storage_source_node_id',
- 'relationship_storage_target_node_id']
+ blueprint_id = association_proxy('blueprint', 'id')
+ deployment_id = association_proxy('deployment', 'id')
- storage_id = Column(Integer, primary_key=True, autoincrement=True)
- id = Column(Text, index=True)
+ _private_fields = ['source_node_fk', 'target_node_fk']
+
+ source_node_fk = foreign_key(Node.storage_id)
+ target_node_fk = foreign_key(Node.storage_id)
+
+ @declared_attr
+ def source_node(cls):
+ return one_to_many_relationship(cls, Node, cls.source_node_fk, 'outbound_relationships')
+
+ @declared_attr
+ def target_node(cls):
+ return one_to_many_relationship(cls, Node, cls.target_node_fk, 'inbound_relationships')
source_interfaces = Column(MutableDict.as_mutable(Dict))
source_operations = Column(MutableDict.as_mutable(Dict))
target_interfaces = Column(MutableDict.as_mutable(Dict))
target_operations = Column(MutableDict.as_mutable(Dict))
type = Column(String)
- type_hierarchy = Column(PickleType) # TODO: this should be list
+ type_hierarchy = Column(PickleType)
properties = Column(MutableDict.as_mutable(Dict))
- source_node_storage_id = foreign_key(Node)
- target_node_storage_id = foreign_key(Node)
-
- source_node = one_to_many_relationship(
- child_class_name='Relationship',
- column_name='source_node_storage_id',
- parent_class_name='Node',
- back_reference_name='outbound_relationships'
- )
- target_node = one_to_many_relationship(
- child_class_name='Relationship',
- column_name='target_node_storage_id',
- parent_class_name='Node',
- back_reference_name='inbound_relationships'
- )
-
class NodeInstance(SQLModelBase):
"""
@@ -536,20 +374,12 @@ class NodeInstance(SQLModelBase):
"""
__tablename__ = 'node_instances'
- # See base class for an explanation on these properties
- join_properties = {
- 'node_id': {
- 'models': [Node],
- 'column': Node.id.label('node_id')
- },
- 'deployment_id': {
- 'models': [Node, Deployment],
- 'column': Deployment.id.label('deployment_id')
- },
- }
- join_order = 4
+ node_fk = foreign_key(Node.storage_id)
+ deployment_fk = foreign_key(Deployment.storage_id)
+ _private_fields = ['node_fk', 'deployment_fk']
- _private_fields = ['node_storage_id', 'deployment_storage_id']
+ node_id = association_proxy('node', 'id')
+ deployment_id = association_proxy('node', 'deployment_id')
storage_id = Column(Integer, primary_key=True, autoincrement=True)
id = Column(Text, index=True)
@@ -562,29 +392,9 @@ class NodeInstance(SQLModelBase):
state = Column(Text, nullable=False)
version = Column(Integer, default=1)
- node_storage_id = foreign_key(Node)
- node = one_to_many_relationship(
- child_class_name='NodeInstance',
- column_name='node_storage_id',
- parent_class_name='Node',
- back_reference_name='node_instances'
- )
-
- @property
- def node_id(self):
- """
- Returns the node id
- :return:
- """
- return self.node.id
-
- deployment_storage_id = foreign_key(Deployment)
- deployment = one_to_many_relationship(
- child_class_name='NodeInstance',
- column_name='deployment_storage_id',
- parent_class_name='Deployment',
- back_reference_name='node_instances'
- )
+ @declared_attr
+ def node(cls):
+ return one_to_many_relationship(cls, Node, cls.node_fk)
class RelationshipInstance(SQLModelBase):
@@ -593,48 +403,34 @@ class RelationshipInstance(SQLModelBase):
"""
__tablename__ = 'relationship_instances'
- join_properties = {
- 'blueprint_id': {
- 'models': [Relationship, Node, Deployment, Blueprint],
- 'column': Blueprint.id.label('blueprint_id')
- },
- 'deployment_id': {
- 'models': [Relationship, Node, Deployment],
- 'column': Deployment.id.label('deployment_id')
- }
- }
- join_order = 5
+ blueprint_id = association_proxy('blueprint', 'id')
+ deployment_id = association_proxy('deployment', 'id')
- _private_fields = ['relationship_storage_id',
- 'source_node_instance_id',
- 'target_node_instance_id']
+ relationship_fk = foreign_key(Relationship.storage_id)
+ source_node_instance_fk = foreign_key(NodeInstance.storage_id)
+ target_node_instance_fk = foreign_key(NodeInstance.storage_id)
- storage_id = Column(Integer, primary_key=True, autoincrement=True)
- id = Column(Text, index=True)
+ _private_fields = ['relationship_storage_fk',
+ 'source_node_instance_fk',
+ 'target_node_instance_fk']
- type = Column(String)
+ @declared_attr
+ def source_node_instance(cls):
+ return one_to_many_relationship(cls,
+ NodeInstance,
+ cls.source_node_instance_fk,
+ 'outbound_relationship_instances')
- source_node_instance_storage_id = foreign_key(NodeInstance)
- source_node_instance = one_to_many_relationship(
- child_class_name='RelationshipInstance',
- column_name='source_node_instance_storage_id',
- parent_class_name='NodeInstance',
- back_reference_name='outbound_relationship_instances'
- )
- target_node_instance_storage_id = foreign_key(NodeInstance)
- target_node_instance = one_to_many_relationship(
- child_class_name='RelationshipInstance',
- column_name='target_node_instance_storage_id',
- parent_class_name='NodeInstance',
- back_reference_name='inbound_relationship_instances'
- )
- relationship_storage_id = foreign_key(Relationship)
- relationship = one_to_many_relationship(
- child_class_name='RelationshipInstance',
- column_name='relationship_storage_id',
- parent_class_name='Relationship',
- back_reference_name='relationship_instances'
- )
+ @declared_attr
+ def target_node_instance(cls):
+ return one_to_many_relationship(cls,
+ NodeInstance,
+ cls.target_node_instance_fk,
+ 'inbound_relationship_instances')
+
+ @declared_attr
+ def relationship(cls):
+ return one_to_many_relationship(cls, Relationship, cls.relationship_fk)
class ProviderContext(SQLModelBase):
@@ -643,7 +439,6 @@ class ProviderContext(SQLModelBase):
"""
__tablename__ = 'provider_context'
- id = Column(Text, primary_key=True)
name = Column(Text, nullable=False)
context = Column(MutableDict.as_mutable(Dict), nullable=False)
@@ -677,8 +472,18 @@ class Task(SQLModelBase):
"""
__tablename__ = 'task'
+ node_instance_fk = foreign_key(NodeInstance.storage_id, nullable=True)
+ relationship_instance_fk = foreign_key(RelationshipInstance.storage_id, nullable=True)
- _private_fields = ['node_instance_storage_id', 'relationship_instance_storage_id']
+ _private_fields = ['node_instance_fk', 'relationship_instance_fk']
+
+ @declared_attr
+ def node_instance(cls):
+ return one_to_many_relationship(cls, NodeInstance, cls.node_instance_fk)
+
+ @declared_attr
+ def relationship_instance(cls):
+ return one_to_many_relationship(cls, RelationshipInstance, cls.relationship_instance_fk)
PENDING = 'pending'
RETRYING = 'retrying'
@@ -698,25 +503,23 @@ class Task(SQLModelBase):
WAIT_STATES = [PENDING, RETRYING]
END_STATES = [SUCCESS, FAILED]
- class _Validation(object):
-
- @staticmethod
- def validate_max_attempts(_, value, *args):
- """Validates that max attempts is either -1 or a positive number"""
- if value < 1 and value != Task.INFINITE_RETRIES:
- raise ValueError('Max attempts can be either -1 (infinite) or any positive number. '
- 'Got {value}'.format(value=value))
+ @staticmethod
+ @validates('max_attempts')
+ def validate_max_attempts(_, value):
+ """Validates that max attempts is either -1 or a positive number"""
+ if value < 1 and value != Task.INFINITE_RETRIES:
+ raise ValueError('Max attempts can be either -1 (infinite) or any positive number. '
+ 'Got {value}'.format(value=value))
+ return value
INFINITE_RETRIES = -1
- id = Column(String, primary_key=True, default=uuid_generator)
status = Column(Enum(*STATES), name='status', default=PENDING)
execution_id = Column(String)
due_at = Column(DateTime, default=datetime.utcnow, nullable=True)
started_at = Column(DateTime, default=None, nullable=True)
ended_at = Column(DateTime, default=None, nullable=True)
- # , validation_func=_Validation.validate_max_attempts)
max_attempts = Column(Integer, default=1)
retry_count = Column(Integer, default=0)
retry_interval = Column(Float, default=0)
@@ -727,30 +530,13 @@ class Task(SQLModelBase):
operation_mapping = Column(String)
inputs = Column(MutableDict.as_mutable(Dict))
- node_instance_storage_id = foreign_key(NodeInstance, nullable=True)
- relationship_instance_storage_id = foreign_key(RelationshipInstance, nullable=True)
-
- node_instance = one_to_many_relationship(
- child_class_name='Task',
- column_name='node_instance_storage_id',
- parent_class_name='NodeInstance',
- back_reference_name='tasks',
- )
-
- relationship_instance = one_to_many_relationship(
- child_class_name='Task',
- column_name='relationship_instance_storage_id',
- parent_class_name='RelationshipInstance',
- back_reference_name='tasks',
- )
-
@property
def actor_storage_id(self):
"""
Return the actor storage id of the task
:return:
"""
- return self.node_instance_storage_id or self.relationship_instance_storage_id
+ return self.node_instance_fk or self.relationship_instance_fk
@property
def actor(self):
@@ -762,7 +548,7 @@ class Task(SQLModelBase):
def __init__(self, actor, **kwargs):
if isinstance(actor, RelationshipInstance):
- kwargs['relationship_instance_storage_id'] = actor.storage_id
+ kwargs['relationship_instance_fk'] = actor.storage_id
elif isinstance(actor, NodeInstance):
- kwargs['node_instance_storage_id'] = actor.storage_id
+ kwargs['node_instance_fk'] = actor.storage_id
super(Task, self).__init__(**kwargs)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/aria/storage/rapi/__init__.py
----------------------------------------------------------------------
diff --git a/aria/storage/rapi/__init__.py b/aria/storage/rapi/__init__.py
deleted file mode 100644
index 2217281..0000000
--- a/aria/storage/rapi/__init__.py
+++ /dev/null
@@ -1,18 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""
-A collection of RAPIs
-"""
-from .filesystem import FileSystemResourceAPI
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/30a43983/aria/storage/rapi/filesystem.py
----------------------------------------------------------------------
diff --git a/aria/storage/rapi/filesystem.py b/aria/storage/rapi/filesystem.py
deleted file mode 100644
index bb188e2..0000000
--- a/aria/storage/rapi/filesystem.py
+++ /dev/null
@@ -1,119 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""
-SQLalchemy based RAPI
-"""
-import os
-import shutil
-from distutils import dir_util # https://github.com/PyCQA/pylint/issues/73; pylint: disable=no-name-in-module
-from functools import partial
-
-from aria.storage import (
- api,
- filesystem_api,
- exceptions
-)
-
-
-class FileSystemResourceAPI(api.ResourceAPI, filesystem_api.BaseFileSystemAPI):
- """
- File system resource storage.
- """
-
- def __init__(self, directory, **kwargs):
- """
- File system implementation for storage api.
- :param str directory: root dir for storage.
- """
- super(FileSystemResourceAPI, self).__init__(**kwargs)
- self.directory = directory
- self.base_path = os.path.join(self.directory, self.name)
- self._join_path = partial(os.path.join, self.base_path)
-
- def __repr__(self):
- return '{cls.__name__}(directory={self.directory})'.format(
- cls=self.__class__, self=self)
-
- def create(self, **kwargs):
- """
- Create directory in storage by path.
- tries to create the root directory as well.
- :param str name: path of file in storage.
- """
- try:
- os.makedirs(self.directory)
- except (OSError, IOError):
- pass
- os.makedirs(self.base_path)
-
- def data(self, entry_id, path=None, **_):
- """
- Retrieve the content of a file system storage resource.
-
- :param str entry_type: the type of the entry.
- :param str entry_id: the id of the entry.
- :param str path: a path to a specific resource.
- :return: the content of the file
- :rtype: bytes
- """
- resource_relative_path = os.path.join(self.name, entry_id, path or '')
- resource = os.path.join(self.directory, resource_relative_path)
- if not os.path.exists(resource):
- raise exceptions.StorageError("Resource {0} does not exist".
- format(resource_relative_path))
- if not os.path.isfile(resource):
- resources = os.listdir(resource)
- if len(resources) != 1:
- raise exceptions.StorageError('No resource in path: {0}'.format(resource))
- resource = os.path.join(resource, resources[0])
- with open(resource, 'rb') as resource_file:
- return resource_file.read()
-
- def download(self, entry_id, destination, path=None, **_):
- """
- Download a specific file or dir from the file system resource storage.
-
- :param str entry_type: the name of the entry.
- :param str entry_id: the id of the entry
- :param str destination: the destination of the files.
- :param str path: a path on the remote machine relative to the root of the entry.
- """
- resource_relative_path = os.path.join(self.name, entry_id, path or '')
- resource = os.path.join(self.directory, resource_relative_path)
- if not os.path.exists(resource):
- raise exceptions.StorageError("Resource {0} does not exist".
- format(resource_relative_path))
- if os.path.isfile(resource):
- shutil.copy2(resource, destination)
- else:
- dir_util.copy_tree(resource, destination) # pylint: disable=no-member
-
- def upload(self, entry_id, source, path=None, **_):
- """
- Uploads a specific file or dir to the file system resource storage.
-
- :param str entry_type: the name of the entry.
- :param str entry_id: the id of the entry
- :param source: the source of the files to upload.
- :param path: the destination of the file/s relative to the entry root dir.
- """
- resource_directory = os.path.join(self.directory, self.name, entry_id)
- if not os.path.exists(resource_directory):
- os.makedirs(resource_directory)
- destination = os.path.join(resource_directory, path or '')
- if os.path.isfile(source):
- shutil.copy2(source, destination)
- else:
- dir_util.copy_tree(source, destination) # pylint: disable=no-member