You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/02/13 18:09:47 UTC

[7/7] incubator-ariatosca git commit: wip

wip


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/67b75425
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/67b75425
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/67b75425

Branch: refs/heads/ARIA-106-Create-sqla-logging-handler
Commit: 67b754254760db0f1276bc10a293c80c1f5f8a54
Parents: b619335
Author: mxmrlv <mx...@gmail.com>
Authored: Mon Feb 13 12:28:27 2017 +0200
Committer: mxmrlv <mx...@gmail.com>
Committed: Mon Feb 13 20:09:21 2017 +0200

----------------------------------------------------------------------
 aria/__init__.py                                |  3 +-
 aria/logger.py                                  | 42 ++++++++++-
 aria/orchestrator/context/common.py             | 22 +++++-
 aria/orchestrator/workflows/core/engine.py      |  5 ++
 aria/orchestrator/workflows/executor/process.py |  1 -
 aria/storage/__init__.py                        |  6 --
 aria/storage/core.py                            | 16 +++--
 aria/storage/modeling/model.py                  |  4 ++
 aria/storage/modeling/orchestrator_elements.py  | 12 ++++
 tests/mock/topology.py                          |  2 +-
 tests/orchestrator/context/test_operation.py    | 74 ++++++++++++++++++--
 tests/storage/__init__.py                       | 23 +++---
 tests/storage/test_instrumentation.py           |  6 +-
 13 files changed, 176 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/__init__.py
----------------------------------------------------------------------
diff --git a/aria/__init__.py b/aria/__init__.py
index 18eaa56..43529f0 100644
--- a/aria/__init__.py
+++ b/aria/__init__.py
@@ -97,7 +97,8 @@ def application_model_storage(api, api_kwargs=None, initiator=None, initiator_kw
         storage.modeling.model.ServiceInstanceUpdateStep,
         storage.modeling.model.ServiceInstanceModification,
         storage.modeling.model.Plugin,
-        storage.modeling.model.Task
+        storage.modeling.model.Task,
+        storage.modeling.model.Log
     ]
     return storage.ModelStorage(api_cls=api,
                                 api_kwargs=api_kwargs,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/logger.py
----------------------------------------------------------------------
diff --git a/aria/logger.py b/aria/logger.py
index 0002cb5..f19d286 100644
--- a/aria/logger.py
+++ b/aria/logger.py
@@ -17,8 +17,10 @@
 Logging related mixins and functions
 """
 
+
 import logging
-from logging.handlers import RotatingFileHandler
+from logging import handlers as logging_handlers
+from datetime import datetime
 
 _base_logger = logging.getLogger('aria')
 
@@ -124,7 +126,7 @@ def create_file_log_handler(
     """
     Create a logging.handlers.RotatingFileHandler
     """
-    rotating_file = RotatingFileHandler(
+    rotating_file = logging_handlers.RotatingFileHandler(
         filename=file_path,
         maxBytes=max_bytes,
         backupCount=backup_count,
@@ -135,5 +137,41 @@ def create_file_log_handler(
     return rotating_file
 
 
+class SQLAlchemyHandler(logging.Handler):
+    def __init__(self, session, engine, log_cls, **kwargs):
+        self._session = session
+        self._engine = engine
+        self._cls = log_cls
+        super(SQLAlchemyHandler, self).__init__(**kwargs)
+
+    def emit(self, record):
+        log = self._cls(
+            logger=record.name,
+            level=record.levelname,
+            msg=record.msg,
+            created_at=datetime.utcnow()
+        )
+        self._session.add(log)
+        self._session.commit()
+
+
+class _SQLAlchemyHandlerFactory(object):
+    from aria.storage.model import Log
+
+    def __init__(self):
+        self._handler = None
+
+    def __call__(self, session, engine, model_cls=Log, level=logging.DEBUG):
+        if self._handler is None or not self._is_eq(session, engine, model_cls):
+            self._handler = SQLAlchemyHandler(session, engine, model_cls, level=level)
+        return self._handler
+
+    def _is_eq(self, session, engine, model_cls):
+        return all([self._handler._session == session,
+                    self._handler._engine == engine,
+                    self._handler._cls == model_cls])
+
+create_sqla_log_handler = _SQLAlchemyHandlerFactory()
+
 _default_file_formatter = logging.Formatter(
     '%(asctime)s [%(name)s:%(levelname)s] %(message)s <%(pathname)s:%(lineno)d>')

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 37482cf..d5ef42c 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -17,13 +17,15 @@ A common context for both workflow and operation
 """
 from uuid import uuid4
 
+import logging
+
 import jinja2
 
 from aria import logger
 from aria.storage import exceptions
 
 
-class BaseContext(logger.LoggerMixin):
+class BaseContext(object):
     """
     Base context object for workflow and operation
     """
@@ -34,6 +36,7 @@ class BaseContext(logger.LoggerMixin):
             service_instance_id,
             model_storage,
             resource_storage,
+            ctx_logger=None,
             workdir=None,
             **kwargs):
         super(BaseContext, self).__init__(**kwargs)
