You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2016/12/01 12:39:02 UTC
[3/3] incubator-ariatosca git commit: ARIA-30 SQL based storage
implementation
ARIA-30 SQL based storage implementation
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/fe944e58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/fe944e58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/fe944e58
Branch: refs/heads/ARIA-30-SQL-based-storage-implementation
Commit: fe944e58da8c492041aa2ffb364b27ecec0193ec
Parents: fe974e4
Author: mxmrlv <mx...@gmail.com>
Authored: Sun Nov 27 13:20:46 2016 +0200
Committer: mxmrlv <mx...@gmail.com>
Committed: Thu Dec 1 14:38:41 2016 +0200
----------------------------------------------------------------------
aria/__init__.py | 43 +-
aria/orchestrator/__init__.py | 4 +-
aria/orchestrator/context/common.py | 2 +-
aria/orchestrator/context/exceptions.py | 4 +-
aria/orchestrator/context/operation.py | 8 +-
aria/orchestrator/context/toolbelt.py | 13 +-
aria/orchestrator/context/workflow.py | 20 +-
aria/orchestrator/exceptions.py | 7 +-
aria/orchestrator/workflows/api/task.py | 10 +-
aria/orchestrator/workflows/builtin/heal.py | 25 +-
aria/orchestrator/workflows/builtin/install.py | 7 +-
.../orchestrator/workflows/builtin/uninstall.py | 5 +-
.../orchestrator/workflows/builtin/workflows.py | 4 +-
aria/orchestrator/workflows/core/task.py | 21 +-
aria/storage/__init__.py | 379 ++------
aria/storage/api.py | 219 +++++
aria/storage/drivers.py | 416 ---------
aria/storage/exceptions.py | 4 +-
aria/storage/filesystem_api.py | 39 +
aria/storage/mapi/__init__.py | 20 +
aria/storage/mapi/filesystem.py | 118 +++
aria/storage/mapi/inmemory.py | 148 +++
aria/storage/mapi/sql.py | 369 ++++++++
aria/storage/models.py | 912 +++++++++++++------
aria/storage/rapi/__init__.py | 18 +
aria/storage/rapi/filesystem.py | 119 +++
aria/storage/structures.py | 424 ++++-----
requirements.txt | 1 +
tests/mock/context.py | 50 +-
tests/mock/models.py | 68 +-
tests/orchestrator/context/test_operation.py | 36 +-
tests/orchestrator/context/test_toolbelt.py | 47 +-
tests/orchestrator/context/test_workflow.py | 10 +-
tests/orchestrator/workflows/api/test_task.py | 68 +-
.../orchestrator/workflows/builtin/__init__.py | 35 +-
.../workflows/builtin/test_execute_operation.py | 11 +-
.../orchestrator/workflows/builtin/test_heal.py | 18 +-
.../workflows/builtin/test_install.py | 14 +-
.../workflows/builtin/test_uninstall.py | 12 +-
.../orchestrator/workflows/core/test_engine.py | 71 +-
tests/orchestrator/workflows/core/test_task.py | 20 +-
.../test_task_graph_into_exececution_graph.py | 10 +-
tests/requirements.txt | 2 +-
tests/storage/__init__.py | 38 +-
tests/storage/test_drivers.py | 135 ---
tests/storage/test_field.py | 124 ---
tests/storage/test_model_storage.py | 167 ++--
tests/storage/test_models.py | 364 --------
tests/storage/test_models_api.py | 70 --
tests/storage/test_resource_storage.py | 57 +-
50 files changed, 2318 insertions(+), 2468 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fe944e58/aria/__init__.py
----------------------------------------------------------------------
diff --git a/aria/__init__.py b/aria/__init__.py
index 3f81f98..6e810f0 100644
--- a/aria/__init__.py
+++ b/aria/__init__.py
@@ -23,7 +23,6 @@ import pkgutil
from .VERSION import version as __version__
from .orchestrator.decorators import workflow, operation
-from .storage import ModelStorage, ResourceStorage, models, ModelDriver, ResourceDriver
from . import (
utils,
parser,
@@ -58,37 +57,37 @@ def install_aria_extensions():
del sys.modules[module_name]
-def application_model_storage(driver):
+def application_model_storage(api, api_params=None):
"""
Initiate model storage for the supplied storage driver
"""
-
- assert isinstance(driver, ModelDriver)
- if driver not in _model_storage:
- _model_storage[driver] = ModelStorage(
- driver, model_classes=[
- models.Node,
- models.NodeInstance,
- models.Plugin,
- models.Blueprint,
- models.Snapshot,
- models.Deployment,
- models.DeploymentUpdate,
- models.DeploymentModification,
- models.Execution,
- models.ProviderContext,
- models.Task,
- ])
- return _model_storage[driver]
+ models = [
+ storage.models.Blueprint,
+ storage.models.Deployment,
+ storage.models.Node,
+ storage.models.NodeInstance,
+ storage.models.Relationship,
+ storage.models.RelationshipInstance,
+ storage.models.Plugin,
+ storage.models.Snapshot,
+ storage.models.DeploymentUpdate,
+ storage.models.DeploymentUpdateStep,
+ storage.models.DeploymentModification,
+ storage.models.Execution,
+ storage.models.ProviderContext,
+ storage.models.Task,
+ ]
+ # if api not in _model_storage:
+ _model_storage[api] = storage.ModelStorage(api, items=models, api_params=api_params or {})
+ return _model_storage[api]
def application_resource_storage(driver):
"""
Initiate resource storage for the supplied storage driver
"""
- assert isinstance(driver, ResourceDriver)
if driver not in _resource_storage:
- _resource_storage[driver] = ResourceStorage(
+ _resource_storage[driver] = storage.ResourceStorage(
driver,
resources=[
'blueprint',
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fe944e58/aria/orchestrator/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/__init__.py b/aria/orchestrator/__init__.py
index a5aeec7..90d6442 100644
--- a/aria/orchestrator/__init__.py
+++ b/aria/orchestrator/__init__.py
@@ -12,7 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+"""
+Aria orchestrator
+"""
from .decorators import workflow, operation
from . import (
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fe944e58/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index f2bf83b..7b65e2b 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -79,7 +79,7 @@ class BaseContext(logger.LoggerMixin):
"""
The blueprint model
"""
- return self.model.blueprint.get(self.deployment.blueprint_id)
+ return self.deployment.blueprint
@property
def deployment(self):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fe944e58/aria/orchestrator/context/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/exceptions.py b/aria/orchestrator/context/exceptions.py
index 6704bbc..fe762e1 100644
--- a/aria/orchestrator/context/exceptions.py
+++ b/aria/orchestrator/context/exceptions.py
@@ -12,7 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+"""
+Context based exceptions
+"""
from ..exceptions import OrchestratorError
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fe944e58/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index bf3686d..f522111 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -84,7 +84,7 @@ class RelationshipOperationContext(BaseOperationContext):
The source node
:return:
"""
- return self.model.node.get(self.relationship.source_id)
+ return self.relationship.source_node
@property
def source_node_instance(self):
@@ -92,7 +92,7 @@ class RelationshipOperationContext(BaseOperationContext):
The source node instance
:return:
"""
- return self.model.node_instance.get(self.relationship_instance.source_id)
+ return self.relationship_instance.source_node_instance
@property
def target_node(self):
@@ -100,7 +100,7 @@ class RelationshipOperationContext(BaseOperationContext):
The target node
:return:
"""
- return self.model.node.get(self.relationship.target_id)
+ return self.relationship.target_node
@property
def target_node_instance(self):
@@ -108,7 +108,7 @@ class RelationshipOperationContext(BaseOperationContext):
The target node instance
:return:
"""
- return self.model.node_instance.get(self._actor.target_id)
+ return self.relationship_instance.target_node_instance
@property
def relationship(self):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fe944e58/aria/orchestrator/context/toolbelt.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/toolbelt.py b/aria/orchestrator/context/toolbelt.py
index 0aad89c..ae0e1ff 100644
--- a/aria/orchestrator/context/toolbelt.py
+++ b/aria/orchestrator/context/toolbelt.py
@@ -33,13 +33,10 @@ class NodeToolBelt(object):
:return:
"""
assert isinstance(self._op_context, operation.NodeOperationContext)
- node_instances = self._op_context.model.node_instance.iter(
- filters={'deployment_id': self._op_context.deployment.id}
- )
- for node_instance in node_instances:
- for relationship_instance in node_instance.relationship_instances:
- if relationship_instance.target_id == self._op_context.node_instance.id:
- yield node_instance
+ filters = {'target_node_instance_storage_id': self._op_context.node_instance.storage_id}
+ for relationship_instance in \
+ self._op_context.model.relationship_instance.iter(filters=filters):
+ yield relationship_instance.source_node_instance
@property
def host_ip(self):
@@ -48,7 +45,7 @@ class NodeToolBelt(object):
:return:
"""
assert isinstance(self._op_context, operation.NodeOperationContext)
- host_id = self._op_context._actor.host_id
+ host_id = self._op_context.node_instance.host_id
host_instance = self._op_context.model.node_instance.get(host_id)
return host_instance.runtime_properties.get('ip')
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fe944e58/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index 3dc222b..8797271 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -19,6 +19,7 @@ Workflow and operation contexts
import threading
from contextlib import contextmanager
+from datetime import datetime
from aria import storage
@@ -49,13 +50,14 @@ class WorkflowContext(BaseContext):
def _create_execution(self):
execution_cls = self.model.execution.model_cls
+ now = datetime.utcnow()
execution = self.model.execution.model_cls(
id=self._execution_id,
- deployment_id=self.deployment.id,
workflow_id=self._workflow_id,
- blueprint_id=self.blueprint.id,
+ created_at=now,
status=execution_cls.PENDING,
parameters=self.parameters,
+ deployment_storage_id=self.deployment.storage_id
)
self.model.execution.store(execution)
@@ -64,19 +66,27 @@ class WorkflowContext(BaseContext):
"""
Iterator over nodes
"""
- return self.model.node.iter(filters={'blueprint_id': self.blueprint.id})
+ return self.model.node.iter(
+ filters={
+ 'deployment_storage_id': self.deployment.storage_id
+ }
+ )
@property
def node_instances(self):
"""
Iterator over node instances
"""
- return self.model.node_instance.iter(filters={'deployment_id': self.deployment.id})
+ return self.model.node_instance.iter(
+ filters={
+ 'deployment_storage_id': self.deployment.storage_id
+ }
+ )
class _CurrentContext(threading.local):
"""
- Provides thread-level context, which sugarcoats the task api.
+ Provides thread-level context, which sugarcoats the task mapi.
"""
def __init__(self):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fe944e58/aria/orchestrator/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/exceptions.py b/aria/orchestrator/exceptions.py
index 75b37cf..1a48194 100644
--- a/aria/orchestrator/exceptions.py
+++ b/aria/orchestrator/exceptions.py
@@ -12,9 +12,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+"""
+Orchestrator based exceptions
+"""
from aria.exceptions import AriaError
class OrchestratorError(AriaError):
+ """
+ Orchestrator based exception
+ """
pass
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fe944e58/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index 4d36725..358315c 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -18,7 +18,7 @@ Provides the tasks to be entered into the task graph
"""
from uuid import uuid4
-import aria
+from aria import storage
from ... import context
from .. import exceptions
@@ -75,8 +75,8 @@ class OperationTask(BaseTask):
:param actor: the operation host on which this operation is registered.
:param inputs: operation inputs.
"""
- assert isinstance(actor, (aria.storage.models.NodeInstance,
- aria.storage.models.RelationshipInstance))
+ assert isinstance(actor, (storage.models.NodeInstance,
+ storage.models.RelationshipInstance))
super(OperationTask, self).__init__()
self.actor = actor
self.name = '{name}.{actor.id}'.format(name=name, actor=actor)
@@ -97,7 +97,7 @@ class OperationTask(BaseTask):
:param instance: the node of which this operation belongs to.
:param name: the name of the operation.
"""
- assert isinstance(instance, aria.storage.models.NodeInstance)
+ assert isinstance(instance, storage.models.NodeInstance)
operation_details = instance.node.operations[name]
operation_inputs = operation_details.get('inputs', {})
operation_inputs.update(inputs or {})
@@ -119,7 +119,7 @@ class OperationTask(BaseTask):
with 'source_operations' and 'target_operations'
:param inputs any additional inputs to the operation
"""
- assert isinstance(instance, aria.storage.models.RelationshipInstance)
+ assert isinstance(instance, storage.models.RelationshipInstance)
if operation_end not in [cls.TARGET_OPERATION, cls.SOURCE_OPERATION]:
raise exceptions.TaskException('The operation end should be {0} or {1}'.format(
cls.TARGET_OPERATION, cls.SOURCE_OPERATION
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fe944e58/aria/orchestrator/workflows/builtin/heal.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/heal.py b/aria/orchestrator/workflows/builtin/heal.py
index dbfc14e..650e664 100644
--- a/aria/orchestrator/workflows/builtin/heal.py
+++ b/aria/orchestrator/workflows/builtin/heal.py
@@ -84,16 +84,19 @@ def heal_uninstall(ctx, graph, failing_node_instances, targeted_node_instances):
# create dependencies between the node instance sub workflow
for node_instance in failing_node_instances:
node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
- for relationship_instance in reversed(node_instance.relationship_instances):
- graph.add_dependency(node_instance_sub_workflows[relationship_instance.target_id],
- node_instance_sub_workflow)
+ for relationship_instance in reversed(node_instance.relationship_instance_source):
+ graph.add_dependency(
+ node_instance_sub_workflows[relationship_instance.target_node_instance.id],
+ node_instance_sub_workflow)
# Add operations for intact nodes depending on a node instance belonging to node_instances
for node_instance in targeted_node_instances:
node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
- for relationship_instance in reversed(node_instance.relationship_instances):
- target_node_instance = ctx.model.node_instance.get(relationship_instance.target_id)
+ for relationship_instance in reversed(node_instance.relationship_instance_source):
+
+ target_node_instance = \
+ ctx.model.node_instance.get(relationship_instance.target_node_instance.id)
target_node_instance_subgraph = node_instance_sub_workflows[target_node_instance.id]
graph.add_dependency(target_node_instance_subgraph, node_instance_sub_workflow)
@@ -134,9 +137,10 @@ def heal_install(ctx, graph, failing_node_instances, targeted_node_instances):
# create dependencies between the node instance sub workflow
for node_instance in failing_node_instances:
node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
- if node_instance.relationship_instances:
- dependencies = [node_instance_sub_workflows[relationship_instance.target_id]
- for relationship_instance in node_instance.relationship_instances]
+ if node_instance.relationship_instance_source:
+ dependencies = \
+ [node_instance_sub_workflows[relationship_instance.target_node_instance.id]
+ for relationship_instance in node_instance.relationship_instance_source]
graph.add_dependency(node_instance_sub_workflow, dependencies)
# Add operations for intact nodes depending on a node instance
@@ -144,8 +148,9 @@ def heal_install(ctx, graph, failing_node_instances, targeted_node_instances):
for node_instance in targeted_node_instances:
node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
- for relationship_instance in node_instance.relationship_instances:
- target_node_instance = ctx.model.node_instance.get(relationship_instance.target_id)
+ for relationship_instance in node_instance.relationship_instance_source:
+ target_node_instance = ctx.model.node_instance.get(
+ relationship_instance.target_node_instance.id)
target_node_instance_subworkflow = node_instance_sub_workflows[target_node_instance.id]
graph.add_dependency(node_instance_sub_workflow, target_node_instance_subworkflow)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fe944e58/aria/orchestrator/workflows/builtin/install.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/install.py b/aria/orchestrator/workflows/builtin/install.py
index 0ab3ad6..634811f 100644
--- a/aria/orchestrator/workflows/builtin/install.py
+++ b/aria/orchestrator/workflows/builtin/install.py
@@ -47,7 +47,8 @@ def install(ctx, graph, node_instances=(), node_instance_sub_workflows=None):
# create dependencies between the node instance sub workflow
for node_instance in node_instances:
node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
- if node_instance.relationship_instances:
- dependencies = [node_instance_sub_workflows[relationship_instance.target_id]
- for relationship_instance in node_instance.relationship_instances]
+ if node_instance.relationship_instance_source:
+ dependencies = [
+ node_instance_sub_workflows[relationship_instance.target_node_instance.id]
+ for relationship_instance in node_instance.relationship_instance_source]
graph.add_dependency(node_instance_sub_workflow, dependencies)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fe944e58/aria/orchestrator/workflows/builtin/uninstall.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/uninstall.py b/aria/orchestrator/workflows/builtin/uninstall.py
index f4e965c..80fdc4e 100644
--- a/aria/orchestrator/workflows/builtin/uninstall.py
+++ b/aria/orchestrator/workflows/builtin/uninstall.py
@@ -47,6 +47,7 @@ def uninstall(ctx, graph, node_instances=(), node_instance_sub_workflows=None):
# create dependencies between the node instance sub workflow
for node_instance in node_instances:
node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
- for relationship_instance in reversed(node_instance.relationship_instances):
- graph.add_dependency(node_instance_sub_workflows[relationship_instance.target_id],
+ for relationship_instance in reversed(node_instance.relationship_instance_source):
+ target_id = relationship_instance.target_node_instance.id
+ graph.add_dependency(node_instance_sub_workflows[target_id],
node_instance_sub_workflow)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fe944e58/aria/orchestrator/workflows/builtin/workflows.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/workflows.py b/aria/orchestrator/workflows/builtin/workflows.py
index 0eb8c34..02bfaf1 100644
--- a/aria/orchestrator/workflows/builtin/workflows.py
+++ b/aria/orchestrator/workflows/builtin/workflows.py
@@ -179,8 +179,8 @@ def relationships_tasks(graph, operation_name, node_instance):
:return:
"""
relationships_groups = groupby(
- node_instance.relationship_instances,
- key=lambda relationship_instance: relationship_instance.relationship.target_id)
+ node_instance.relationship_instance_source,
+ key=lambda relationship_instance: relationship_instance.target_node_instance.id)
sub_tasks = []
for _, (_, relationship_group) in enumerate(relationships_groups):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fe944e58/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index a583cfc..fd00307 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -107,6 +107,15 @@ class OperationTask(BaseTask):
super(OperationTask, self).__init__(id=api_task.id, **kwargs)
self._workflow_context = api_task._workflow_context
task_model = api_task._workflow_context.model.task.model_cls
+
+ if isinstance(api_task.actor, models.NodeInstance):
+ context_class = operation_context.NodeOperationContext
+ elif isinstance(api_task.actor, models.RelationshipInstance):
+ context_class = operation_context.RelationshipOperationContext
+ else:
+ raise RuntimeError('No operation context could be created for {0}'
+ .format(api_task.actor.model_cls))
+
operation_task = task_model(
id=api_task.id,
name=api_task.name,
@@ -117,21 +126,13 @@ class OperationTask(BaseTask):
execution_id=self._workflow_context._execution_id,
max_attempts=api_task.max_attempts,
retry_interval=api_task.retry_interval,
- ignore_failure=api_task.ignore_failure
+ ignore_failure=api_task.ignore_failure,
)
-
- if isinstance(api_task.actor, models.NodeInstance):
- context_class = operation_context.NodeOperationContext
- elif isinstance(api_task.actor, models.RelationshipInstance):
- context_class = operation_context.RelationshipOperationContext
- else:
- raise RuntimeError('No operation context could be created for {0}'
- .format(api_task.actor.model_cls))
+ self._workflow_context.model.task.store(operation_task)
self._ctx = context_class(name=api_task.name,
workflow_context=self._workflow_context,
task=operation_task)
- self._workflow_context.model.task.store(operation_task)
self._task_id = operation_task.id
self._update_fields = None
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fe944e58/aria/storage/__init__.py
----------------------------------------------------------------------
diff --git a/aria/storage/__init__.py b/aria/storage/__init__.py
index 2d142a5..6740cd0 100644
--- a/aria/storage/__init__.py
+++ b/aria/storage/__init__.py
@@ -20,14 +20,14 @@ Path: aria.storage
Storage package is a generic abstraction over different storage types.
We define this abstraction with the following components:
-1. storage: simple api to use
-2. driver: implementation of the database client api.
+1. storage: simple mapi to use
+2. driver: implementation of the database client mapi.
3. model: defines the structure of the table/document.
4. field: defines a field/item in the model.
API:
* application_storage_factory - function, default Aria storage factory.
- * Storage - class, simple storage api.
+ * Storage - class, simple storage mapi.
* models - module, default Aria standard models.
* structures - module, default Aria structures - holds the base model,
and different fields types.
@@ -37,354 +37,93 @@ API:
* drivers - module, a pool of Aria standard drivers.
* StorageDriver - class, abstract model implementation.
"""
-# todo: rewrite the above package documentation
-# (something like explaning the two types of storage - models and resources)
-from collections import namedtuple
-
-from .structures import Storage, Field, Model, IterField, PointerField
-from .drivers import (
- ModelDriver,
- ResourceDriver,
- FileSystemResourceDriver,
- FileSystemModelDriver,
+from aria.logger import LoggerMixin
+from . import (
+ models,
+ exceptions,
+ api as storage_api,
+ structures
)
-from . import models, exceptions
+
__all__ = (
'ModelStorage',
- 'ResourceStorage',
- 'FileSystemModelDriver',
'models',
'structures',
- 'Field',
- 'IterField',
- 'PointerField',
- 'Model',
- 'drivers',
- 'ModelDriver',
- 'ResourceDriver',
- 'FileSystemResourceDriver',
)
-# todo: think about package output api's...
-# todo: in all drivers name => entry_type
-# todo: change in documentation str => basestring
-class ModelStorage(Storage):
+class Storage(LoggerMixin):
"""
- Managing the models storage.
+ Represents the storage
"""
- def __init__(self, driver, model_classes=(), **kwargs):
- """
- Simple storage client api for Aria applications.
- The storage instance defines the tables/documents/code api.
-
- :param ModelDriver driver: model storage driver.
- :param model_classes: the models to register.
- """
- assert isinstance(driver, ModelDriver)
- super(ModelStorage, self).__init__(driver, model_classes, **kwargs)
-
- def __getattr__(self, table):
- """
- getattr is a shortcut to simple api
-
- for Example:
- >> storage = ModelStorage(driver=FileSystemModelDriver('/tmp'))
- >> node_table = storage.node
- >> for node in node_table:
- >> print node
-
- :param str table: table name to get
- :return: a storage object that mapped to the table name
- """
- return super(ModelStorage, self).__getattr__(table)
-
- def register(self, model_cls):
- """
- Registers the model type in the resource storage manager.
- :param model_cls: the model to register.
- """
- model_name = generate_lower_name(model_cls)
- model_api = _ModelApi(model_name, self.driver, model_cls)
- self.registered[model_name] = model_api
-
- for pointer_schema_register in model_api.pointer_mapping.values():
- model_cls = pointer_schema_register.model_cls
- self.register(model_cls)
-
-_Pointer = namedtuple('_Pointer', 'name, is_iter')
-
-
-class _ModelApi(object):
- def __init__(self, name, driver, model_cls):
- """
- Managing the model in the storage, using the driver.
-
- :param basestring name: the name of the model.
- :param ModelDriver driver: the driver which supports this model in the storage.
- :param Model model_cls: table/document class model.
- """
- assert isinstance(driver, ModelDriver)
- assert issubclass(model_cls, Model)
- self.name = name
- self.driver = driver
- self.model_cls = model_cls
- self.pointer_mapping = {}
- self._setup_pointers_mapping()
-
- def _setup_pointers_mapping(self):
- for field_name, field_cls in vars(self.model_cls).items():
- if not(isinstance(field_cls, PointerField) and field_cls.type):
- continue
- pointer_key = _Pointer(field_name, is_iter=isinstance(field_cls, IterField))
- self.pointer_mapping[pointer_key] = self.__class__(
- name=generate_lower_name(field_cls.type),
- driver=self.driver,
- model_cls=field_cls.type)
-
- def __iter__(self):
- return self.iter()
+ def __init__(self, api, items=(), api_params=None, **kwargs):
+ self._api_params = api_params or {}
+ super(Storage, self).__init__(**kwargs)
+ self.api = api
+ self.registered = {}
+ for item in items:
+ self.register(item)
+ self.logger.debug('{name} object is ready: {0!r}'.format(
+ self, name=self.__class__.__name__))
def __repr__(self):
- return '{self.name}(driver={self.driver}, model={self.model_cls})'.format(self=self)
-
- def create(self):
- """
- Creates the model in the storage.
- """
- with self.driver as connection:
- connection.create(self.name)
-
- def get(self, entry_id, **kwargs):
- """
- Getter for the model from the storage.
-
- :param basestring entry_id: the id of the table/document.
- :return: model instance
- :rtype: Model
- """
- with self.driver as connection:
- data = connection.get(
- name=self.name,
- entry_id=entry_id,
- **kwargs)
- data.update(self._get_pointers(data, **kwargs))
- return self.model_cls(**data)
+ return '{name}(api={self.api})'.format(name=self.__class__.__name__, self=self)
- def store(self, entry, **kwargs):
- """
- Setter for the model in the storage.
-
- :param Model entry: the table/document to store.
- """
- assert isinstance(entry, self.model_cls)
- with self.driver as connection:
- data = entry.fields_dict
- data.update(self._store_pointers(data, **kwargs))
- connection.store(
- name=self.name,
- entry_id=entry.id,
- entry=data,
- **kwargs)
-
- def delete(self, entry_id, **kwargs):
- """
- Delete the model from storage.
-
- :param basestring entry_id: id of the entity to delete from storage.
- """
- entry = self.get(entry_id)
- with self.driver as connection:
- self._delete_pointers(entry, **kwargs)
- connection.delete(
- name=self.name,
- entry_id=entry_id,
- **kwargs)
-
- def iter(self, **kwargs):
- """
- Generator over the entries of model in storage.
- """
- with self.driver as connection:
- for data in connection.iter(name=self.name, **kwargs):
- data.update(self._get_pointers(data, **kwargs))
- yield self.model_cls(**data)
+ def __getattr__(self, item):
+ try:
+ return self.registered[item]
+ except KeyError:
+ return super(Storage, self).__getattribute__(item)
- def update(self, entry_id, **kwargs):
+ def register(self, entry):
"""
- Updates and entry in storage.
-
- :param str entry_id: the id of the table/document.
- :param kwargs: the fields to update.
+ Register the entry to the storage
+ :param name:
:return:
"""
- with self.driver as connection:
- connection.update(
- name=self.name,
- entry_id=entry_id,
- **kwargs
- )
-
- def _get_pointers(self, data, **kwargs):
- pointers = {}
- for field, schema in self.pointer_mapping.items():
- if field.is_iter:
- pointers[field.name] = [
- schema.get(entry_id=pointer_id, **kwargs)
- for pointer_id in data[field.name]
- if pointer_id]
- elif data[field.name]:
- pointers[field.name] = schema.get(entry_id=data[field.name], **kwargs)
- return pointers
-
- def _store_pointers(self, data, **kwargs):
- pointers = {}
- for field, model_api in self.pointer_mapping.items():
- if field.is_iter:
- pointers[field.name] = []
- for iter_entity in data[field.name]:
- pointers[field.name].append(iter_entity.id)
- model_api.store(iter_entity, **kwargs)
- else:
- pointers[field.name] = data[field.name].id
- model_api.store(data[field.name], **kwargs)
- return pointers
+ raise NotImplementedError('Subclass must implement abstract register method')
- def _delete_pointers(self, entry, **kwargs):
- for field, schema in self.pointer_mapping.items():
- if field.is_iter:
- for iter_entry in getattr(entry, field.name):
- schema.delete(iter_entry.id, **kwargs)
- else:
- schema.delete(getattr(entry, field.name).id, **kwargs)
-
-class ResourceApi(object):
+class ResourceStorage(Storage):
"""
- Managing the resource in the storage, using the driver.
-
- :param basestring name: the name of the resource.
- :param ResourceDriver driver: the driver which supports this resource in the storage.
+ Represents resource storage.
"""
- def __init__(self, driver, resource_name):
- """
- Managing the resources in the storage, using the driver.
-
- :param ResourceDriver driver: the driver which supports this model in the storage.
- :param basestring resource_name: the type of the entry this resourceAPI manages.
- """
- assert isinstance(driver, ResourceDriver)
- self.driver = driver
- self.resource_name = resource_name
-
- def __repr__(self):
- return '{name}(driver={self.driver}, resource={self.resource_name})'.format(
- name=self.__class__.__name__, self=self)
-
- def create(self):
- """
- Create the resource dir in the storage.
- """
- with self.driver as connection:
- connection.create(self.resource_name)
-
- def data(self, entry_id, path=None, **kwargs):
+ def register(self, name):
"""
- Retrieve the content of a storage resource.
-
- :param basestring entry_id: the id of the entry.
- :param basestring path: path of the resource on the storage.
- :param kwargs: resources to be passed to the driver..
- :return the content of a single file:
- """
- with self.driver as connection:
- return connection.data(
- entry_type=self.resource_name,
- entry_id=entry_id,
- path=path,
- **kwargs)
-
- def download(self, entry_id, destination, path=None, **kwargs):
- """
- Download a file/dir from the resource storage.
-
- :param basestring entry_id: the id of the entry.
- :param basestring destination: the destination of the file/dir.
- :param basestring path: path of the resource on the storage.
- """
- with self.driver as connection:
- connection.download(
- entry_type=self.resource_name,
- entry_id=entry_id,
- destination=destination,
- path=path,
- **kwargs)
-
- def upload(self, entry_id, source, path=None, **kwargs):
- """
- Upload a file/dir from the resource storage.
-
- :param basestring entry_id: the id of the entry.
- :param basestring source: the source path of the file to upload.
- :param basestring path: the destination of the file, relative to the root dir
- of the resource
+ Register the resource type to resource storage.
+ :param name:
+ :return:
"""
- with self.driver as connection:
- connection.upload(
- entry_type=self.resource_name,
- entry_id=entry_id,
- source=source,
- path=path,
- **kwargs)
+ self.registered[name] = self.api(name=name, **self._api_params)
+ self.registered[name].create()
+ self.logger.debug('setup {name} in storage {self!r}'.format(name=name, self=self))
-def generate_lower_name(model_cls):
- """
- Generates the name of the class from the class object. e.g. SomeClass -> some_class
- :param model_cls: the class to evaluate.
- :return: lower name
- :rtype: basestring
- """
- return ''.join(
- character if character.islower() else '_{0}'.format(character.lower())
- for character in model_cls.__name__)[1:]
-
-
-class ResourceStorage(Storage):
+class ModelStorage(Storage):
"""
- Managing the resource storage.
+ Represents model storage.
"""
- def __init__(self, driver, resources=(), **kwargs):
- """
- Simple storage client api for Aria applications.
- The storage instance defines the tables/documents/code api.
-
- :param ResourceDriver driver: resource storage driver
- :param resources: the resources to register.
- """
- assert isinstance(driver, ResourceDriver)
- super(ResourceStorage, self).__init__(driver, resources, **kwargs)
-
- def register(self, resource):
+ def register(self, model):
"""
- Registers the resource type in the resource storage manager.
- :param resource: the resource to register.
+ Register the model into the model storage.
+ :param model: the model to register.
+ :return:
"""
- self.registered[resource] = ResourceApi(self.driver, resource_name=resource)
+ model_name = storage_api.generate_lower_name(model)
+ if model_name in self.registered:
+ self.logger.debug('{name} in already storage {self!r}'.format(name=model_name,
+ self=self))
+ return
+ self.registered[model_name] = self.api(name=model_name, model_cls=model, **self._api_params)
+ self.registered[model_name].create()
+ self.logger.debug('setup {name} in storage {self!r}'.format(name=model_name, self=self))
- def __getattr__(self, resource):
+ def drop(self):
"""
- getattr is a shortcut to simple api
-
- for Example:
- >> storage = ResourceStorage(driver=FileSystemResourceDriver('/tmp'))
- >> blueprint_resources = storage.blueprint
- >> blueprint_resources.download(blueprint_id, destination='~/blueprint/')
-
- :param str resource: resource name to download
- :return: a storage object that mapped to the resource name
- :rtype: ResourceApi
+ Drop all the tables from the model.
+ :return:
"""
- return super(ResourceStorage, self).__getattr__(resource)
+ for mapi in self.registered.values():
+ mapi.drop()
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fe944e58/aria/storage/api.py
----------------------------------------------------------------------
diff --git a/aria/storage/api.py b/aria/storage/api.py
new file mode 100644
index 0000000..7bdbd5d
--- /dev/null
+++ b/aria/storage/api.py
@@ -0,0 +1,219 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+General storage API
+"""
+from contextlib import contextmanager
+
+from . import exceptions
+
+
+class StorageAPI(object):
+ """
+ General storage Base API
+ """
+ def create(self, **kwargs):
+ """
+ Create a storage API.
+ :param kwargs:
+ :return:
+ """
+ raise NotImplementedError('Subclass must implement abstract create method')
+
+ @contextmanager
+ def connect(self):
+ """
+ Established a connection and destroys it after use.
+ :return:
+ """
+ try:
+ self._establish_connection()
+ yield self
+ except BaseException as e:
+ raise exceptions.StorageError(str(e))
+ finally:
+ self._destroy_connection()
+
+ def _establish_connection(self):
+ """
+ Establish a conenction. used in the 'connect' contextmanager.
+ :return:
+ """
+ pass
+
+ def _destroy_connection(self):
+ """
+ Destroy a connection. used in the 'connect' contextmanager.
+ :return:
+ """
+ pass
+
+ def __getattr__(self, item):
+ try:
+ return self.registered[item]
+ except KeyError:
+ return super(StorageAPI, self).__getattribute__(item)
+
+
+class ModelAPI(StorageAPI):
+ """
+ A Base object for the model.
+ """
+ def __init__(self, model_cls, name=None, **kwargs):
+ """
+ Base model API
+
+ :param model_cls: the representing class of the model
+ :param str name: the name of the model
+ :param kwargs:
+ """
+ super(ModelAPI, self).__init__(**kwargs)
+ self._model_cls = model_cls
+ self._name = name or generate_lower_name(model_cls)
+
+ @property
+ def name(self):
+ """
+ The name of the class
+ :return: name of the class
+ """
+ return self._name
+
+ @property
+ def model_cls(self):
+ """
+ The class represting the model
+ :return:
+ """
+ return self._model_cls
+
+ def get(self, entry_id, filters=None, **kwargs):
+ """
+ Get entry from storage.
+
+ :param entry_id:
+ :param kwargs:
+ :return:
+ """
+ raise NotImplementedError('Subclass must implement abstract get method')
+
+ def store(self, entry, **kwargs):
+ """
+ Store entry in storage
+
+ :param entry:
+ :param kwargs:
+ :return:
+ """
+ raise NotImplementedError('Subclass must implement abstract store method')
+
+ def delete(self, entry_id, **kwargs):
+ """
+ Delete entry from storage.
+
+ :param entry_id:
+ :param kwargs:
+ :return:
+ """
+ raise NotImplementedError('Subclass must implement abstract delete method')
+
+ def __iter__(self):
+ return self.iter()
+
+ def iter(self, **kwargs):
+ """
+ Iter over the entries in storage.
+
+ :param kwargs:
+ :return:
+ """
+ raise NotImplementedError('Subclass must implement abstract iter method')
+
+ def update(self, entry, **kwargs):
+ """
+ Update entry in storage.
+
+ :param entry:
+ :param kwargs:
+ :return:
+ """
+ raise NotImplementedError('Subclass must implement abstract update method')
+
+
+class ResourceAPI(StorageAPI):
+ """
+ A Base object for the resource.
+ """
+ def __init__(self, name):
+ """
+ Base resource API
+ :param str name: the resource type
+ """
+ self._name = name
+
+ @property
+ def name(self):
+ """
+ The name of the resource
+ :return:
+ """
+ return self._name
+
+ def data(self, entry_id, path=None, **kwargs):
+ """
+ Get a bytesteam from the storagee.
+
+ :param entry_id:
+ :param path:
+ :param kwargs:
+ :return:
+ """
+ raise NotImplementedError('Subclass must implement abstract data method')
+
+ def download(self, entry_id, destination, path=None, **kwargs):
+ """
+ Download a resource from the storage.
+
+ :param entry_id:
+ :param destination:
+ :param path:
+ :param kwargs:
+ :return:
+ """
+ raise NotImplementedError('Subclass must implement abstract download method')
+
+ def upload(self, entry_id, source, path=None, **kwargs):
+ """
+ Upload a resource to the storage.
+
+ :param entry_id:
+ :param source:
+ :param path:
+ :param kwargs:
+ :return:
+ """
+ raise NotImplementedError('Subclass must implement abstract upload method')
+
+
+def generate_lower_name(model_cls):
+ """
+ Generates the name of the class from the class object. e.g. SomeClass -> some_class
+ :param model_cls: the class to evaluate.
+ :return: lower name
+ :rtype: basestring
+ """
+ return ''.join(
+ character if character.islower() else '_{0}'.format(character.lower())
+ for character in model_cls.__name__)[1:]
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fe944e58/aria/storage/drivers.py
----------------------------------------------------------------------
diff --git a/aria/storage/drivers.py b/aria/storage/drivers.py
deleted file mode 100644
index 1f96956..0000000
--- a/aria/storage/drivers.py
+++ /dev/null
@@ -1,416 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Aria's storage.drivers module
-Path: aria.storage.driver
-
-drivers module holds a generic abstract implementation of drivers.
-
-classes:
- * Driver - abstract storage driver implementation.
- * ModelDriver - abstract model base storage driver.
- * ResourceDriver - abstract resource base storage driver.
- * FileSystemModelDriver - file system implementation for model storage driver.
- * FileSystemResourceDriver - file system implementation for resource storage driver.
-"""
-
-import distutils.dir_util # pylint: disable=no-name-in-module, import-error
-import os
-import shutil
-from functools import partial
-from multiprocessing import RLock
-
-import jsonpickle
-
-from ..logger import LoggerMixin
-from .exceptions import StorageError
-
-__all__ = (
- 'ModelDriver',
- 'FileSystemModelDriver',
- 'ResourceDriver',
- 'FileSystemResourceDriver',
-)
-
-
-class Driver(LoggerMixin):
- """
- Driver: storage driver context manager - abstract driver implementation.
- In the implementation level, It is a good practice to raise StorageError on Errors.
- """
-
- def __enter__(self):
- """
- Context manager entry method, executes connect.
- :return: context manager instance
- :rtype: Driver
- """
- self.connect()
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- """
- Context manager exit method, executes disconnect.
- """
- self.disconnect()
- if not exc_type:
- return
- # self.logger.debug(
- # '{name} had an error'.format(name=self.__class__.__name__),
- # exc_info=(exc_type, exc_val, exc_tb))
- if StorageError in exc_type.mro():
- return
- raise StorageError('Exception had occurred, {type}: {message}'.format(
- type=exc_type, message=str(exc_val)))
-
- def connect(self):
- """
- Open storage connection.
- In some cases, This method can get the connection from a connection pool.
- """
- pass
-
- def disconnect(self):
- """
- Close storage connection.
- In some cases, This method can release the connection to the connection pool.
- """
- pass
-
- def create(self, name, *args, **kwargs):
- """
- Create table/document in storage by name.
- :param str name: name of table/document in storage.
- """
- pass
-
-
-class ModelDriver(Driver):
- """
- ModelDriver context manager.
- Base Driver for Model based storage.
- """
-
- def get(self, name, entry_id, **kwargs):
- """
- Getter from storage.
- :param str name: name of table/document in storage.
- :param str entry_id: id of the document to get from storage.
- :return: value of entity from the storage.
- """
- raise NotImplementedError('Subclass must implement abstract get method')
-
- def delete(self, name, entry_id, **kwargs):
- """
- Delete from storage.
- :param str name: name of table/document in storage.
- :param str entry_id: id of the entity to delete from storage.
- :param dict kwargs: extra kwargs if needed.
- """
- raise NotImplementedError('Subclass must implement abstract delete method')
-
- def store(self, name, entry_id, entry, **kwargs):
- """
- Setter to storage.
- :param str name: name of table/document in storage.
- :param str entry_id: id of the entity to store in the storage.
- :param dict entry: content to store.
- """
- raise NotImplementedError('Subclass must implement abstract store method')
-
- def iter(self, name, **kwargs):
- """
- Generator over the entries of table/document in storage.
- :param str name: name of table/document/file in storage to iter over.
- """
- raise NotImplementedError('Subclass must implement abstract iter method')
-
- def update(self, name, entry_id, **kwargs):
- """
- Updates and entry in storage.
-
- :param str name: name of table/document in storage.
- :param str entry_id: id of the document to get from storage.
- :param kwargs: the fields to update.
- :return:
- """
- raise NotImplementedError('Subclass must implement abstract store method')
-
-
-class ResourceDriver(Driver):
- """
- ResourceDriver context manager.
- Base Driver for Resource based storage.
-
- Resource storage structure is a file system base.
- <resource root directory>/<resource_name>/<entry_id>/<entry>
- entry: can be one single file or multiple files and directories.
- """
-
- def data(self, entry_type, entry_id, path=None, **kwargs):
- """
- Get the binary data from a file in a resource entry.
- If the entry is a single file no path needed,
- If the entry contain number of files the path will gide to the relevant file.
-
- resource path:
- <resource root directory>/<name>/<entry_id>/<path>
-
- :param basestring entry_type: resource name.
- :param basestring entry_id: id of the entity to resource in the storage.
- :param basestring path: path to resource relative to entry_id folder in the storage.
- :return: entry file object.
- :rtype: bytes
- """
- raise NotImplementedError('Subclass must implement abstract get method')
-
- def download(self, entry_type, entry_id, destination, path=None, **kwargs):
- """
- Download the resource to a destination.
- Like data method bat this method isn't returning data,
- Instead it create a new file in local file system.
-
- resource path:
- <resource root directory>/<name>/<entry_id>/<path>
- copy to:
- /<destination>
- destination can be file or directory
-
- :param basestring entry_type: resource name.
- :param basestring entry_id: id of the entity to resource in the storage.
- :param basestring destination: path in local file system to download to.
- :param basestring path: path to resource relative to entry_id folder in the storage.
- """
- raise NotImplementedError('Subclass must implement abstract get method')
-
- def upload(self, entry_type, entry_id, source, path=None, **kwargs):
- """
- Upload the resource from source.
- source can be file or directory with files.
-
- copy from:
- /<source>
- to resource path:
- <resource root directory>/<name>/<entry_id>/<path>
-
- :param basestring entry_type: resource name.
- :param basestring entry_id: id of the entity to resource in the storage.
- :param basestring source: source can be file or directory with files.
- :param basestring path: path to resource relative to entry_id folder in the storage.
- """
- raise NotImplementedError('Subclass must implement abstract get method')
-
-
-class BaseFileSystemDriver(Driver):
- """
- Base class which handles storage on the file system.
- """
- def __init__(self, *args, **kwargs):
- super(BaseFileSystemDriver, self).__init__(*args, **kwargs)
- self._lock = RLock()
-
- def connect(self):
- self._lock.acquire()
-
- def disconnect(self):
- self._lock.release()
-
- def __getstate__(self):
- obj_dict = super(BaseFileSystemDriver, self).__getstate__()
- del obj_dict['_lock']
- return obj_dict
-
- def __setstate__(self, obj_dict):
- super(BaseFileSystemDriver, self).__setstate__(obj_dict)
- vars(self).update(_lock=RLock(), **obj_dict)
-
-
-class FileSystemModelDriver(ModelDriver, BaseFileSystemDriver):
- """
- FileSystemModelDriver context manager.
- """
-
- def __init__(self, directory, **kwargs):
- """
- File system implementation for storage driver.
- :param str directory: root dir for storage.
- """
- super(FileSystemModelDriver, self).__init__(**kwargs)
- self.directory = directory
-
- self._join_path = partial(os.path.join, self.directory)
-
- def __repr__(self):
- return '{cls.__name__}(directory={self.directory})'.format(
- cls=self.__class__, self=self)
-
- def create(self, name):
- """
- Create directory in storage by path.
- tries to create the root directory as well.
- :param str name: path of file in storage.
- """
- try:
- os.makedirs(self.directory)
- except (OSError, IOError):
- pass
- os.makedirs(self._join_path(name))
-
- def get(self, name, entry_id, **kwargs):
- """
- Getter from storage.
- :param str name: name of directory in storage.
- :param str entry_id: id of the file to get from storage.
- :return: value of file from storage.
- :rtype: dict
- """
- with open(self._join_path(name, entry_id)) as file_obj:
- return jsonpickle.loads(file_obj.read())
-
- def store(self, name, entry_id, entry, **kwargs):
- """
- Delete from storage.
- :param str name: name of directory in storage.
- :param str entry_id: id of the file to delete from storage.
- """
- with open(self._join_path(name, entry_id), 'w') as file_obj:
- file_obj.write(jsonpickle.dumps(entry))
-
- def delete(self, name, entry_id, **kwargs):
- """
- Delete from storage.
- :param str name: name of directory in storage.
- :param str entry_id: id of the file to delete from storage.
- """
- os.remove(self._join_path(name, entry_id))
-
- def iter(self, name, filters=None, **kwargs):
- """
- Generator over the entries of directory in storage.
- :param str name: name of directory in storage to iter over.
- :param dict filters: filters for query
- """
- filters = filters or {}
-
- for entry_id in os.listdir(self._join_path(name)):
- value = self.get(name, entry_id=entry_id)
- for filter_name, filter_value in filters.items():
- if value.get(filter_name) != filter_value:
- break
- else:
- yield value
-
- def update(self, name, entry_id, **kwargs):
- """
- Updates and entry in storage.
-
- :param str name: name of table/document in storage.
- :param str entry_id: id of the document to get from storage.
- :param kwargs: the fields to update.
- :return:
- """
- entry_dict = self.get(name, entry_id)
- entry_dict.update(**kwargs)
- self.store(name, entry_id, entry_dict)
-
-
-class FileSystemResourceDriver(ResourceDriver, BaseFileSystemDriver):
- """
- FileSystemResourceDriver context manager.
- """
-
- def __init__(self, directory, **kwargs):
- """
- File system implementation for storage driver.
- :param str directory: root dir for storage.
- """
- super(FileSystemResourceDriver, self).__init__(**kwargs)
- self.directory = directory
- self._join_path = partial(os.path.join, self.directory)
-
- def __repr__(self):
- return '{cls.__name__}(directory={self.directory})'.format(
- cls=self.__class__, self=self)
-
- def create(self, name):
- """
- Create directory in storage by path.
- tries to create the root directory as well.
- :param basestring name: path of file in storage.
- """
- try:
- os.makedirs(self.directory)
- except (OSError, IOError):
- pass
- os.makedirs(self._join_path(name))
-
- def data(self, entry_type, entry_id, path=None):
- """
- Retrieve the content of a file system storage resource.
-
- :param basestring entry_type: the type of the entry.
- :param basestring entry_id: the id of the entry.
- :param basestring path: a path to a specific resource.
- :return: the content of the file
- :rtype: bytes
- """
- resource_relative_path = os.path.join(entry_type, entry_id, path or '')
- resource = os.path.join(self.directory, resource_relative_path)
- if not os.path.exists(resource):
- raise StorageError("Resource {0} does not exist".format(resource_relative_path))
- if not os.path.isfile(resource):
- resources = os.listdir(resource)
- if len(resources) != 1:
- raise StorageError('No resource in path: {0}'.format(resource))
- resource = os.path.join(resource, resources[0])
- with open(resource, 'rb') as resource_file:
- return resource_file.read()
-
- def download(self, entry_type, entry_id, destination, path=None):
- """
- Download a specific file or dir from the file system resource storage.
-
- :param basestring entry_type: the name of the entry.
- :param basestring entry_id: the id of the entry
- :param basestring destination: the destination of the files.
- :param basestring path: a path on the remote machine relative to the root of the entry.
- """
- resource_relative_path = os.path.join(entry_type, entry_id, path or '')
- resource = os.path.join(self.directory, resource_relative_path)
- if not os.path.exists(resource):
- raise StorageError("Resource {0} does not exist".format(resource_relative_path))
- if os.path.isfile(resource):
- shutil.copy2(resource, destination)
- else:
- distutils.dir_util.copy_tree(resource, destination) # pylint: disable=no-member
-
- def upload(self, entry_type, entry_id, source, path=None):
- """
- Uploads a specific file or dir to the file system resource storage.
-
- :param basestring entry_type: the name of the entry.
- :param basestring entry_id: the id of the entry
- :param source: the source of the files to upload.
- :param path: the destination of the file/s relative to the entry root dir.
- """
- resource_directory = os.path.join(self.directory, entry_type, entry_id)
- if not os.path.exists(resource_directory):
- os.makedirs(resource_directory)
- destination = os.path.join(resource_directory, path or '')
- if os.path.isfile(source):
- shutil.copy2(source, destination)
- else:
- distutils.dir_util.copy_tree(source, destination) # pylint: disable=no-member
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fe944e58/aria/storage/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/storage/exceptions.py b/aria/storage/exceptions.py
index 22dfc50..f982f63 100644
--- a/aria/storage/exceptions.py
+++ b/aria/storage/exceptions.py
@@ -12,7 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+"""
+Storage based exceptions
+"""
from .. import exceptions
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fe944e58/aria/storage/filesystem_api.py
----------------------------------------------------------------------
diff --git a/aria/storage/filesystem_api.py b/aria/storage/filesystem_api.py
new file mode 100644
index 0000000..f28d1f6
--- /dev/null
+++ b/aria/storage/filesystem_api.py
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Filesystem based API Base
+"""
+from multiprocessing import RLock
+
+from . import api
+
+
+class BaseFileSystemAPI(api.StorageAPI):
+ """
+ Base class which handles storage on the file system.
+ """
+
+ def create(self, **kwargs):
+ super(BaseFileSystemAPI, self).create(**kwargs)
+
+ def __init__(self, *args, **kwargs):
+ super(BaseFileSystemAPI, self).__init__(*args, **kwargs)
+ self._lock = RLock()
+
+ def _establish_connection(self):
+ self._lock.acquire()
+
+ def _destroy_connection(self):
+ self._lock.release()
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fe944e58/aria/storage/mapi/__init__.py
----------------------------------------------------------------------
diff --git a/aria/storage/mapi/__init__.py b/aria/storage/mapi/__init__.py
new file mode 100644
index 0000000..d4a8c6e
--- /dev/null
+++ b/aria/storage/mapi/__init__.py
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+A collection of MAPIs
+"""
+from .filesystem import FileSystemModelAPI
+from .inmemory import InMemoryModelAPI
+from .sql import SQLAlchemyModelAPI
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fe944e58/aria/storage/mapi/filesystem.py
----------------------------------------------------------------------
diff --git a/aria/storage/mapi/filesystem.py b/aria/storage/mapi/filesystem.py
new file mode 100644
index 0000000..fa24869
--- /dev/null
+++ b/aria/storage/mapi/filesystem.py
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+SQLalchemy based MAPI
+"""
+import os
+from functools import partial
+
+import jsonpickle
+
+from .. import (
+ api,
+ filesystem_api
+)
+
+
+class FileSystemModelAPI(api.ModelAPI, filesystem_api.BaseFileSystemAPI):
+ """
+ File system model storage.
+ """
+
+ def __init__(self, directory, **kwargs):
+ """
+ File system implementation for storage api.
+ :param str directory: root dir for storage.
+ """
+ super(FileSystemModelAPI, self).__init__(**kwargs)
+ self.directory = directory
+ self.base_path = os.path.join(self.directory, self.name)
+ self._join_path = partial(os.path.join, self.base_path)
+
+ def __repr__(self):
+ return '{cls.__name__}(directory={self.directory})'.format(
+ cls=self.__class__, self=self)
+
+ def create(self, **kwargs):
+ """
+ Create directory in storage by path.
+ tries to create the root directory as well.
+ :param str name: path of file in storage.
+ """
+ with self.connect():
+ try:
+ os.makedirs(self.directory)
+ except (OSError, IOError):
+ pass
+ os.makedirs(self.base_path)
+
+ def get(self, entry_id, **kwargs):
+ """
+ Getter from storage.
+ :param str entry_id: id of the file to get from storage.
+ :return: value of file from storage.
+ :rtype: dict
+ """
+ with self.connect():
+ with open(self._join_path(entry_id)) as file_obj:
+ return jsonpickle.loads(file_obj.read())
+
+ def store(self, entry, **kwargs):
+ """
+ Delete from storage.
+ :param Model entry: name of directory in storage.
+ """
+ with self.connect():
+ with open(self._join_path(entry.id), 'w') as file_obj:
+ file_obj.write(jsonpickle.dumps(entry))
+
+ def delete(self, entry_id, **kwargs):
+ """
+ Delete from storage.
+ :param str name: name of directory in storage.
+ :param str entry_id: id of the file to delete from storage.
+ """
+ with self.connect():
+ os.remove(self._join_path(entry_id))
+
+ def iter(self, filters=None, **kwargs):
+ """
+ Generator over the entries of directory in storage.
+ :param dict filters: filters for query
+ """
+ filters = filters or {}
+ with self.connect():
+ for entry_id in os.listdir(self.base_path):
+ value = self.get(entry_id=entry_id)
+ for filter_name, filter_value in filters.items():
+ if value.get(filter_name) != filter_value:
+ break
+ else:
+ yield value
+
+ def update(self, entry_id, **kwargs):
+ """
+ Updates and entry in storage.
+
+ :param str name: name of table/document in storage.
+ :param str entry_id: id of the document to get from storage.
+ :param kwargs: the fields to update.
+ :return:
+ """
+ with self.connect():
+ entry = self.get(entry_id)
+ for key, value in kwargs.items():
+ setattr(entry, key, value)
+ self.store(entry)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fe944e58/aria/storage/mapi/inmemory.py
----------------------------------------------------------------------
diff --git a/aria/storage/mapi/inmemory.py b/aria/storage/mapi/inmemory.py
new file mode 100644
index 0000000..09dbcfc
--- /dev/null
+++ b/aria/storage/mapi/inmemory.py
@@ -0,0 +1,148 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# DEPRECATED
+#pylint: skip-file
+
+from collections import namedtuple
+
+
+from .. import api
+from ..structures import orm
+
+
+_Pointer = namedtuple('_Pointer', 'name, is_iter')
+
+storage = {}
+
+
+class InMemoryModelAPI(api.ModelAPI):
+ def __init__(self, *args, **kwargs):
+ """
+ Managing the model in the storage, using the driver.
+
+ :param basestring name: the name of the model.
+ :param ModelDriver driver: the driver which supports this model in the storage.
+ :param Model model_cls: table/document class model.
+ """
+ super(InMemoryModelAPI, self).__init__(*args, **kwargs)
+ self.pointer_mapping = {}
+
+ def create(self):
+ """
+ Creates the model in the storage.
+ """
+ with self.connect():
+ storage[self.name] = {}
+ self._setup_pointers_mapping()
+
+ def _setup_pointers_mapping(self):
+ for field_name, field_cls in vars(self.model_cls).items():
+ if not (getattr(field_cls, 'impl', None) is not None and
+ isinstance(field_cls.impl.parent_token, orm.RelationshipProperty)):
+ continue
+ pointer_key = _Pointer(field_name, is_iter=False)
+ self.pointer_mapping[pointer_key] = self.__class__(
+ name=api.generate_lower_name(field_cls.class_),
+ model_cls=field_cls.class_)
+
+ def get(self, entry_id, **kwargs):
+ """
+ Getter for the model from the storage.
+
+ :param basestring entry_id: the id of the table/document.
+ :return: model instance
+ :rtype: Model
+ """
+ with self.connect():
+ data = storage[self.name][entry_id]
+ data.update(self._get_pointers(data, **kwargs))
+ return self.model_cls(**data)
+
+ def store(self, entry, **kwargs):
+ """
+ Setter for the model in the storage.
+
+ :param Model entry: the table/document to store.
+ """
+ with self.connect():
+ data = entry.to_dict
+ data.update(self._store_pointers(data, **kwargs))
+ storage[self.name][entry.id] = data
+
+ def delete(self, entry_id, **kwargs):
+ """
+ Delete the model from storage.
+
+ :param basestring entry_id: id of the entity to delete from storage.
+ """
+ entry = self.get(entry_id)
+ with self.connect():
+ self._delete_pointers(entry, **kwargs)
+ storage[self.name].pop(entry_id)
+
+ def iter(self, **kwargs):
+ """
+ Generator over the entries of model in storage.
+ """
+ with self.connect():
+ for data in storage[self.name].values():
+ data.update(self._get_pointers(data, **kwargs))
+ yield self.model_cls(**data)
+
+ def update(self, entry_id, **kwargs):
+ """
+ Updates and entry in storage.
+
+ :param str entry_id: the id of the table/document.
+ :param kwargs: the fields to update.
+ :return:
+ """
+ with self.connect():
+ storage[self.name][entry_id].update(**kwargs)
+
+ def _get_pointers(self, data, **kwargs):
+ pointers = {}
+ for field, schema in self.pointer_mapping.items():
+ if field.is_iter:
+ pointers[field.name] = [
+ schema.get(entry_id=pointer_id, **kwargs)
+ for pointer_id in data[field.name]
+ if pointer_id]
+ elif data[field.name]:
+ pointers[field.name] = schema.get(entry_id=data[field.name], **kwargs)
+ return pointers
+
+ def _store_pointers(self, data, **kwargs):
+ pointers = {}
+ for field, model_api in self.pointer_mapping.items():
+ if field.is_iter:
+ pointers[field.name] = []
+ for iter_entity in data[field.name]:
+ pointers[field.name].append(iter_entity.id)
+ model_api.store(iter_entity, **kwargs)
+ else:
+ pointers[field.name] = data[field.name].id
+ model_api.store(data[field.name], **kwargs)
+ return pointers
+
+ def _delete_pointers(self, entry, **kwargs):
+ for field, schema in self.pointer_mapping.items():
+ if field.is_iter:
+ for iter_entry in getattr(entry, field.name):
+ schema.delete(iter_entry.id, **kwargs)
+ else:
+ schema.delete(getattr(entry, field.name).id, **kwargs)