You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by da...@apache.org on 2016/12/11 11:40:55 UTC
[6/7] incubator-ariatosca git commit: ARIA-30 SQL based storage
implementation
ARIA-30 SQL based storage implementation
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/c6c92ae5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/c6c92ae5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/c6c92ae5
Branch: refs/heads/ARIA-26-plugin-mechanism
Commit: c6c92ae5b25b49e62b06f8867f6a7b0046f04428
Parents: fe974e4
Author: mxmrlv <mx...@gmail.com>
Authored: Sun Nov 27 13:20:46 2016 +0200
Committer: mxmrlv <mx...@gmail.com>
Committed: Thu Dec 8 11:59:24 2016 +0200
----------------------------------------------------------------------
aria/__init__.py | 43 +-
aria/orchestrator/__init__.py | 4 +-
aria/orchestrator/context/common.py | 29 +-
aria/orchestrator/context/exceptions.py | 4 +-
aria/orchestrator/context/operation.py | 27 +-
aria/orchestrator/context/toolbelt.py | 20 +-
aria/orchestrator/context/workflow.py | 51 +-
aria/orchestrator/exceptions.py | 7 +-
aria/orchestrator/workflows/api/task.py | 10 +-
aria/orchestrator/workflows/builtin/heal.py | 25 +-
aria/orchestrator/workflows/builtin/install.py | 7 +-
.../orchestrator/workflows/builtin/uninstall.py | 7 +-
.../orchestrator/workflows/builtin/workflows.py | 13 +-
aria/orchestrator/workflows/core/engine.py | 6 +-
aria/orchestrator/workflows/core/task.py | 38 +-
aria/storage/__init__.py | 372 +-----
aria/storage/api.py | 182 +++
aria/storage/core.py | 125 ++
aria/storage/drivers.py | 416 -------
aria/storage/exceptions.py | 4 +-
aria/storage/filesystem_rapi.py | 150 +++
aria/storage/models.py | 702 ++++++-----
aria/storage/sql_mapi.py | 382 ++++++
aria/storage/structures.py | 399 +++---
aria/utils/application.py | 14 +-
requirements.txt | 1 +
tests/mock/context.py | 50 +-
tests/mock/models.py | 102 +-
tests/orchestrator/context/test_operation.py | 80 +-
tests/orchestrator/context/test_toolbelt.py | 92 +-
tests/orchestrator/context/test_workflow.py | 37 +-
tests/orchestrator/workflows/api/test_task.py | 76 +-
.../orchestrator/workflows/builtin/__init__.py | 35 +-
.../workflows/builtin/test_execute_operation.py | 17 +-
.../orchestrator/workflows/builtin/test_heal.py | 23 +-
.../workflows/builtin/test_install.py | 16 +-
.../workflows/builtin/test_uninstall.py | 13 +-
.../orchestrator/workflows/core/test_engine.py | 47 +-
tests/orchestrator/workflows/core/test_task.py | 37 +-
.../test_task_graph_into_exececution_graph.py | 15 +-
tests/requirements.txt | 2 +-
tests/storage/__init__.py | 75 +-
tests/storage/test_drivers.py | 135 ---
tests/storage/test_field.py | 124 --
tests/storage/test_model_storage.py | 134 +-
tests/storage/test_models.py | 1143 +++++++++++++-----
tests/storage/test_models_api.py | 70 --
tests/storage/test_resource_storage.py | 62 +-
48 files changed, 2854 insertions(+), 2569 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/__init__.py
----------------------------------------------------------------------
diff --git a/aria/__init__.py b/aria/__init__.py
index 3f81f98..b000397 100644
--- a/aria/__init__.py
+++ b/aria/__init__.py
@@ -23,7 +23,6 @@ import pkgutil
from .VERSION import version as __version__
from .orchestrator.decorators import workflow, operation
-from .storage import ModelStorage, ResourceStorage, models, ModelDriver, ResourceDriver
from . import (
utils,
parser,
@@ -37,7 +36,6 @@ __all__ = (
'operation',
)
-_model_storage = {}
_resource_storage = {}
@@ -58,37 +56,38 @@ def install_aria_extensions():
del sys.modules[module_name]
-def application_model_storage(driver):
+def application_model_storage(api, api_kwargs=None):
"""
Initiate model storage for the supplied storage driver
"""
+ models = [
+ storage.models.Plugin,
+ storage.models.ProviderContext,
- assert isinstance(driver, ModelDriver)
- if driver not in _model_storage:
- _model_storage[driver] = ModelStorage(
- driver, model_classes=[
- models.Node,
- models.NodeInstance,
- models.Plugin,
- models.Blueprint,
- models.Snapshot,
- models.Deployment,
- models.DeploymentUpdate,
- models.DeploymentModification,
- models.Execution,
- models.ProviderContext,
- models.Task,
- ])
- return _model_storage[driver]
+ storage.models.Blueprint,
+ storage.models.Deployment,
+ storage.models.DeploymentUpdate,
+ storage.models.DeploymentUpdateStep,
+ storage.models.DeploymentModification,
+
+ storage.models.Node,
+ storage.models.NodeInstance,
+ storage.models.Relationship,
+ storage.models.RelationshipInstance,
+
+ storage.models.Execution,
+ storage.models.Task,
+ ]
+ # if api not in _model_storage:
+ return storage.ModelStorage(api, items=models, api_kwargs=api_kwargs or {})
def application_resource_storage(driver):
"""
Initiate resource storage for the supplied storage driver
"""
- assert isinstance(driver, ResourceDriver)
if driver not in _resource_storage:
- _resource_storage[driver] = ResourceStorage(
+ _resource_storage[driver] = storage.ResourceStorage(
driver,
resources=[
'blueprint',
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/__init__.py b/aria/orchestrator/__init__.py
index a5aeec7..90d6442 100644
--- a/aria/orchestrator/__init__.py
+++ b/aria/orchestrator/__init__.py
@@ -12,7 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+"""
+Aria orchestrator
+"""
from .decorators import workflow, operation
from . import (
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index f2bf83b..14efd9d 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -32,8 +32,7 @@ class BaseContext(logger.LoggerMixin):
model_storage,
resource_storage,
deployment_id,
- workflow_id,
- execution_id=None,
+ workflow_name,
task_max_attempts=1,
task_retry_interval=0,
task_ignore_failure=False,
@@ -44,8 +43,7 @@ class BaseContext(logger.LoggerMixin):
self._model = model_storage
self._resource = resource_storage
self._deployment_id = deployment_id
- self._workflow_id = workflow_id
- self._execution_id = execution_id or str(uuid4())
+ self._workflow_name = workflow_name
self._task_max_attempts = task_max_attempts
self._task_retry_interval = task_retry_interval
self._task_ignore_failure = task_ignore_failure
@@ -54,8 +52,7 @@ class BaseContext(logger.LoggerMixin):
return (
'{name}(name={self.name}, '
'deployment_id={self._deployment_id}, '
- 'workflow_id={self._workflow_id}, '
- 'execution_id={self._execution_id})'
+ 'workflow_name={self._workflow_name}, '
.format(name=self.__class__.__name__, self=self))
@property
@@ -79,7 +76,7 @@ class BaseContext(logger.LoggerMixin):
"""
The blueprint model
"""
- return self.model.blueprint.get(self.deployment.blueprint_id)
+ return self.deployment.blueprint
@property
def deployment(self):
@@ -89,20 +86,6 @@ class BaseContext(logger.LoggerMixin):
return self.model.deployment.get(self._deployment_id)
@property
- def execution(self):
- """
- The execution model
- """
- return self.model.execution.get(self._execution_id)
-
- @execution.setter
- def execution(self, value):
- """
- Store the execution in the model storage
- """
- self.model.execution.store(value)
-
- @property
def name(self):
"""
The operation name
@@ -136,6 +119,6 @@ class BaseContext(logger.LoggerMixin):
Read a deployment resource as string from the resource storage
"""
try:
- return self.resource.deployment.data(entry_id=self.deployment.id, path=path)
+ return self.resource.deployment.read(entry_id=self.deployment.id, path=path)
except exceptions.StorageError:
- return self.resource.blueprint.data(entry_id=self.blueprint.id, path=path)
+ return self.resource.blueprint.read(entry_id=self.blueprint.id, path=path)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/context/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/exceptions.py b/aria/orchestrator/context/exceptions.py
index 6704bbc..fe762e1 100644
--- a/aria/orchestrator/context/exceptions.py
+++ b/aria/orchestrator/context/exceptions.py
@@ -12,7 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+"""
+Context based exceptions
+"""
from ..exceptions import OrchestratorError
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index bf3686d..a73bad1 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -26,17 +26,17 @@ class BaseOperationContext(BaseContext):
Context object used during operation creation and execution
"""
- def __init__(self, name, workflow_context, task, **kwargs):
+ def __init__(self, name, workflow_context, task, actor, **kwargs):
super(BaseOperationContext, self).__init__(
name=name,
model_storage=workflow_context.model,
resource_storage=workflow_context.resource,
deployment_id=workflow_context._deployment_id,
- workflow_id=workflow_context._workflow_id,
- execution_id=workflow_context._execution_id,
+ workflow_name=workflow_context._workflow_name,
**kwargs)
self._task_model = task
- self._actor = self.task.actor
+ self._task_id = task.id
+ self._actor_id = actor.id
def __repr__(self):
details = 'operation_mapping={task.operation_mapping}; ' \
@@ -50,7 +50,7 @@ class BaseOperationContext(BaseContext):
The task in the model storage
:return: Task model
"""
- return self._task_model
+ return self.model.task.get(self._task_id)
class NodeOperationContext(BaseOperationContext):
@@ -63,7 +63,7 @@ class NodeOperationContext(BaseOperationContext):
the node of the current operation
:return:
"""
- return self._actor.node
+ return self.node_instance.node
@property
def node_instance(self):
@@ -71,7 +71,7 @@ class NodeOperationContext(BaseOperationContext):
The node instance of the current operation
:return:
"""
- return self._actor
+ return self.model.node_instance.get(self._actor_id)
class RelationshipOperationContext(BaseOperationContext):
@@ -84,7 +84,7 @@ class RelationshipOperationContext(BaseOperationContext):
The source node
:return:
"""
- return self.model.node.get(self.relationship.source_id)
+ return self.relationship.source_node
@property
def source_node_instance(self):
@@ -92,7 +92,7 @@ class RelationshipOperationContext(BaseOperationContext):
The source node instance
:return:
"""
- return self.model.node_instance.get(self.relationship_instance.source_id)
+ return self.relationship_instance.source_node_instance
@property
def target_node(self):
@@ -100,7 +100,7 @@ class RelationshipOperationContext(BaseOperationContext):
The target node
:return:
"""
- return self.model.node.get(self.relationship.target_id)
+ return self.relationship.target_node
@property
def target_node_instance(self):
@@ -108,7 +108,7 @@ class RelationshipOperationContext(BaseOperationContext):
The target node instance
:return:
"""
- return self.model.node_instance.get(self._actor.target_id)
+ return self.relationship_instance.target_node_instance
@property
def relationship(self):
@@ -116,7 +116,8 @@ class RelationshipOperationContext(BaseOperationContext):
The relationship of the current operation
:return:
"""
- return self._actor.relationship
+
+ return self.relationship_instance.relationship
@property
def relationship_instance(self):
@@ -124,4 +125,4 @@ class RelationshipOperationContext(BaseOperationContext):
The relationship instance of the current operation
:return:
"""
- return self._actor
+ return self.model.relationship_instance.get(self._actor_id)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/context/toolbelt.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/toolbelt.py b/aria/orchestrator/context/toolbelt.py
index 0aad89c..301b013 100644
--- a/aria/orchestrator/context/toolbelt.py
+++ b/aria/orchestrator/context/toolbelt.py
@@ -27,30 +27,14 @@ class NodeToolBelt(object):
self._op_context = operation_context
@property
- def dependent_node_instances(self):
- """
- Any node instance which has a relationship to the current node instance.
- :return:
- """
- assert isinstance(self._op_context, operation.NodeOperationContext)
- node_instances = self._op_context.model.node_instance.iter(
- filters={'deployment_id': self._op_context.deployment.id}
- )
- for node_instance in node_instances:
- for relationship_instance in node_instance.relationship_instances:
- if relationship_instance.target_id == self._op_context.node_instance.id:
- yield node_instance
-
- @property
def host_ip(self):
"""
The host ip of the current node
:return:
"""
assert isinstance(self._op_context, operation.NodeOperationContext)
- host_id = self._op_context._actor.host_id
- host_instance = self._op_context.model.node_instance.get(host_id)
- return host_instance.runtime_properties.get('ip')
+ host = self._op_context.node_instance.host
+ return host.runtime_properties.get('ip')
class RelationshipToolBelt(object):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index 3dc222b..e2e8e25 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -19,8 +19,7 @@ Workflow and operation contexts
import threading
from contextlib import contextmanager
-
-from aria import storage
+from datetime import datetime
from .exceptions import ContextException
from .common import BaseContext
@@ -30,53 +29,73 @@ class WorkflowContext(BaseContext):
"""
Context object used during workflow creation and execution
"""
- def __init__(self, parameters=None, *args, **kwargs):
+ def __init__(self, parameters=None, execution_id=None, *args, **kwargs):
super(WorkflowContext, self).__init__(*args, **kwargs)
self.parameters = parameters or {}
# TODO: execution creation should happen somewhere else
# should be moved there, when such logical place exists
- try:
- self.model.execution.get(self._execution_id)
- except storage.exceptions.StorageError:
- self._create_execution()
+ self._execution_id = self._create_execution() if execution_id is None else execution_id
def __repr__(self):
return (
'{name}(deployment_id={self._deployment_id}, '
- 'workflow_id={self._workflow_id}, '
- 'execution_id={self._execution_id})'.format(
+ 'workflow_name={self._workflow_name}'.format(
name=self.__class__.__name__, self=self))
def _create_execution(self):
execution_cls = self.model.execution.model_cls
+ now = datetime.utcnow()
execution = self.model.execution.model_cls(
- id=self._execution_id,
- deployment_id=self.deployment.id,
- workflow_id=self._workflow_id,
blueprint_id=self.blueprint.id,
+ deployment_id=self.deployment.id,
+ workflow_name=self._workflow_name,
+ created_at=now,
status=execution_cls.PENDING,
parameters=self.parameters,
)
- self.model.execution.store(execution)
+ self.model.execution.put(execution)
+ return execution.id
+
+ @property
+ def execution(self):
+ """
+ The execution model
+ """
+ return self.model.execution.get(self._execution_id)
+
+ @execution.setter
+ def execution(self, value):
+ """
+ Store the execution in the model storage
+ """
+ self.model.execution.put(value)
@property
def nodes(self):
"""
Iterator over nodes
"""
- return self.model.node.iter(filters={'blueprint_id': self.blueprint.id})
+ return self.model.node.iter(
+ filters={
+ 'deployment_id': self.deployment.id
+ }
+ )
@property
def node_instances(self):
"""
Iterator over node instances
"""
- return self.model.node_instance.iter(filters={'deployment_id': self.deployment.id})
+ return self.model.node_instance.iter(
+ filters={
+ 'deployment_id': self.deployment.id
+ }
+ )
class _CurrentContext(threading.local):
"""
- Provides thread-level context, which sugarcoats the task api.
+    Provides thread-level context, which sugarcoats the task api.
"""
def __init__(self):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/exceptions.py b/aria/orchestrator/exceptions.py
index 75b37cf..1a48194 100644
--- a/aria/orchestrator/exceptions.py
+++ b/aria/orchestrator/exceptions.py
@@ -12,9 +12,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+"""
+Orchestrator based exceptions
+"""
from aria.exceptions import AriaError
class OrchestratorError(AriaError):
+ """
+ Orchestrator based exception
+ """
pass
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index 4d36725..1c12407 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -18,7 +18,7 @@ Provides the tasks to be entered into the task graph
"""
from uuid import uuid4
-import aria
+from aria.storage import models
from ... import context
from .. import exceptions
@@ -75,8 +75,8 @@ class OperationTask(BaseTask):
:param actor: the operation host on which this operation is registered.
:param inputs: operation inputs.
"""
- assert isinstance(actor, (aria.storage.models.NodeInstance,
- aria.storage.models.RelationshipInstance))
+ assert isinstance(actor, (models.NodeInstance,
+ models.RelationshipInstance))
super(OperationTask, self).__init__()
self.actor = actor
self.name = '{name}.{actor.id}'.format(name=name, actor=actor)
@@ -97,7 +97,7 @@ class OperationTask(BaseTask):
:param instance: the node of which this operation belongs to.
:param name: the name of the operation.
"""
- assert isinstance(instance, aria.storage.models.NodeInstance)
+ assert isinstance(instance, models.NodeInstance)
operation_details = instance.node.operations[name]
operation_inputs = operation_details.get('inputs', {})
operation_inputs.update(inputs or {})
@@ -119,7 +119,7 @@ class OperationTask(BaseTask):
with 'source_operations' and 'target_operations'
:param inputs any additional inputs to the operation
"""
- assert isinstance(instance, aria.storage.models.RelationshipInstance)
+ assert isinstance(instance, models.RelationshipInstance)
if operation_end not in [cls.TARGET_OPERATION, cls.SOURCE_OPERATION]:
raise exceptions.TaskException('The operation end should be {0} or {1}'.format(
cls.TARGET_OPERATION, cls.SOURCE_OPERATION
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/workflows/builtin/heal.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/heal.py b/aria/orchestrator/workflows/builtin/heal.py
index dbfc14e..de07095 100644
--- a/aria/orchestrator/workflows/builtin/heal.py
+++ b/aria/orchestrator/workflows/builtin/heal.py
@@ -84,16 +84,19 @@ def heal_uninstall(ctx, graph, failing_node_instances, targeted_node_instances):
# create dependencies between the node instance sub workflow
for node_instance in failing_node_instances:
node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
- for relationship_instance in reversed(node_instance.relationship_instances):
- graph.add_dependency(node_instance_sub_workflows[relationship_instance.target_id],
- node_instance_sub_workflow)
+ for relationship_instance in reversed(node_instance.outbound_relationship_instances):
+ graph.add_dependency(
+ node_instance_sub_workflows[relationship_instance.target_node_instance.id],
+ node_instance_sub_workflow)
# Add operations for intact nodes depending on a node instance belonging to node_instances
for node_instance in targeted_node_instances:
node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
- for relationship_instance in reversed(node_instance.relationship_instances):
- target_node_instance = ctx.model.node_instance.get(relationship_instance.target_id)
+ for relationship_instance in reversed(node_instance.outbound_relationship_instances):
+
+ target_node_instance = \
+ ctx.model.node_instance.get(relationship_instance.target_node_instance.id)
target_node_instance_subgraph = node_instance_sub_workflows[target_node_instance.id]
graph.add_dependency(target_node_instance_subgraph, node_instance_sub_workflow)
@@ -134,9 +137,10 @@ def heal_install(ctx, graph, failing_node_instances, targeted_node_instances):
# create dependencies between the node instance sub workflow
for node_instance in failing_node_instances:
node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
- if node_instance.relationship_instances:
- dependencies = [node_instance_sub_workflows[relationship_instance.target_id]
- for relationship_instance in node_instance.relationship_instances]
+ if node_instance.outbound_relationship_instances:
+ dependencies = \
+ [node_instance_sub_workflows[relationship_instance.target_node_instance.id]
+ for relationship_instance in node_instance.outbound_relationship_instances]
graph.add_dependency(node_instance_sub_workflow, dependencies)
# Add operations for intact nodes depending on a node instance
@@ -144,8 +148,9 @@ def heal_install(ctx, graph, failing_node_instances, targeted_node_instances):
for node_instance in targeted_node_instances:
node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
- for relationship_instance in node_instance.relationship_instances:
- target_node_instance = ctx.model.node_instance.get(relationship_instance.target_id)
+ for relationship_instance in node_instance.outbound_relationship_instances:
+ target_node_instance = ctx.model.node_instance.get(
+ relationship_instance.target_node_instance.id)
target_node_instance_subworkflow = node_instance_sub_workflows[target_node_instance.id]
graph.add_dependency(node_instance_sub_workflow, target_node_instance_subworkflow)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/workflows/builtin/install.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/install.py b/aria/orchestrator/workflows/builtin/install.py
index 0ab3ad6..eb5b4e8 100644
--- a/aria/orchestrator/workflows/builtin/install.py
+++ b/aria/orchestrator/workflows/builtin/install.py
@@ -47,7 +47,8 @@ def install(ctx, graph, node_instances=(), node_instance_sub_workflows=None):
# create dependencies between the node instance sub workflow
for node_instance in node_instances:
node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
- if node_instance.relationship_instances:
- dependencies = [node_instance_sub_workflows[relationship_instance.target_id]
- for relationship_instance in node_instance.relationship_instances]
+ if node_instance.outbound_relationship_instances:
+ dependencies = [
+ node_instance_sub_workflows[relationship_instance.target_node_instance.id]
+ for relationship_instance in node_instance.outbound_relationship_instances]
graph.add_dependency(node_instance_sub_workflow, dependencies)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/workflows/builtin/uninstall.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/uninstall.py b/aria/orchestrator/workflows/builtin/uninstall.py
index f4e965c..db1c0cc 100644
--- a/aria/orchestrator/workflows/builtin/uninstall.py
+++ b/aria/orchestrator/workflows/builtin/uninstall.py
@@ -27,7 +27,7 @@ from ..api import task
def uninstall(ctx, graph, node_instances=(), node_instance_sub_workflows=None):
"""
The uninstall workflow
- :param WorkflowContext context: the workflow context
+ :param WorkflowContext ctx: the workflow context
:param TaskGraph graph: the graph which will describe the workflow.
:param node_instances: the node instances on which to run the workflow
:param dict node_instance_sub_workflows: a dictionary of subworkflows with id as key and
@@ -47,6 +47,7 @@ def uninstall(ctx, graph, node_instances=(), node_instance_sub_workflows=None):
# create dependencies between the node instance sub workflow
for node_instance in node_instances:
node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
- for relationship_instance in reversed(node_instance.relationship_instances):
- graph.add_dependency(node_instance_sub_workflows[relationship_instance.target_id],
+ for relationship_instance in reversed(node_instance.outbound_relationship_instances):
+ target_id = relationship_instance.target_node_instance.id
+ graph.add_dependency(node_instance_sub_workflows[target_id],
node_instance_sub_workflow)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/workflows/builtin/workflows.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/workflows.py b/aria/orchestrator/workflows/builtin/workflows.py
index 0eb8c34..4f765b3 100644
--- a/aria/orchestrator/workflows/builtin/workflows.py
+++ b/aria/orchestrator/workflows/builtin/workflows.py
@@ -37,7 +37,6 @@ __all__ = (
def install_node_instance(graph, node_instance, **kwargs):
"""
A workflow which installs a node instance.
- :param WorkflowContext ctx: the workflow context
:param TaskGraph graph: the tasks graph of which to edit
:param node_instance: the node instance to install
:return:
@@ -68,7 +67,6 @@ def install_node_instance(graph, node_instance, **kwargs):
def preconfigure_relationship(graph, node_instance, **kwargs):
"""
- :param context:
:param graph:
:param node_instance:
:return:
@@ -82,7 +80,6 @@ def preconfigure_relationship(graph, node_instance, **kwargs):
def postconfigure_relationship(graph, node_instance, **kwargs):
"""
- :param context:
:param graph:
:param node_instance:
:return:
@@ -96,7 +93,6 @@ def postconfigure_relationship(graph, node_instance, **kwargs):
def establish_relationship(graph, node_instance, **kwargs):
"""
- :param context:
:param graph:
:param node_instance:
:return:
@@ -113,7 +109,6 @@ def establish_relationship(graph, node_instance, **kwargs):
def uninstall_node_instance(graph, node_instance, **kwargs):
"""
A workflow which uninstalls a node instance.
- :param WorkflowContext context: the workflow context
:param TaskGraph graph: the tasks graph of which to edit
:param node_instance: the node instance to uninstall
:return:
@@ -135,7 +130,6 @@ def uninstall_node_instance(graph, node_instance, **kwargs):
def unlink_relationship(graph, node_instance):
"""
- :param context:
:param graph:
:param node_instance:
:return:
@@ -179,8 +173,8 @@ def relationships_tasks(graph, operation_name, node_instance):
:return:
"""
relationships_groups = groupby(
- node_instance.relationship_instances,
- key=lambda relationship_instance: relationship_instance.relationship.target_id)
+ node_instance.outbound_relationship_instances,
+ key=lambda relationship_instance: relationship_instance.target_node_instance.id)
sub_tasks = []
for _, (_, relationship_group) in enumerate(relationships_groups):
@@ -196,11 +190,8 @@ def relationships_tasks(graph, operation_name, node_instance):
def relationship_tasks(relationship_instance, operation_name):
"""
Creates a relationship task source and target.
- :param NodeInstance node_instance: the node instance of the relationship
:param RelationshipInstance relationship_instance: the relationship instance itself
- :param WorkflowContext context:
:param operation_name:
- :param index: the relationship index - enables pretty print
:return:
"""
source_operation = task.OperationTask.relationship_instance(
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 87ea8c6..2d26aeb 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -100,7 +100,11 @@ class Engine(logger.LoggerMixin):
return len(self._execution_graph.node) == 0
def _tasks_iter(self):
- return (data['task'] for _, data in self._execution_graph.nodes_iter(data=True))
+ for _, data in self._execution_graph.nodes_iter(data=True):
+ task = data['task']
+ if isinstance(task, engine_task.OperationTask):
+ self._workflow_context.model.task.refresh(task.model_task)
+ yield task
def _handle_executable_task(self, task):
if isinstance(task, engine_task.StubTask):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index a583cfc..0be17fe 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -106,32 +106,34 @@ class OperationTask(BaseTask):
def __init__(self, api_task, *args, **kwargs):
super(OperationTask, self).__init__(id=api_task.id, **kwargs)
self._workflow_context = api_task._workflow_context
- task_model = api_task._workflow_context.model.task.model_cls
- operation_task = task_model(
- id=api_task.id,
- name=api_task.name,
- operation_mapping=api_task.operation_mapping,
- actor=api_task.actor,
- inputs=api_task.inputs,
- status=task_model.PENDING,
- execution_id=self._workflow_context._execution_id,
- max_attempts=api_task.max_attempts,
- retry_interval=api_task.retry_interval,
- ignore_failure=api_task.ignore_failure
- )
+ base_task_model = api_task._workflow_context.model.task.model_cls
if isinstance(api_task.actor, models.NodeInstance):
context_class = operation_context.NodeOperationContext
+ task_model_cls = base_task_model.as_node_instance
elif isinstance(api_task.actor, models.RelationshipInstance):
context_class = operation_context.RelationshipOperationContext
+ task_model_cls = base_task_model.as_relationship_instance
else:
- raise RuntimeError('No operation context could be created for {0}'
- .format(api_task.actor.model_cls))
+ raise RuntimeError('No operation context could be created for {actor.model_cls}'
+ .format(actor=api_task.actor))
+
+ operation_task = task_model_cls(
+ name=api_task.name,
+ operation_mapping=api_task.operation_mapping,
+ instance_id=api_task.actor.id,
+ inputs=api_task.inputs,
+ status=base_task_model.PENDING,
+ max_attempts=api_task.max_attempts,
+ retry_interval=api_task.retry_interval,
+ ignore_failure=api_task.ignore_failure,
+ )
+ self._workflow_context.model.task.put(operation_task)
self._ctx = context_class(name=api_task.name,
workflow_context=self._workflow_context,
- task=operation_task)
- self._workflow_context.model.task.store(operation_task)
+ task=operation_task,
+ actor=operation_task.actor)
self._task_id = operation_task.id
self._update_fields = None
@@ -161,7 +163,7 @@ class OperationTask(BaseTask):
@model_task.setter
def model_task(self, value):
- self._workflow_context.model.task.store(value)
+ self._workflow_context.model.task.put(value)
@property
def context(self):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/storage/__init__.py
----------------------------------------------------------------------
diff --git a/aria/storage/__init__.py b/aria/storage/__init__.py
index 2d142a5..fd69d47 100644
--- a/aria/storage/__init__.py
+++ b/aria/storage/__init__.py
@@ -20,14 +20,14 @@ Path: aria.storage
Storage package is a generic abstraction over different storage types.
We define this abstraction with the following components:
-1. storage: simple api to use
-2. driver: implementation of the database client api.
+1. storage: simple mapi to use
+2. driver: implementation of the database client mapi.
3. model: defines the structure of the table/document.
4. field: defines a field/item in the model.
API:
* application_storage_factory - function, default Aria storage factory.
- * Storage - class, simple storage api.
+ * Storage - class, simple storage mapi.
* models - module, default Aria standard models.
* structures - module, default Aria structures - holds the base model,
and different fields types.
@@ -37,354 +37,28 @@ API:
* drivers - module, a pool of Aria standard drivers.
* StorageDriver - class, abstract model implementation.
"""
-# todo: rewrite the above package documentation
-# (something like explaning the two types of storage - models and resources)
-
-from collections import namedtuple
-
-from .structures import Storage, Field, Model, IterField, PointerField
-from .drivers import (
- ModelDriver,
- ResourceDriver,
- FileSystemResourceDriver,
- FileSystemModelDriver,
+from .core import (
+ Storage,
+ ModelStorage,
+ ResourceStorage,
+)
+from . import (
+ exceptions,
+ api,
+ structures,
+ core,
+ filesystem_rapi,
+ sql_mapi,
+ models
)
-from . import models, exceptions
__all__ = (
- 'ModelStorage',
- 'ResourceStorage',
- 'FileSystemModelDriver',
- 'models',
+ 'exceptions',
'structures',
- 'Field',
- 'IterField',
- 'PointerField',
- 'Model',
- 'drivers',
- 'ModelDriver',
- 'ResourceDriver',
- 'FileSystemResourceDriver',
+ # 'Storage',
+ # 'ModelStorage',
+ # 'ResourceStorage',
+ 'filesystem_rapi',
+ 'sql_mapi',
+ 'api'
)
-# todo: think about package output api's...
-# todo: in all drivers name => entry_type
-# todo: change in documentation str => basestring
-
-
-class ModelStorage(Storage):
- """
- Managing the models storage.
- """
- def __init__(self, driver, model_classes=(), **kwargs):
- """
- Simple storage client api for Aria applications.
- The storage instance defines the tables/documents/code api.
-
- :param ModelDriver driver: model storage driver.
- :param model_classes: the models to register.
- """
- assert isinstance(driver, ModelDriver)
- super(ModelStorage, self).__init__(driver, model_classes, **kwargs)
-
- def __getattr__(self, table):
- """
- getattr is a shortcut to simple api
-
- for Example:
- >> storage = ModelStorage(driver=FileSystemModelDriver('/tmp'))
- >> node_table = storage.node
- >> for node in node_table:
- >> print node
-
- :param str table: table name to get
- :return: a storage object that mapped to the table name
- """
- return super(ModelStorage, self).__getattr__(table)
-
- def register(self, model_cls):
- """
- Registers the model type in the resource storage manager.
- :param model_cls: the model to register.
- """
- model_name = generate_lower_name(model_cls)
- model_api = _ModelApi(model_name, self.driver, model_cls)
- self.registered[model_name] = model_api
-
- for pointer_schema_register in model_api.pointer_mapping.values():
- model_cls = pointer_schema_register.model_cls
- self.register(model_cls)
-
-_Pointer = namedtuple('_Pointer', 'name, is_iter')
-
-
-class _ModelApi(object):
- def __init__(self, name, driver, model_cls):
- """
- Managing the model in the storage, using the driver.
-
- :param basestring name: the name of the model.
- :param ModelDriver driver: the driver which supports this model in the storage.
- :param Model model_cls: table/document class model.
- """
- assert isinstance(driver, ModelDriver)
- assert issubclass(model_cls, Model)
- self.name = name
- self.driver = driver
- self.model_cls = model_cls
- self.pointer_mapping = {}
- self._setup_pointers_mapping()
-
- def _setup_pointers_mapping(self):
- for field_name, field_cls in vars(self.model_cls).items():
- if not(isinstance(field_cls, PointerField) and field_cls.type):
- continue
- pointer_key = _Pointer(field_name, is_iter=isinstance(field_cls, IterField))
- self.pointer_mapping[pointer_key] = self.__class__(
- name=generate_lower_name(field_cls.type),
- driver=self.driver,
- model_cls=field_cls.type)
-
- def __iter__(self):
- return self.iter()
-
- def __repr__(self):
- return '{self.name}(driver={self.driver}, model={self.model_cls})'.format(self=self)
-
- def create(self):
- """
- Creates the model in the storage.
- """
- with self.driver as connection:
- connection.create(self.name)
-
- def get(self, entry_id, **kwargs):
- """
- Getter for the model from the storage.
-
- :param basestring entry_id: the id of the table/document.
- :return: model instance
- :rtype: Model
- """
- with self.driver as connection:
- data = connection.get(
- name=self.name,
- entry_id=entry_id,
- **kwargs)
- data.update(self._get_pointers(data, **kwargs))
- return self.model_cls(**data)
-
- def store(self, entry, **kwargs):
- """
- Setter for the model in the storage.
-
- :param Model entry: the table/document to store.
- """
- assert isinstance(entry, self.model_cls)
- with self.driver as connection:
- data = entry.fields_dict
- data.update(self._store_pointers(data, **kwargs))
- connection.store(
- name=self.name,
- entry_id=entry.id,
- entry=data,
- **kwargs)
-
- def delete(self, entry_id, **kwargs):
- """
- Delete the model from storage.
-
- :param basestring entry_id: id of the entity to delete from storage.
- """
- entry = self.get(entry_id)
- with self.driver as connection:
- self._delete_pointers(entry, **kwargs)
- connection.delete(
- name=self.name,
- entry_id=entry_id,
- **kwargs)
-
- def iter(self, **kwargs):
- """
- Generator over the entries of model in storage.
- """
- with self.driver as connection:
- for data in connection.iter(name=self.name, **kwargs):
- data.update(self._get_pointers(data, **kwargs))
- yield self.model_cls(**data)
-
- def update(self, entry_id, **kwargs):
- """
- Updates and entry in storage.
-
- :param str entry_id: the id of the table/document.
- :param kwargs: the fields to update.
- :return:
- """
- with self.driver as connection:
- connection.update(
- name=self.name,
- entry_id=entry_id,
- **kwargs
- )
-
- def _get_pointers(self, data, **kwargs):
- pointers = {}
- for field, schema in self.pointer_mapping.items():
- if field.is_iter:
- pointers[field.name] = [
- schema.get(entry_id=pointer_id, **kwargs)
- for pointer_id in data[field.name]
- if pointer_id]
- elif data[field.name]:
- pointers[field.name] = schema.get(entry_id=data[field.name], **kwargs)
- return pointers
-
- def _store_pointers(self, data, **kwargs):
- pointers = {}
- for field, model_api in self.pointer_mapping.items():
- if field.is_iter:
- pointers[field.name] = []
- for iter_entity in data[field.name]:
- pointers[field.name].append(iter_entity.id)
- model_api.store(iter_entity, **kwargs)
- else:
- pointers[field.name] = data[field.name].id
- model_api.store(data[field.name], **kwargs)
- return pointers
-
- def _delete_pointers(self, entry, **kwargs):
- for field, schema in self.pointer_mapping.items():
- if field.is_iter:
- for iter_entry in getattr(entry, field.name):
- schema.delete(iter_entry.id, **kwargs)
- else:
- schema.delete(getattr(entry, field.name).id, **kwargs)
-
-
-class ResourceApi(object):
- """
- Managing the resource in the storage, using the driver.
-
- :param basestring name: the name of the resource.
- :param ResourceDriver driver: the driver which supports this resource in the storage.
- """
- def __init__(self, driver, resource_name):
- """
- Managing the resources in the storage, using the driver.
-
- :param ResourceDriver driver: the driver which supports this model in the storage.
- :param basestring resource_name: the type of the entry this resourceAPI manages.
- """
- assert isinstance(driver, ResourceDriver)
- self.driver = driver
- self.resource_name = resource_name
-
- def __repr__(self):
- return '{name}(driver={self.driver}, resource={self.resource_name})'.format(
- name=self.__class__.__name__, self=self)
-
- def create(self):
- """
- Create the resource dir in the storage.
- """
- with self.driver as connection:
- connection.create(self.resource_name)
-
- def data(self, entry_id, path=None, **kwargs):
- """
- Retrieve the content of a storage resource.
-
- :param basestring entry_id: the id of the entry.
- :param basestring path: path of the resource on the storage.
- :param kwargs: resources to be passed to the driver..
- :return the content of a single file:
- """
- with self.driver as connection:
- return connection.data(
- entry_type=self.resource_name,
- entry_id=entry_id,
- path=path,
- **kwargs)
-
- def download(self, entry_id, destination, path=None, **kwargs):
- """
- Download a file/dir from the resource storage.
-
- :param basestring entry_id: the id of the entry.
- :param basestring destination: the destination of the file/dir.
- :param basestring path: path of the resource on the storage.
- """
- with self.driver as connection:
- connection.download(
- entry_type=self.resource_name,
- entry_id=entry_id,
- destination=destination,
- path=path,
- **kwargs)
-
- def upload(self, entry_id, source, path=None, **kwargs):
- """
- Upload a file/dir from the resource storage.
-
- :param basestring entry_id: the id of the entry.
- :param basestring source: the source path of the file to upload.
- :param basestring path: the destination of the file, relative to the root dir
- of the resource
- """
- with self.driver as connection:
- connection.upload(
- entry_type=self.resource_name,
- entry_id=entry_id,
- source=source,
- path=path,
- **kwargs)
-
-
-def generate_lower_name(model_cls):
- """
- Generates the name of the class from the class object. e.g. SomeClass -> some_class
- :param model_cls: the class to evaluate.
- :return: lower name
- :rtype: basestring
- """
- return ''.join(
- character if character.islower() else '_{0}'.format(character.lower())
- for character in model_cls.__name__)[1:]
-
-
-class ResourceStorage(Storage):
- """
- Managing the resource storage.
- """
- def __init__(self, driver, resources=(), **kwargs):
- """
- Simple storage client api for Aria applications.
- The storage instance defines the tables/documents/code api.
-
- :param ResourceDriver driver: resource storage driver
- :param resources: the resources to register.
- """
- assert isinstance(driver, ResourceDriver)
- super(ResourceStorage, self).__init__(driver, resources, **kwargs)
-
- def register(self, resource):
- """
- Registers the resource type in the resource storage manager.
- :param resource: the resource to register.
- """
- self.registered[resource] = ResourceApi(self.driver, resource_name=resource)
-
- def __getattr__(self, resource):
- """
- getattr is a shortcut to simple api
-
- for Example:
- >> storage = ResourceStorage(driver=FileSystemResourceDriver('/tmp'))
- >> blueprint_resources = storage.blueprint
- >> blueprint_resources.download(blueprint_id, destination='~/blueprint/')
-
- :param str resource: resource name to download
- :return: a storage object that mapped to the resource name
- :rtype: ResourceApi
- """
- return super(ResourceStorage, self).__getattr__(resource)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/storage/api.py
----------------------------------------------------------------------
diff --git a/aria/storage/api.py b/aria/storage/api.py
new file mode 100644
index 0000000..d6fc3b8
--- /dev/null
+++ b/aria/storage/api.py
@@ -0,0 +1,182 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+General storage API
+"""
+
+
+class StorageAPI(object):
+ """
+ General storage Base API
+ """
+ def create(self, **kwargs):
+ """
+ Create a storage API.
+ :param kwargs:
+ :return:
+ """
+ raise NotImplementedError('Subclass must implement abstract create method')
+
+
+class ModelAPI(StorageAPI):
+ """
+ A Base object for the model.
+ """
+ def __init__(self, model_cls, name=None, **kwargs):
+ """
+ Base model API
+
+ :param model_cls: the representing class of the model
+ :param str name: the name of the model
+ :param kwargs:
+ """
+ super(ModelAPI, self).__init__(**kwargs)
+ self._model_cls = model_cls
+ self._name = name or generate_lower_name(model_cls)
+
+ @property
+ def name(self):
+ """
+ The name of the class
+ :return: name of the class
+ """
+ return self._name
+
+ @property
+ def model_cls(self):
+ """
+ The class representing the model
+ :return:
+ """
+ return self._model_cls
+
+ def get(self, entry_id, filters=None, **kwargs):
+ """
+ Get entry from storage.
+
+ :param entry_id:
+ :param kwargs:
+ :return:
+ """
+ raise NotImplementedError('Subclass must implement abstract get method')
+
+ def put(self, entry, **kwargs):
+ """
+ Store entry in storage
+
+ :param entry:
+ :param kwargs:
+ :return:
+ """
+ raise NotImplementedError('Subclass must implement abstract store method')
+
+ def delete(self, entry_id, **kwargs):
+ """
+ Delete entry from storage.
+
+ :param entry_id:
+ :param kwargs:
+ :return:
+ """
+ raise NotImplementedError('Subclass must implement abstract delete method')
+
+ def __iter__(self):
+ return self.iter()
+
+ def iter(self, **kwargs):
+ """
+ Iter over the entries in storage.
+
+ :param kwargs:
+ :return:
+ """
+ raise NotImplementedError('Subclass must implement abstract iter method')
+
+ def update(self, entry, **kwargs):
+ """
+ Update entry in storage.
+
+ :param entry:
+ :param kwargs:
+ :return:
+ """
+ raise NotImplementedError('Subclass must implement abstract update method')
+
+
+class ResourceAPI(StorageAPI):
+ """
+ A Base object for the resource.
+ """
+ def __init__(self, name):
+ """
+ Base resource API
+ :param str name: the resource type
+ """
+ self._name = name
+
+ @property
+ def name(self):
+ """
+ The name of the resource
+ :return:
+ """
+ return self._name
+
+ def read(self, entry_id, path=None, **kwargs):
+ """
+ Get a bytestream from the storage.
+
+ :param entry_id:
+ :param path:
+ :param kwargs:
+ :return:
+ """
+ raise NotImplementedError('Subclass must implement abstract data method')
+
+ def download(self, entry_id, destination, path=None, **kwargs):
+ """
+ Download a resource from the storage.
+
+ :param entry_id:
+ :param destination:
+ :param path:
+ :param kwargs:
+ :return:
+ """
+ raise NotImplementedError('Subclass must implement abstract download method')
+
+ def upload(self, entry_id, source, path=None, **kwargs):
+ """
+ Upload a resource to the storage.
+
+ :param entry_id:
+ :param source:
+ :param path:
+ :param kwargs:
+ :return:
+ """
+ raise NotImplementedError('Subclass must implement abstract upload method')
+
+
+def generate_lower_name(model_cls):
+ """
+ Generates the name of the class from the class object. e.g. SomeClass -> some_class
+ :param model_cls: the class to evaluate.
+ :return: lower name
+ :rtype: basestring
+ """
+ return ''.join(
+ character if character.islower() else '_{0}'.format(character.lower())
+ for character in model_cls.__name__)[1:]
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/storage/core.py
----------------------------------------------------------------------
diff --git a/aria/storage/core.py b/aria/storage/core.py
new file mode 100644
index 0000000..a5d3210
--- /dev/null
+++ b/aria/storage/core.py
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Aria's storage Sub-Package
+Path: aria.storage
+
+Storage package is a generic abstraction over different storage types.
+We define this abstraction with the following components:
+
+1. storage: simple mapi to use
+2. driver: implementation of the database client mapi.
+3. model: defines the structure of the table/document.
+4. field: defines a field/item in the model.
+
+API:
+ * application_storage_factory - function, default Aria storage factory.
+ * Storage - class, simple storage mapi.
+ * models - module, default Aria standard models.
+ * structures - module, default Aria structures - holds the base model,
+ and different fields types.
+ * Model - class, abstract model implementation.
+ * Field - class, base field implementation.
+ * IterField - class, base iterable field implementation.
+ * drivers - module, a pool of Aria standard drivers.
+ * StorageDriver - class, abstract model implementation.
+"""
+
+from aria.logger import LoggerMixin
+from . import api as storage_api
+
+__all__ = (
+ 'Storage',
+ 'ModelStorage',
+ 'ResourceStorage'
+)
+
+
+class Storage(LoggerMixin):
+ """
+ Represents the storage
+ """
+ def __init__(self, api_cls, api_kwargs=None, items=(), **kwargs):
+ self._api_kwargs = api_kwargs or {}
+ super(Storage, self).__init__(**kwargs)
+ self.api = api_cls
+ self.registered = {}
+ for item in items:
+ self.register(item)
+ self.logger.debug('{name} object is ready: {0!r}'.format(
+ self, name=self.__class__.__name__))
+
+ def __repr__(self):
+ return '{name}(api={self.api})'.format(name=self.__class__.__name__, self=self)
+
+ def __getattr__(self, item):
+ try:
+ return self.registered[item]
+ except KeyError:
+ return super(Storage, self).__getattribute__(item)
+
+ def register(self, entry):
+ """
+ Register the entry to the storage
+ :param entry:
+ :return:
+ """
+ raise NotImplementedError('Subclass must implement abstract register method')
+
+
+class ResourceStorage(Storage):
+ """
+ Represents resource storage.
+ """
+ def register(self, name):
+ """
+ Register the resource type to resource storage.
+ :param name:
+ :return:
+ """
+ self.registered[name] = self.api(name=name, **self._api_kwargs)
+ self.registered[name].create()
+ self.logger.debug('setup {name} in storage {self!r}'.format(name=name, self=self))
+
+
+class ModelStorage(Storage):
+ """
+ Represents model storage.
+ """
+ def register(self, model_cls):
+ """
+ Register the model into the model storage.
+ :param model_cls: the model to register.
+ :return:
+ """
+ model_name = storage_api.generate_lower_name(model_cls)
+ if model_name in self.registered:
+ self.logger.debug('{name} in already storage {self!r}'.format(name=model_name,
+ self=self))
+ return
+ self.registered[model_name] = self.api(name=model_name,
+ model_cls=model_cls,
+ **self._api_kwargs)
+ self.registered[model_name].create()
+ self.logger.debug('setup {name} in storage {self!r}'.format(name=model_name, self=self))
+
+ def drop(self):
+ """
+ Drop all the tables from the model.
+ :return:
+ """
+ for mapi in self.registered.values():
+ mapi.drop()
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/storage/drivers.py
----------------------------------------------------------------------
diff --git a/aria/storage/drivers.py b/aria/storage/drivers.py
deleted file mode 100644
index 1f96956..0000000
--- a/aria/storage/drivers.py
+++ /dev/null
@@ -1,416 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Aria's storage.drivers module
-Path: aria.storage.driver
-
-drivers module holds a generic abstract implementation of drivers.
-
-classes:
- * Driver - abstract storage driver implementation.
- * ModelDriver - abstract model base storage driver.
- * ResourceDriver - abstract resource base storage driver.
- * FileSystemModelDriver - file system implementation for model storage driver.
- * FileSystemResourceDriver - file system implementation for resource storage driver.
-"""
-
-import distutils.dir_util # pylint: disable=no-name-in-module, import-error
-import os
-import shutil
-from functools import partial
-from multiprocessing import RLock
-
-import jsonpickle
-
-from ..logger import LoggerMixin
-from .exceptions import StorageError
-
-__all__ = (
- 'ModelDriver',
- 'FileSystemModelDriver',
- 'ResourceDriver',
- 'FileSystemResourceDriver',
-)
-
-
-class Driver(LoggerMixin):
- """
- Driver: storage driver context manager - abstract driver implementation.
- In the implementation level, It is a good practice to raise StorageError on Errors.
- """
-
- def __enter__(self):
- """
- Context manager entry method, executes connect.
- :return: context manager instance
- :rtype: Driver
- """
- self.connect()
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- """
- Context manager exit method, executes disconnect.
- """
- self.disconnect()
- if not exc_type:
- return
- # self.logger.debug(
- # '{name} had an error'.format(name=self.__class__.__name__),
- # exc_info=(exc_type, exc_val, exc_tb))
- if StorageError in exc_type.mro():
- return
- raise StorageError('Exception had occurred, {type}: {message}'.format(
- type=exc_type, message=str(exc_val)))
-
- def connect(self):
- """
- Open storage connection.
- In some cases, This method can get the connection from a connection pool.
- """
- pass
-
- def disconnect(self):
- """
- Close storage connection.
- In some cases, This method can release the connection to the connection pool.
- """
- pass
-
- def create(self, name, *args, **kwargs):
- """
- Create table/document in storage by name.
- :param str name: name of table/document in storage.
- """
- pass
-
-
-class ModelDriver(Driver):
- """
- ModelDriver context manager.
- Base Driver for Model based storage.
- """
-
- def get(self, name, entry_id, **kwargs):
- """
- Getter from storage.
- :param str name: name of table/document in storage.
- :param str entry_id: id of the document to get from storage.
- :return: value of entity from the storage.
- """
- raise NotImplementedError('Subclass must implement abstract get method')
-
- def delete(self, name, entry_id, **kwargs):
- """
- Delete from storage.
- :param str name: name of table/document in storage.
- :param str entry_id: id of the entity to delete from storage.
- :param dict kwargs: extra kwargs if needed.
- """
- raise NotImplementedError('Subclass must implement abstract delete method')
-
- def store(self, name, entry_id, entry, **kwargs):
- """
- Setter to storage.
- :param str name: name of table/document in storage.
- :param str entry_id: id of the entity to store in the storage.
- :param dict entry: content to store.
- """
- raise NotImplementedError('Subclass must implement abstract store method')
-
- def iter(self, name, **kwargs):
- """
- Generator over the entries of table/document in storage.
- :param str name: name of table/document/file in storage to iter over.
- """
- raise NotImplementedError('Subclass must implement abstract iter method')
-
- def update(self, name, entry_id, **kwargs):
- """
- Updates and entry in storage.
-
- :param str name: name of table/document in storage.
- :param str entry_id: id of the document to get from storage.
- :param kwargs: the fields to update.
- :return:
- """
- raise NotImplementedError('Subclass must implement abstract store method')
-
-
-class ResourceDriver(Driver):
- """
- ResourceDriver context manager.
- Base Driver for Resource based storage.
-
- Resource storage structure is a file system base.
- <resource root directory>/<resource_name>/<entry_id>/<entry>
- entry: can be one single file or multiple files and directories.
- """
-
- def data(self, entry_type, entry_id, path=None, **kwargs):
- """
- Get the binary data from a file in a resource entry.
- If the entry is a single file no path needed,
- If the entry contain number of files the path will gide to the relevant file.
-
- resource path:
- <resource root directory>/<name>/<entry_id>/<path>
-
- :param basestring entry_type: resource name.
- :param basestring entry_id: id of the entity to resource in the storage.
- :param basestring path: path to resource relative to entry_id folder in the storage.
- :return: entry file object.
- :rtype: bytes
- """
- raise NotImplementedError('Subclass must implement abstract get method')
-
- def download(self, entry_type, entry_id, destination, path=None, **kwargs):
- """
- Download the resource to a destination.
- Like data method bat this method isn't returning data,
- Instead it create a new file in local file system.
-
- resource path:
- <resource root directory>/<name>/<entry_id>/<path>
- copy to:
- /<destination>
- destination can be file or directory
-
- :param basestring entry_type: resource name.
- :param basestring entry_id: id of the entity to resource in the storage.
- :param basestring destination: path in local file system to download to.
- :param basestring path: path to resource relative to entry_id folder in the storage.
- """
- raise NotImplementedError('Subclass must implement abstract get method')
-
- def upload(self, entry_type, entry_id, source, path=None, **kwargs):
- """
- Upload the resource from source.
- source can be file or directory with files.
-
- copy from:
- /<source>
- to resource path:
- <resource root directory>/<name>/<entry_id>/<path>
-
- :param basestring entry_type: resource name.
- :param basestring entry_id: id of the entity to resource in the storage.
- :param basestring source: source can be file or directory with files.
- :param basestring path: path to resource relative to entry_id folder in the storage.
- """
- raise NotImplementedError('Subclass must implement abstract get method')
-
-
-class BaseFileSystemDriver(Driver):
- """
- Base class which handles storage on the file system.
- """
- def __init__(self, *args, **kwargs):
- super(BaseFileSystemDriver, self).__init__(*args, **kwargs)
- self._lock = RLock()
-
- def connect(self):
- self._lock.acquire()
-
- def disconnect(self):
- self._lock.release()
-
- def __getstate__(self):
- obj_dict = super(BaseFileSystemDriver, self).__getstate__()
- del obj_dict['_lock']
- return obj_dict
-
- def __setstate__(self, obj_dict):
- super(BaseFileSystemDriver, self).__setstate__(obj_dict)
- vars(self).update(_lock=RLock(), **obj_dict)
-
-
-class FileSystemModelDriver(ModelDriver, BaseFileSystemDriver):
- """
- FileSystemModelDriver context manager.
- """
-
- def __init__(self, directory, **kwargs):
- """
- File system implementation for storage driver.
- :param str directory: root dir for storage.
- """
- super(FileSystemModelDriver, self).__init__(**kwargs)
- self.directory = directory
-
- self._join_path = partial(os.path.join, self.directory)
-
- def __repr__(self):
- return '{cls.__name__}(directory={self.directory})'.format(
- cls=self.__class__, self=self)
-
- def create(self, name):
- """
- Create directory in storage by path.
- tries to create the root directory as well.
- :param str name: path of file in storage.
- """
- try:
- os.makedirs(self.directory)
- except (OSError, IOError):
- pass
- os.makedirs(self._join_path(name))
-
- def get(self, name, entry_id, **kwargs):
- """
- Getter from storage.
- :param str name: name of directory in storage.
- :param str entry_id: id of the file to get from storage.
- :return: value of file from storage.
- :rtype: dict
- """
- with open(self._join_path(name, entry_id)) as file_obj:
- return jsonpickle.loads(file_obj.read())
-
- def store(self, name, entry_id, entry, **kwargs):
- """
- Delete from storage.
- :param str name: name of directory in storage.
- :param str entry_id: id of the file to delete from storage.
- """
- with open(self._join_path(name, entry_id), 'w') as file_obj:
- file_obj.write(jsonpickle.dumps(entry))
-
- def delete(self, name, entry_id, **kwargs):
- """
- Delete from storage.
- :param str name: name of directory in storage.
- :param str entry_id: id of the file to delete from storage.
- """
- os.remove(self._join_path(name, entry_id))
-
- def iter(self, name, filters=None, **kwargs):
- """
- Generator over the entries of directory in storage.
- :param str name: name of directory in storage to iter over.
- :param dict filters: filters for query
- """
- filters = filters or {}
-
- for entry_id in os.listdir(self._join_path(name)):
- value = self.get(name, entry_id=entry_id)
- for filter_name, filter_value in filters.items():
- if value.get(filter_name) != filter_value:
- break
- else:
- yield value
-
- def update(self, name, entry_id, **kwargs):
- """
- Updates and entry in storage.
-
- :param str name: name of table/document in storage.
- :param str entry_id: id of the document to get from storage.
- :param kwargs: the fields to update.
- :return:
- """
- entry_dict = self.get(name, entry_id)
- entry_dict.update(**kwargs)
- self.store(name, entry_id, entry_dict)
-
-
-class FileSystemResourceDriver(ResourceDriver, BaseFileSystemDriver):
- """
- FileSystemResourceDriver context manager.
- """
-
- def __init__(self, directory, **kwargs):
- """
- File system implementation for storage driver.
- :param str directory: root dir for storage.
- """
- super(FileSystemResourceDriver, self).__init__(**kwargs)
- self.directory = directory
- self._join_path = partial(os.path.join, self.directory)
-
- def __repr__(self):
- return '{cls.__name__}(directory={self.directory})'.format(
- cls=self.__class__, self=self)
-
- def create(self, name):
- """
- Create directory in storage by path.
- tries to create the root directory as well.
- :param basestring name: path of file in storage.
- """
- try:
- os.makedirs(self.directory)
- except (OSError, IOError):
- pass
- os.makedirs(self._join_path(name))
-
- def data(self, entry_type, entry_id, path=None):
- """
- Retrieve the content of a file system storage resource.
-
- :param basestring entry_type: the type of the entry.
- :param basestring entry_id: the id of the entry.
- :param basestring path: a path to a specific resource.
- :return: the content of the file
- :rtype: bytes
- """
- resource_relative_path = os.path.join(entry_type, entry_id, path or '')
- resource = os.path.join(self.directory, resource_relative_path)
- if not os.path.exists(resource):
- raise StorageError("Resource {0} does not exist".format(resource_relative_path))
- if not os.path.isfile(resource):
- resources = os.listdir(resource)
- if len(resources) != 1:
- raise StorageError('No resource in path: {0}'.format(resource))
- resource = os.path.join(resource, resources[0])
- with open(resource, 'rb') as resource_file:
- return resource_file.read()
-
- def download(self, entry_type, entry_id, destination, path=None):
- """
- Download a specific file or dir from the file system resource storage.
-
- :param basestring entry_type: the name of the entry.
- :param basestring entry_id: the id of the entry
- :param basestring destination: the destination of the files.
- :param basestring path: a path on the remote machine relative to the root of the entry.
- """
- resource_relative_path = os.path.join(entry_type, entry_id, path or '')
- resource = os.path.join(self.directory, resource_relative_path)
- if not os.path.exists(resource):
- raise StorageError("Resource {0} does not exist".format(resource_relative_path))
- if os.path.isfile(resource):
- shutil.copy2(resource, destination)
- else:
- distutils.dir_util.copy_tree(resource, destination) # pylint: disable=no-member
-
- def upload(self, entry_type, entry_id, source, path=None):
- """
- Uploads a specific file or dir to the file system resource storage.
-
- :param basestring entry_type: the name of the entry.
- :param basestring entry_id: the id of the entry
- :param source: the source of the files to upload.
- :param path: the destination of the file/s relative to the entry root dir.
- """
- resource_directory = os.path.join(self.directory, entry_type, entry_id)
- if not os.path.exists(resource_directory):
- os.makedirs(resource_directory)
- destination = os.path.join(resource_directory, path or '')
- if os.path.isfile(source):
- shutil.copy2(source, destination)
- else:
- distutils.dir_util.copy_tree(source, destination) # pylint: disable=no-member
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/storage/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/storage/exceptions.py b/aria/storage/exceptions.py
index 22dfc50..f982f63 100644
--- a/aria/storage/exceptions.py
+++ b/aria/storage/exceptions.py
@@ -12,7 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+"""
+Storage-based exceptions
+"""
from .. import exceptions
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/storage/filesystem_rapi.py
----------------------------------------------------------------------
diff --git a/aria/storage/filesystem_rapi.py b/aria/storage/filesystem_rapi.py
new file mode 100644
index 0000000..f810f58
--- /dev/null
+++ b/aria/storage/filesystem_rapi.py
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+File system based RAPI (Resource API)
+"""
+import os
+import shutil
+from contextlib import contextmanager
+from functools import partial
+from distutils import dir_util # https://github.com/PyCQA/pylint/issues/73; pylint: disable=no-name-in-module
+from multiprocessing import RLock
+
+from aria.storage import (
+ api,
+ exceptions
+)
+
+
class FileSystemResourceAPI(api.ResourceAPI):
    """
    File system resource storage.

    Resources live under ``<directory>/<name>/<entry_id>``, where ``name`` is
    provided by the base ``api.ResourceAPI``.
    """

    def __init__(self, directory, **kwargs):
        """
        File system implementation for the resource storage API.

        :param str directory: root dir for storage.
        """
        super(FileSystemResourceAPI, self).__init__(**kwargs)
        self.directory = directory
        self.base_path = os.path.join(self.directory, self.name)
        self._join_path = partial(os.path.join, self.base_path)
        # multiprocessing.RLock: serializes connections, presumably across
        # processes sharing this storage -- confirm against callers.
        self._lock = RLock()

    @contextmanager
    def connect(self):
        """
        Establishes a connection and destroys it after use.

        Any exception raised while connected is wrapped in a ``StorageError``;
        the connection is destroyed on exit in all cases.
        """
        try:
            self._establish_connection()
            yield self
        except BaseException as e:
            # NOTE(review): BaseException also captures KeyboardInterrupt /
            # SystemExit and converts them to StorageError -- confirm intended.
            raise exceptions.StorageError(str(e))
        finally:
            self._destroy_connection()

    def _establish_connection(self):
        """
        Establish a connection. Used in the 'connect' contextmanager.
        """
        self._lock.acquire()

    def _destroy_connection(self):
        """
        Destroy a connection. Used in the 'connect' contextmanager.
        """
        self._lock.release()

    def __repr__(self):
        return '{cls.__name__}(directory={self.directory})'.format(
            cls=self.__class__, self=self)

    def create(self, **kwargs):
        """
        Create the storage directory for this resource type
        (``<directory>/<name>``), creating the root directory as well if needed.

        :param kwargs: ignored; accepted for API compatibility.
        """
        try:
            os.makedirs(self.directory)
        except (OSError, IOError):
            pass  # root dir may already exist
        # Intentionally not guarded: an existing base path is an error.
        os.makedirs(self.base_path)

    def read(self, entry_id, path=None, **_):
        """
        Retrieve the content of a file system storage resource.

        :param str entry_id: the id of the entry.
        :param str path: a path to a specific resource within the entry.
        :return: the content of the file
        :rtype: bytes
        :raises StorageError: if the resource does not exist, or if it is a
            directory that does not contain exactly one file.
        """
        resource_relative_path = os.path.join(self.name, entry_id, path or '')
        resource = os.path.join(self.directory, resource_relative_path)
        if not os.path.exists(resource):
            raise exceptions.StorageError("Resource {0} does not exist".
                                          format(resource_relative_path))
        if not os.path.isfile(resource):
            # A directory is readable only when it holds exactly one file.
            resources = os.listdir(resource)
            if len(resources) != 1:
                raise exceptions.StorageError(
                    'Expected exactly one resource in path: {0}'.format(resource))
            resource = os.path.join(resource, resources[0])
        with open(resource, 'rb') as resource_file:
            return resource_file.read()

    def download(self, entry_id, destination, path=None, **_):
        """
        Download a specific file or dir from the file system resource storage.

        :param str entry_id: the id of the entry
        :param str destination: the destination of the files.
        :param str path: a path within the entry, relative to the entry root.
        :raises StorageError: if the resource does not exist.
        """
        resource_relative_path = os.path.join(self.name, entry_id, path or '')
        resource = os.path.join(self.directory, resource_relative_path)
        if not os.path.exists(resource):
            raise exceptions.StorageError("Resource {0} does not exist".
                                          format(resource_relative_path))
        if os.path.isfile(resource):
            shutil.copy2(resource, destination)
        else:
            dir_util.copy_tree(resource, destination)  # pylint: disable=no-member

    def upload(self, entry_id, source, path=None, **_):
        """
        Uploads a specific file or dir to the file system resource storage.

        :param str entry_id: the id of the entry
        :param source: the source of the files to upload.
        :param path: the destination of the file/s relative to the entry root dir.
        """
        resource_directory = os.path.join(self.directory, self.name, entry_id)
        if not os.path.exists(resource_directory):
            os.makedirs(resource_directory)
        destination = os.path.join(resource_directory, path or '')
        if os.path.isfile(source):
            shutil.copy2(source, destination)
        else:
            dir_util.copy_tree(source, destination)  # pylint: disable=no-member