@@ -42,8 +45,21 @@ class BaseContext(logger.LoggerMixin):
         self._model = model_storage
         self._resource = resource_storage
         self._service_instance_id = service_instance_id
+        self._logger = self._init_logger(ctx_logger)
         self._workdir = workdir
 
+    def _init_logger(self, ctx_logger=None):
+        ctx_logger = ctx_logger or logging.getLogger('aria_ctx')
+
+        # A handler should be registered only once.
+        sqla_handler = logger.create_sqla_log_handler(**self._model.all_api_kwargs)
+        if sqla_handler not in ctx_logger.handlers:
+            ctx_logger.addHandler(sqla_handler)
+
+        ctx_logger.setLevel(logging.DEBUG)
+
+        return ctx_logger
+
     def __repr__(self):
         return (
             '{name}(name={self.name}, '
@@ -51,6 +67,10 @@ class BaseContext(logger.LoggerMixin):
             .format(name=self.__class__.__name__, self=self))
 
     @property
+    def logger(self):
+        return self._logger
+
+    @property
     def model(self):
         """
         Access to the model storage

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 55b4159..7148dd1 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -71,6 +71,11 @@ class Engine(logger.LoggerMixin):
         except BaseException as e:
             events.on_failure_workflow_signal.send(self._workflow_context, exception=e)
             raise
+        finally:
+            # Each context creates its own handlers and assigns them to the logger.
+            # This enables easy serialization. So that the handlers do not overlap, we
+            # need to clear them after each execution.
+            self._workflow_context.logger.handlers = []
 
     def cancel_execution(self):
         """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 560ac43..acc0828 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -324,7 +324,6 @@ def _main():
     # This is required for the instrumentation work properly.
     # See docstring of `remove_mutable_association_listener` for further details
     storage_type.remove_mutable_association_listener()
-
     with instrumentation.track_changes() as instrument:
         try:
             ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context'])

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/storage/__init__.py
----------------------------------------------------------------------
diff --git a/aria/storage/__init__.py b/aria/storage/__init__.py
index eaadc7e..b76bdf2 100644
--- a/aria/storage/__init__.py
+++ b/aria/storage/__init__.py
@@ -42,12 +42,6 @@ from .core import (
     ModelStorage,
     ResourceStorage,
 )
