You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/02/13 18:09:47 UTC
[7/7] incubator-ariatosca git commit: wip
wip
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/67b75425
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/67b75425
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/67b75425
Branch: refs/heads/ARIA-106-Create-sqla-logging-handler
Commit: 67b754254760db0f1276bc10a293c80c1f5f8a54
Parents: b619335
Author: mxmrlv <mx...@gmail.com>
Authored: Mon Feb 13 12:28:27 2017 +0200
Committer: mxmrlv <mx...@gmail.com>
Committed: Mon Feb 13 20:09:21 2017 +0200
----------------------------------------------------------------------
aria/__init__.py | 3 +-
aria/logger.py | 42 ++++++++++-
aria/orchestrator/context/common.py | 22 +++++-
aria/orchestrator/workflows/core/engine.py | 5 ++
aria/orchestrator/workflows/executor/process.py | 1 -
aria/storage/__init__.py | 6 --
aria/storage/core.py | 16 +++--
aria/storage/modeling/model.py | 4 ++
aria/storage/modeling/orchestrator_elements.py | 12 ++++
tests/mock/topology.py | 2 +-
tests/orchestrator/context/test_operation.py | 74 ++++++++++++++++++--
tests/storage/__init__.py | 23 +++---
tests/storage/test_instrumentation.py | 6 +-
13 files changed, 176 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/__init__.py
----------------------------------------------------------------------
diff --git a/aria/__init__.py b/aria/__init__.py
index 18eaa56..43529f0 100644
--- a/aria/__init__.py
+++ b/aria/__init__.py
@@ -97,7 +97,8 @@ def application_model_storage(api, api_kwargs=None, initiator=None, initiator_kw
storage.modeling.model.ServiceInstanceUpdateStep,
storage.modeling.model.ServiceInstanceModification,
storage.modeling.model.Plugin,
- storage.modeling.model.Task
+ storage.modeling.model.Task,
+ storage.modeling.model.Log
]
return storage.ModelStorage(api_cls=api,
api_kwargs=api_kwargs,
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/logger.py
----------------------------------------------------------------------
diff --git a/aria/logger.py b/aria/logger.py
index 0002cb5..f19d286 100644
--- a/aria/logger.py
+++ b/aria/logger.py
@@ -17,8 +17,10 @@
Logging related mixins and functions
"""
+
import logging
-from logging.handlers import RotatingFileHandler
+from logging import handlers as logging_handlers
+from datetime import datetime
_base_logger = logging.getLogger('aria')
@@ -124,7 +126,7 @@ def create_file_log_handler(
"""
Create a logging.handlers.RotatingFileHandler
"""
- rotating_file = RotatingFileHandler(
+ rotating_file = logging_handlers.RotatingFileHandler(
filename=file_path,
maxBytes=max_bytes,
backupCount=backup_count,
@@ -135,5 +137,41 @@ def create_file_log_handler(
return rotating_file
+class SQLAlchemyHandler(logging.Handler):
+ def __init__(self, session, engine, log_cls, **kwargs):
+ self._session = session
+ self._engine = engine
+ self._cls = log_cls
+ super(SQLAlchemyHandler, self).__init__(**kwargs)
+
+ def emit(self, record):
+ log = self._cls(
+ logger=record.name,
+ level=record.levelname,
+ msg=record.msg,
+ created_at=datetime.utcnow()
+ )
+ self._session.add(log)
+ self._session.commit()
+
+
+class _SQLAlchemyHandlerFactory(object):
+ from aria.storage.model import Log
+
+ def __init__(self):
+ self._handler = None
+
+ def __call__(self, session, engine, model_cls=Log, level=logging.DEBUG):
+ if self._handler is None or not self._is_eq(session, engine, model_cls):
+ self._handler = SQLAlchemyHandler(session, engine, model_cls, level=level)
+ return self._handler
+
+ def _is_eq(self, session, engine, model_cls):
+ return all([self._handler._session == session,
+ self._handler._engine == engine,
+ self._handler._cls == model_cls])
+
+create_sqla_log_handler = _SQLAlchemyHandlerFactory()
+
_default_file_formatter = logging.Formatter(
'%(asctime)s [%(name)s:%(levelname)s] %(message)s <%(pathname)s:%(lineno)d>')
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 37482cf..d5ef42c 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -17,13 +17,15 @@ A common context for both workflow and operation
"""
from uuid import uuid4
+import logging
+
import jinja2
from aria import logger
from aria.storage import exceptions
-class BaseContext(logger.LoggerMixin):
+class BaseContext(object):
"""
Base context object for workflow and operation
"""
@@ -34,6 +36,7 @@ class BaseContext(logger.LoggerMixin):
service_instance_id,
model_storage,
resource_storage,
+ ctx_logger=None,
workdir=None,
**kwargs):
super(BaseContext, self).__init__(**kwargs)
@@ -42,8 +45,21 @@ class BaseContext(logger.LoggerMixin):
self._model = model_storage
self._resource = resource_storage
self._service_instance_id = service_instance_id
+ self._logger = self._init_logger(ctx_logger)
self._workdir = workdir
+ def _init_logger(self, ctx_logger=None):
+ ctx_logger = ctx_logger or logging.getLogger('aria_ctx')
+
+ # A handler should be registered only once.
+ sqla_handler = logger.create_sqla_log_handler(**self._model.all_api_kwargs)
+ if sqla_handler not in ctx_logger.handlers:
+ ctx_logger.addHandler(sqla_handler)
+
+ ctx_logger.setLevel(logging.DEBUG)
+
+ return ctx_logger
+
def __repr__(self):
return (
'{name}(name={self.name}, '
@@ -51,6 +67,10 @@ class BaseContext(logger.LoggerMixin):
.format(name=self.__class__.__name__, self=self))
@property
+ def logger(self):
+ return self._logger
+
+ @property
def model(self):
"""
Access to the model storage
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 55b4159..7148dd1 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -71,6 +71,11 @@ class Engine(logger.LoggerMixin):
except BaseException as e:
events.on_failure_workflow_signal.send(self._workflow_context, exception=e)
raise
+ finally:
+ # Each context creates its own handlers and assigns them to the logger.
+ # This enables easy serialization. To keep the handlers from overlapping, we
+ # need to clear them after each execution.
+ self._workflow_context.logger.handlers = []
def cancel_execution(self):
"""
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 560ac43..acc0828 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -324,7 +324,6 @@ def _main():
# This is required for the instrumentation work properly.
# See docstring of `remove_mutable_association_listener` for further details
storage_type.remove_mutable_association_listener()
-
with instrumentation.track_changes() as instrument:
try:
ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context'])
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/storage/__init__.py
----------------------------------------------------------------------
diff --git a/aria/storage/__init__.py b/aria/storage/__init__.py
index eaadc7e..b76bdf2 100644
--- a/aria/storage/__init__.py
+++ b/aria/storage/__init__.py
@@ -42,12 +42,6 @@ from .core import (
ModelStorage,
ResourceStorage,
)
-from .modeling import (
- structure,
- model,
- model_base,
- type
-)
from . import (
exceptions,
api,
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/storage/core.py
----------------------------------------------------------------------
diff --git a/aria/storage/core.py b/aria/storage/core.py
index 0e189e6..7d70070 100644
--- a/aria/storage/core.py
+++ b/aria/storage/core.py
@@ -84,6 +84,12 @@ class Storage(LoggerMixin):
self.logger.debug('{name} object is ready: {0!r}'.format(
self, name=self.__class__.__name__))
+ @property
+ def all_api_kwargs(self):
+ kwargs = self._api_kwargs.copy()
+ kwargs.update(self._additional_api_kwargs)
+ return kwargs
+
def __repr__(self):
return '{name}(api={self.api})'.format(name=self.__class__.__name__, self=self)
@@ -121,9 +127,7 @@ class ResourceStorage(Storage):
:param name:
:return:
"""
- kwargs = self._api_kwargs.copy()
- kwargs.update(self._additional_api_kwargs)
- self.registered[name] = self.api(name=name, **kwargs)
+ self.registered[name] = self.api(name=name, **self.all_api_kwargs)
self.registered[name].create()
self.logger.debug('setup {name} in storage {self!r}'.format(name=name, self=self))
@@ -148,9 +152,9 @@ class ModelStorage(Storage):
self.logger.debug('{name} in already storage {self!r}'.format(name=model_name,
self=self))
return
- kwargs = self._api_kwargs.copy()
- kwargs.update(self._additional_api_kwargs)
- self.registered[model_name] = self.api(name=model_name, model_cls=model_cls, **kwargs)
+ self.registered[model_name] = self.api(name=model_name,
+ model_cls=model_cls,
+ **self.all_api_kwargs)
self.registered[model_name].create()
self.logger.debug('setup {name} in storage {self!r}'.format(name=model_name, self=self))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/storage/modeling/model.py
----------------------------------------------------------------------
diff --git a/aria/storage/modeling/model.py b/aria/storage/modeling/model.py
index 62b90b3..cf7d933 100644
--- a/aria/storage/modeling/model.py
+++ b/aria/storage/modeling/model.py
@@ -216,4 +216,8 @@ class Plugin(aria_declarative_base, orchestrator_elements.PluginBase):
class Task(aria_declarative_base, orchestrator_elements.TaskBase):
pass
+
+
+class Log(aria_declarative_base, orchestrator_elements.LogBase):
+ pass
# endregion
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/storage/modeling/orchestrator_elements.py
----------------------------------------------------------------------
diff --git a/aria/storage/modeling/orchestrator_elements.py b/aria/storage/modeling/orchestrator_elements.py
index 5f7a3f2..8efc147 100644
--- a/aria/storage/modeling/orchestrator_elements.py
+++ b/aria/storage/modeling/orchestrator_elements.py
@@ -466,3 +466,15 @@ class TaskBase(ModelMixin):
@staticmethod
def retry(message=None, retry_interval=None):
raise TaskRetryException(message, retry_interval=retry_interval)
+
+
+class LogBase(ModelMixin):
+ __tablename__ = 'log'
+
+ logger = Column(String)
+ level = Column(String)
+ msg = Column(String)
+ created_at = Column(DateTime, index=True)
+
+ def __repr__(self):
+ return "<Log: {0} - {1}>".format(self.created_at, self.msg[:50])
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/tests/mock/topology.py
----------------------------------------------------------------------
diff --git a/tests/mock/topology.py b/tests/mock/topology.py
index b04fb46..d3e8b7b 100644
--- a/tests/mock/topology.py
+++ b/tests/mock/topology.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from aria.storage import model
+from aria.storage.modeling import model
from . import models
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index 3f39979..08d360c 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -38,8 +38,7 @@ global_test_holder = {}
@pytest.fixture
def ctx(tmpdir):
context = mock.context.simple(
- str(tmpdir.join('workdir')),
- inmemory=True,
+ str(tmpdir),
context_kwargs=dict(workdir=str(tmpdir.join('workdir')))
)
yield context
@@ -61,7 +60,7 @@ def test_node_operation_task_execution(ctx, executor):
node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
interface = mock.models.get_interface(
operation_name,
- operation_kwargs=dict(implementation=op_path(my_operation, module_path=__name__))
+ operation_kwargs=dict(implementation=op_path(basic_operation, module_path=__name__))
)
node.interfaces = [interface]
ctx.model.node.update(node)
@@ -102,7 +101,7 @@ def test_relationship_operation_task_execution(ctx, executor):
interface = mock.models.get_interface(
operation_name=operation_name,
- operation_kwargs=dict(implementation=op_path(my_operation, module_path=__name__)),
+ operation_kwargs=dict(implementation=op_path(basic_operation, module_path=__name__)),
edge='source'
)
@@ -210,8 +209,73 @@ def test_plugin_workdir(ctx, executor, tmpdir):
assert expected_file.read() == content
+def test_operation_logging(ctx, executor):
+ operation_name = 'aria.interfaces.lifecycle.create'
+
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
+ interface = mock.models.get_interface(
+ operation_name,
+ operation_kwargs=dict(implementation=op_path(logged_operation, module_path=__name__))
+ )
+ node.interfaces = [interface]
+ ctx.model.node.update(node)
+
+ wf_start = 'wf_start'
+ wf_end = 'wf_end'
+
+ inputs = {
+ 'op_start': 'op_start',
+ 'op_end': 'op_end',
+ }
+
+ @workflow
+ def basic_workflow(graph, ctx, **_):
+ ctx.logger.info(wf_start)
+ graph.add_tasks(
+ api.task.OperationTask.node(
+ name=operation_name,
+ instance=node,
+ inputs=inputs
+ )
+ )
+ ctx.logger.debug(wf_end)
+
+ execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+
+ op_start_log = ctx.model.log.list(filters=dict(msg=inputs['op_start']))
+ assert len(op_start_log) == 1
+ op_start_log = op_start_log[0]
+ assert op_start_log.level.lower() == 'info'
+
+ op_end_log = ctx.model.log.list(filters=dict(msg=inputs['op_end']))
+ assert len(op_end_log) == 1
+ op_end_log = op_end_log[0]
+ assert op_end_log.level.lower() == 'debug'
+
+ wf_start_log = ctx.model.log.list(filters=dict(msg=wf_start))
+ assert len(wf_start_log) == 1
+ wf_start_log = wf_start_log[0]
+ assert wf_start_log.level.lower() == 'info'
+
+ wf_end_log = ctx.model.log.list(filters=dict(msg=wf_end))
+ assert len(wf_end_log) == 1
+ wf_end_log = wf_end_log[0]
+ assert wf_end_log.level.lower() == 'debug'
+
+ assert (wf_start_log.created_at <
+ wf_end_log.created_at <
+ op_start_log.created_at <
+ op_end_log.created_at)
+
+
+@operation
+def logged_operation(ctx, **_):
+ ctx.logger.info(ctx.task.inputs['op_start'])
+ ctx.logger.debug(ctx.task.inputs['op_end'])
+
+
@operation
-def my_operation(ctx, **_):
+def basic_operation(ctx, **_):
global_test_holder[ctx.name] = ctx
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/tests/storage/__init__.py
----------------------------------------------------------------------
diff --git a/tests/storage/__init__.py b/tests/storage/__init__.py
index 4278831..5323d01 100644
--- a/tests/storage/__init__.py
+++ b/tests/storage/__init__.py
@@ -12,8 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import os
-import platform
+
from shutil import rmtree
from tempfile import mkdtemp
@@ -23,19 +22,19 @@ from sqlalchemy import (
Column,
Text,
Integer,
- pool
+ pool,
+ MetaData
)
-from aria.storage import (
+from aria.storage.modeling import (
model,
- type as aria_type,
structure,
- modeling
+ type as aria_type
)
-class MockModel(model.aria_declarative_base, structure.ModelMixin): #pylint: disable=abstract-method
+class MockModel(structure.ModelMixin, model.aria_declarative_base): #pylint: disable=abstract-method
__tablename__ = 'mock_model'
model_dict = Column(aria_type.Dict)
model_list = Column(aria_type.List)
@@ -58,14 +57,8 @@ def release_sqlite_storage(storage):
:param storage:
:return:
"""
- mapis = storage.registered.values()
-
- if mapis:
- for session in set(mapi._session for mapi in mapis):
- session.rollback()
- session.close()
- for engine in set(mapi._engine for mapi in mapis):
- model.aria_declarative_base.metadata.drop_all(engine)
+ storage.all_api_kwargs['session'].close()
+ MetaData(bind=storage.all_api_kwargs['engine']).drop_all()
def init_inmemory_model_storage():
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/tests/storage/test_instrumentation.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_instrumentation.py b/tests/storage/test_instrumentation.py
index 08d5ae0..7f0eb02 100644
--- a/tests/storage/test_instrumentation.py
+++ b/tests/storage/test_instrumentation.py
@@ -17,13 +17,15 @@ import pytest
from sqlalchemy import Column, Text, Integer, event
from aria.storage import (
- structure,
ModelStorage,
sql_mapi,
instrumentation,
exceptions,
+)
+from aria.storage.modeling import (
+ model,
type as aria_type,
- model
+ structure,
)
from ..storage import release_sqlite_storage, init_inmemory_model_storage