-from .modeling import (
-    structure,
-    model,
-    model_base,
-    type
-)
 from . import (
     exceptions,
     api,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/storage/core.py
----------------------------------------------------------------------
diff --git a/aria/storage/core.py b/aria/storage/core.py
index 0e189e6..7d70070 100644
--- a/aria/storage/core.py
+++ b/aria/storage/core.py
@@ -84,6 +84,12 @@ class Storage(LoggerMixin):
         self.logger.debug('{name} object is ready: {0!r}'.format(
             self, name=self.__class__.__name__))
 
+    @property
+    def all_api_kwargs(self):
+        kwargs = self._api_kwargs.copy()
+        kwargs.update(self._additional_api_kwargs)
+        return kwargs
+
     def __repr__(self):
         return '{name}(api={self.api})'.format(name=self.__class__.__name__, self=self)
 
@@ -121,9 +127,7 @@ class ResourceStorage(Storage):
         :param name:
         :return:
         """
-        kwargs = self._api_kwargs.copy()
-        kwargs.update(self._additional_api_kwargs)
-        self.registered[name] = self.api(name=name, **kwargs)
+        self.registered[name] = self.api(name=name, **self.all_api_kwargs)
         self.registered[name].create()
         self.logger.debug('setup {name} in storage {self!r}'.format(name=name, self=self))
 
@@ -148,9 +152,9 @@ class ModelStorage(Storage):
             self.logger.debug('{name} in already storage {self!r}'.format(name=model_name,
                                                                           self=self))
             return
-        kwargs = self._api_kwargs.copy()
-        kwargs.update(self._additional_api_kwargs)
-        self.registered[model_name] = self.api(name=model_name, model_cls=model_cls, **kwargs)
+        self.registered[model_name] = self.api(name=model_name,
+                                               model_cls=model_cls,
+                                               **self.all_api_kwargs)
         self.registered[model_name].create()
         self.logger.debug('setup {name} in storage {self!r}'.format(name=model_name, self=self))
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/storage/modeling/model.py
----------------------------------------------------------------------
diff --git a/aria/storage/modeling/model.py b/aria/storage/modeling/model.py
index 62b90b3..cf7d933 100644
--- a/aria/storage/modeling/model.py
+++ b/aria/storage/modeling/model.py
@@ -216,4 +216,8 @@ class Plugin(aria_declarative_base, orchestrator_elements.PluginBase):
 
 class Task(aria_declarative_base, orchestrator_elements.TaskBase):
     pass
+
+
+class Log(aria_declarative_base, orchestrator_elements.LogBase):
+    pass
 # endregion

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/storage/modeling/orchestrator_elements.py
----------------------------------------------------------------------
diff --git a/aria/storage/modeling/orchestrator_elements.py b/aria/storage/modeling/orchestrator_elements.py
index 5f7a3f2..8efc147 100644
--- a/aria/storage/modeling/orchestrator_elements.py
+++ b/aria/storage/modeling/orchestrator_elements.py
@@ -466,3 +466,15 @@ class TaskBase(ModelMixin):
     @staticmethod
     def retry(message=None, retry_interval=None):
         raise TaskRetryException(message, retry_interval=retry_interval)
+
+
+class LogBase(ModelMixin):
+    __tablename__ = 'log'
+
+    logger = Column(String)
+    level = Column(String)
+    msg = Column(String)
+    created_at = Column(DateTime, index=True)
+
+    def __repr__(self):
+        return "<Log: {0} - {1}>".format(self.created_at, self.msg[:50])

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/tests/mock/topology.py
----------------------------------------------------------------------
diff --git a/tests/mock/topology.py b/tests/mock/topology.py
index b04fb46..d3e8b7b 100644
--- a/tests/mock/topology.py
+++ b/tests/mock/topology.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from aria.storage import model
+from aria.storage.modeling import model
 
 from . import models
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index 3f39979..08d360c 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -38,8 +38,7 @@ global_test_holder = {}
 @pytest.fixture
 def ctx(tmpdir):
     context = mock.context.simple(
-        str(tmpdir.join('workdir')),
-        inmemory=True,
+        str(tmpdir),
         context_kwargs=dict(workdir=str(tmpdir.join('workdir')))
     )
     yield context
@@ -61,7 +60,7 @@ def test_node_operation_task_execution(ctx, executor):
     node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
     interface = mock.models.get_interface(
         operation_name,
-        operation_kwargs=dict(implementation=op_path(my_operation, module_path=__name__))
+        operation_kwargs=dict(implementation=op_path(basic_operation, module_path=__name__))
     )
     node.interfaces = [interface]
     ctx.model.node.update(node)
@@ -102,7 +101,7 @@ def test_relationship_operation_task_execution(ctx, executor):
 
     interface = mock.models.get_interface(
         operation_name=operation_name,
-        operation_kwargs=dict(implementation=op_path(my_operation, module_path=__name__)),
+        operation_kwargs=dict(implementation=op_path(basic_operation, module_path=__name__)),
         edge='source'
     )
 
@@ -210,8 +209,73 @@ def test_plugin_workdir(ctx, executor, tmpdir):
     assert expected_file.read() == content
 
 
+def test_operation_logging(ctx, executor):
+    operation_name = 'aria.interfaces.lifecycle.create'
+
+    node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
+    interface = mock.models.get_interface(
+        operation_name,
+        operation_kwargs=dict(implementation=op_path(logged_operation, module_path=__name__))
+    )
+    node.interfaces = [interface]
+    ctx.model.node.update(node)
+
+    wf_start = 'wf_start'
+    wf_end = 'wf_end'
+
+    inputs = {
+        'op_start': 'op_start',
+        'op_end': 'op_end',
+    }
+
+    @workflow
+    def basic_workflow(graph, ctx, **_):
+        ctx.logger.info(wf_start)
+        graph.add_tasks(
+            api.task.OperationTask.node(
+                name=operation_name,
+                instance=node,
+                inputs=inputs
+            )
+        )
+        ctx.logger.debug(wf_end)
+
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+
+    op_start_log = ctx.model.log.list(filters=dict(msg=inputs['op_start']))
+    assert len(op_start_log) == 1
+    op_start_log = op_start_log[0]
+    assert op_start_log.level.lower() == 'info'
+
+    op_end_log = ctx.model.log.list(filters=dict(msg=inputs['op_end']))
+    assert len(op_end_log) == 1
+    op_end_log = op_end_log[0]
+    assert op_end_log.level.lower() == 'debug'
+
+    wf_start_log = ctx.model.log.list(filters=dict(msg=wf_start))
+    assert len(wf_start_log) == 1
+    wf_start_log = wf_start_log[0]
+    assert wf_start_log.level.lower() == 'info'
+
+    wf_end_log = ctx.model.log.list(filters=dict(msg=wf_end))
+    assert len(wf_end_log) == 1
+    wf_end_log = wf_end_log[0]
+    assert wf_end_log.level.lower() == 'debug'
+
+    assert (wf_start_log.created_at <
+            wf_end_log.created_at <
+            op_start_log.created_at <
+            op_end_log.created_at)
+
+
+@operation
+def logged_operation(ctx, **_):
+    ctx.logger.info(ctx.task.inputs['op_start'])
+    ctx.logger.debug(ctx.task.inputs['op_end'])
+
+
 @operation
-def my_operation(ctx, **_):
+def basic_operation(ctx, **_):
     global_test_holder[ctx.name] = ctx
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/tests/storage/__init__.py
----------------------------------------------------------------------
diff --git a/tests/storage/__init__.py b/tests/storage/__init__.py
index 4278831..5323d01 100644
--- a/tests/storage/__init__.py
+++ b/tests/storage/__init__.py
@@ -12,8 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import os
-import platform
+
 from shutil import rmtree
 from tempfile import mkdtemp
 
@@ -23,19 +22,19 @@ from sqlalchemy import (
     Column,
     Text,
     Integer,
-    pool
+    pool,
+    MetaData
 )
 
 
-from aria.storage import (
+from aria.storage.modeling import (
     model,
-    type as aria_type,
     structure,
-    modeling
+    type as aria_type
 )
 
 
-class MockModel(model.aria_declarative_base, structure.ModelMixin): #pylint: disable=abstract-method
+class MockModel(structure.ModelMixin, model.aria_declarative_base): #pylint: disable=abstract-method
     __tablename__ = 'mock_model'
     model_dict = Column(aria_type.Dict)
     model_list = Column(aria_type.List)
@@ -58,14 +57,8 @@ def release_sqlite_storage(storage):
     :param storage:
     :return:
     """
-    mapis = storage.registered.values()
-
-    if mapis:
-        for session in set(mapi._session for mapi in mapis):
-            session.rollback()
-            session.close()
-        for engine in set(mapi._engine for mapi in mapis):
-            model.aria_declarative_base.metadata.drop_all(engine)
+    storage.all_api_kwargs['session'].close()
+    MetaData(bind=storage.all_api_kwargs['engine']).drop_all()
 
 
 def init_inmemory_model_storage():

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/tests/storage/test_instrumentation.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_instrumentation.py b/tests/storage/test_instrumentation.py
index 08d5ae0..7f0eb02 100644
--- a/tests/storage/test_instrumentation.py
+++ b/tests/storage/test_instrumentation.py
@@ -17,13 +17,15 @@ import pytest
 from sqlalchemy import Column, Text, Integer, event
 
 from aria.storage import (
-    structure,
     ModelStorage,
     sql_mapi,
     instrumentation,
     exceptions,
+)
+from aria.storage.modeling import (
+    model,
     type as aria_type,
-    model
+    structure,
 )
 from ..storage import release_sqlite_storage, init_inmemory_model_storage