You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/02/21 14:24:34 UTC

incubator-ariatosca git commit: finalyzing [Forced Update!]

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-106-Create-sqla-logging-handler 4e3fb0074 -> 4995bc307 (forced update)


finalyzing


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/4995bc30
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/4995bc30
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/4995bc30

Branch: refs/heads/ARIA-106-Create-sqla-logging-handler
Commit: 4995bc3074540431e2039aa6af23081001133d59
Parents: eba3e87
Author: mxmrlv <mx...@gmail.com>
Authored: Tue Feb 21 14:29:53 2017 +0200
Committer: mxmrlv <mx...@gmail.com>
Committed: Tue Feb 21 16:24:08 2017 +0200

----------------------------------------------------------------------
 aria/logger.py                                  | 29 ++++++---
 aria/orchestrator/context/common.py             | 27 ++++----
 aria/orchestrator/context/operation.py          |  4 ++
 aria/orchestrator/decorators.py                 | 16 +++--
 .../orchestrator/execution_plugin/operations.py |  4 +-
 aria/orchestrator/workflows/executor/process.py |  7 +-
 aria/orchestrator/workflows/executor/thread.py  | 15 ++---
 aria/storage/__init__.py                        |  3 -
 aria/storage/core.py                            |  4 +-
 aria/storage/modeling/orchestrator_elements.py  |  2 +-
 tests/orchestrator/context/test_operation.py    | 68 ++++++++++++--------
 .../workflows/executor/test_executor.py         |  6 +-
 tests/test_logger.py                            |  3 +-
 13 files changed, 109 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/aria/logger.py
----------------------------------------------------------------------
diff --git a/aria/logger.py b/aria/logger.py
index 9bb62d7..ae6d4a6 100644
--- a/aria/logger.py
+++ b/aria/logger.py
@@ -99,6 +99,10 @@ def create_console_log_handler(level=logging.DEBUG, formatter=None):
     return console
 
 
+def create_sqla_log_handler(session, engine, level=logging.DEBUG):
+    return _SQLAlchemyHandler(session=session, engine=engine, level=level)
+
+
 class _DefaultConsoleFormat(logging.Formatter):
     """
     _DefaultConsoleFormat class
@@ -108,10 +112,11 @@ class _DefaultConsoleFormat(logging.Formatter):
     """
     def format(self, record):
         try:
-            if record.levelno == logging.INFO:
-                self._fmt = '%(message)s'
+            if hasattr(record, 'prefix'):
+                self._fmt = '%(asctime)s: [%(levelname)s] @%(prefix)s ->%(message)s'
             else:
-                self._fmt = '%(levelname)s: %(message)s'
+                self._fmt = '%(asctime)s: [%(levelname)s] %(message)s'
+
         except AttributeError:
             return record.message
         return logging.Formatter.format(self, record)
@@ -137,24 +142,28 @@ def create_file_log_handler(
     return rotating_file
 
 
-class SQLAlchemyHandler(logging.Handler):
+class _SQLAlchemyHandler(logging.Handler):
 
-    def __init__(self, session, engine, log_cls, **kwargs):
+    def __init__(self, session, engine, **kwargs):
         logging.Handler.__init__(self, **kwargs)
         self._session = session
         self._engine = engine
-        self._cls = log_cls
 
-    def emit(self, record):
-        # CHECK: why is this needed?
-        self._cls.__table__.create(bind=self._engine, checkfirst=True)
+        # Cyclic dependency
+        from aria.storage.modeling.model import Log
+        self._cls = Log
 
+    def emit(self, record):
+        # pylint fails to recognize that this class does indeed have __table__
+        self._cls.__table__.create(bind=self._engine, checkfirst=True)                              # pylint: disable=no-member
+        created_at = datetime.strptime(logging.Formatter('%(asctime)s').formatTime(record),
+                                       '%Y-%m-%d %H:%M:%S,%f')
         log = self._cls(
             prefix=record.prefix,
             logger=record.name,
             level=record.levelname,
             msg=record.msg,
-            created_at=datetime.utcnow(),
+            created_at=created_at,
         )
         self._session.add(log)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 0c8c2f0..902598e 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -23,7 +23,7 @@ import logging
 import jinja2
 
 from aria import logger as aria_logger
-from aria.storage import exceptions, modeling
+from aria.storage import exceptions
 
 
 class BaseContext(aria_logger.LoggerMixin):
@@ -62,7 +62,7 @@ class BaseContext(aria_logger.LoggerMixin):
     def _get_sqla_handler(self):
         api_kwargs = self._model._initiator(**self._model._initiator_kwargs)
         api_kwargs.update(**self._model._api_kwargs)
-        return aria_logger.SQLAlchemyHandler(log_cls=modeling.model.Log, **api_kwargs)
+        return aria_logger.create_sqla_log_handler(**api_kwargs)
 
     def __repr__(self):
         return (
@@ -71,21 +71,18 @@ class BaseContext(aria_logger.LoggerMixin):
             .format(name=self.__class__.__name__, self=self))
 
     @contextmanager
-    def self_logging(self, logger=None):
-        if not logger:
-            sqla_handler = self._get_sqla_handler()
-            console_handler = aria_logger.create_console_log_handler()
-            if sqla_handler not in self.logger.handlers:
-                # self.logger.addHandler(aria_logger.create_console_log_handler())
-                self.logger.addHandler(sqla_handler)
+    def self_logging(self, handlers=None):
+        handlers = set(handlers) if handlers else set()
+        try:
+            handlers.add(aria_logger.create_console_log_handler())
+            handlers.add(self._get_sqla_handler())
+            for handler in handlers:
+                self.logger.addHandler(handler)
             self.logger = self.PrefixedLogger(self.logger, self.logging_id)
             yield self.logger
-            sqla_handler._session.close()
-            self.logger.removeHandler(sqla_handler)
-            self.logger.removeHandler(console_handler)
-        else:
-            self.logger = logger
-            yield self.logger
+        finally:
+            for handler in handlers:
+                self.logger.removeHandler(handler)
 
     @property
     def logging_id(self):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index 766ebb5..97a09aa 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -52,6 +52,10 @@ class BaseOperationContext(BaseContext):
         return '{name}({0})'.format(details, name=self.name)
 
     @property
+    def logging_id(self):
+        raise NotImplementedError
+
+    @property
     def task(self):
         """
         The task in the model storage

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/aria/orchestrator/decorators.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/decorators.py b/aria/orchestrator/decorators.py
index 1cc97db..f915813 100644
--- a/aria/orchestrator/decorators.py
+++ b/aria/orchestrator/decorators.py
@@ -29,7 +29,7 @@ from .workflows.api import task_graph
 WORKFLOW_DECORATOR_RESERVED_ARGUMENTS = ('ctx', 'graph')
 
 
-def workflow(func=None, suffix_template='', logger=None):
+def workflow(func=None, suffix_template=''):
     """
     Workflow decorator
     """
@@ -49,18 +49,21 @@ def workflow(func=None, suffix_template='', logger=None):
         workflow_parameters.setdefault('graph', task_graph.TaskGraph(workflow_name))
         validate_function_arguments(func, workflow_parameters)
         with context.workflow.current.push(ctx):
-            with ctx.self_logging(logger):
-                func(**workflow_parameters)
+            func(**workflow_parameters)
         return workflow_parameters['graph']
     return _wrapper
 
 
-def operation(func=None, toolbelt=False, suffix_template='', logger=False):
+def operation(func=None, toolbelt=False, suffix_template='', logging_handlers=None):
     """
     Operation decorator
     """
+
     if func is None:
-        return partial(operation, suffix_template=suffix_template, toolbelt=toolbelt)
+        return partial(operation,
+                       suffix_template=suffix_template,
+                       toolbelt=toolbelt,
+                       logging_handlers=logging_handlers)
 
     @wraps(func)
     def _wrapper(**func_kwargs):
@@ -68,7 +71,8 @@ def operation(func=None, toolbelt=False, suffix_template='', logger=False):
             operation_toolbelt = context.toolbelt(func_kwargs['ctx'])
             func_kwargs.setdefault('toolbelt', operation_toolbelt)
         validate_function_arguments(func, func_kwargs)
-        with func_kwargs['ctx'].self_logging(logger):
+
+        with func_kwargs['ctx'].self_logging(handlers=logging_handlers):
             return func(**func_kwargs)
     return _wrapper
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/aria/orchestrator/execution_plugin/operations.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/operations.py b/aria/orchestrator/execution_plugin/operations.py
index c86f697..5effa8a 100644
--- a/aria/orchestrator/execution_plugin/operations.py
+++ b/aria/orchestrator/execution_plugin/operations.py
@@ -13,8 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
-
 from aria.orchestrator import operation
 from . import local as local_operations
 from .ssh import operations as ssh_operations
@@ -50,7 +48,7 @@ def run_script_with_ssh(ctx,
         **kwargs)
 
 
-@operation(logger=logging.getLogger())
+@operation
 def run_commands_with_ssh(ctx,
                           commands,
                           fabric_env=None,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 5f8a813..991fec6 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -200,7 +200,9 @@ class ProcessExecutor(base.BaseExecutor):
                     request_handler = self._request_handlers.get(request_type)
                     if not request_handler:
                         raise RuntimeError('Invalid request type: {0}'.format(request_type))
-                    request_handler(task_id=request['task_id'], request=request, response=response)
+                    task_id = request['task_id']
+                    # with self._tasks[task_id].self_logging():
+                    request_handler(task_id=task_id, request=request, response=response)
             except BaseException as e:
                 self.logger.debug('Error in process executor listener: {0}'.format(e))
 
@@ -252,7 +254,8 @@ class ProcessExecutor(base.BaseExecutor):
 
 def _send_message(connection, message):
 
-    # Packing the length of the entire msg using struct.pack. This enables later reading of the content.
+    # Packing the length of the entire msg using struct.pack.
+    # This enables later reading of the content.
     def _pack(data):
         return struct.pack(_INT_FMT, len(data))
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 359513e..6c59986 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -55,14 +55,13 @@ class ThreadExecutor(BaseExecutor):
         while not self._stopped:
             try:
                 task = self._queue.get(timeout=1)
-                with task.self_logging():
-                    self._task_started(task)
-                    try:
-                        task_func = imports.load_attribute(task.implementation)
-                        task_func(ctx=task.context, **task.inputs)
-                        self._task_succeeded(task)
-                    except BaseException as e:
-                        self._task_failed(task, exception=e)
+                self._task_started(task)
+                try:
+                    task_func = imports.load_attribute(task.implementation)
+                    task_func(ctx=task.context, **task.inputs)
+                    self._task_succeeded(task)
+                except BaseException as e:
+                    self._task_failed(task, exception=e)
             # Daemon threads
             except BaseException as e:
                 pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/aria/storage/__init__.py
----------------------------------------------------------------------
diff --git a/aria/storage/__init__.py b/aria/storage/__init__.py
index 5500358..45af1be 100644
--- a/aria/storage/__init__.py
+++ b/aria/storage/__init__.py
@@ -52,13 +52,10 @@ from . import (
 
 __all__ = (
     'exceptions',
-    'structure',
     'Storage',
     'ModelStorage',
     'ResourceStorage',
     'filesystem_rapi',
     'sql_mapi',
     'api',
-    'model',
-    'type',
 )

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/aria/storage/core.py
----------------------------------------------------------------------
diff --git a/aria/storage/core.py b/aria/storage/core.py
index eaf2bac..7d70070 100644
--- a/aria/storage/core.py
+++ b/aria/storage/core.py
@@ -141,7 +141,7 @@ class ModelStorage(Storage):
             kwargs['initiator'] = sql_mapi.init_storage
         super(ModelStorage, self).__init__(*args, **kwargs)
 
-    def register(self, model_cls, create_all=True):
+    def register(self, model_cls):
         """
         Register the model into the model storage.
         :param model_cls: the model to register.
@@ -155,7 +155,7 @@ class ModelStorage(Storage):
         self.registered[model_name] = self.api(name=model_name,
                                                model_cls=model_cls,
                                                **self.all_api_kwargs)
-        self.registered[model_name].create(create_all=create_all)
+        self.registered[model_name].create()
         self.logger.debug('setup {name} in storage {self!r}'.format(name=model_name, self=self))
 
     def drop(self):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/aria/storage/modeling/orchestrator_elements.py
----------------------------------------------------------------------
diff --git a/aria/storage/modeling/orchestrator_elements.py b/aria/storage/modeling/orchestrator_elements.py
index 366defc..47fe49f 100644
--- a/aria/storage/modeling/orchestrator_elements.py
+++ b/aria/storage/modeling/orchestrator_elements.py
@@ -479,5 +479,5 @@ class LogBase(ModelMixin):
     description = Column(String)
 
     def __repr__(self):
-        return "<[{self.level}] {self.created_at}@{self.prefix}> {msg}".format(
+        return "{self.created_at}: [{self.level}] @{self.prefix} ->{msg}".format(
             self=self, msg=self.msg[:50])

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index f6cc499..c2f5fd0 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -15,7 +15,11 @@
 
 import os
 
+import logging
+import tempfile
+
 import pytest
+
 from aria.orchestrator.workflows.executor import process, thread
 
 from aria import (
@@ -25,6 +29,7 @@ from aria import (
 from aria.orchestrator import context
 from aria.orchestrator.workflows import api
 
+import tests
 from tests import mock, storage
 from . import (
     op_path,
@@ -45,6 +50,18 @@ def ctx(tmpdir):
     storage.release_sqlite_storage(context.model)
 
 
+tmp_file = os.path.join(tempfile.gettempdir(), 'op_testing_const_file_name')
+
+
+@pytest.fixture(autouse=True)
+def tmpfile_cleanup():
+    try:
+        yield
+    finally:
+        if os.path.isfile(tmp_file):
+            os.remove(tmp_file)
+
+
 @pytest.fixture
 def thread_executor():
     result = thread.ThreadExecutor()
@@ -54,13 +71,14 @@ def thread_executor():
         result.close()
 
 
-@pytest.fixture
-def process_executor():
-    result = process.ProcessExecutor()
+@pytest.fixture(params=[(thread.ThreadExecutor()),
+                        (process.ProcessExecutor(python_path=tests.ROOT_DIR))])
+def executor(request):
+    ex = request.param
     try:
-        yield result
+        yield ex
     finally:
-        result.close()
+        ex.close()
 
 
 def test_node_operation_task_execution(ctx, thread_executor):
@@ -218,7 +236,7 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir):
     assert expected_file.read() == content
 
 
-def test_operation_logging(ctx, process_executor):
+def test_operation_logging(ctx, executor):
     operation_name = 'aria.interfaces.lifecycle.create'
 
     node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
@@ -229,17 +247,13 @@ def test_operation_logging(ctx, process_executor):
     node.interfaces = [interface]
     ctx.model.node.update(node)
 
-    wf_start = 'wf_start'
-    wf_end = 'wf_end'
-
     inputs = {
         'op_start': 'op_start',
         'op_end': 'op_end',
     }
 
     @workflow
-    def basic_workflow(graph, ctx, **_):
-        ctx.logger.info(wf_start)
+    def basic_workflow(graph, **_):
         graph.add_tasks(
             api.task.OperationTask.node(
                 name=operation_name,
@@ -247,40 +261,40 @@ def test_operation_logging(ctx, process_executor):
                 inputs=inputs
             )
         )
-        ctx.logger.info(wf_end)
 
-    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=process_executor)
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
 
     logs = ctx.model.log.list()
 
-    assert len(logs) == 4
+    assert len(logs) == 2
 
     op_start_log = [l for l in logs if inputs['op_start'] in l.msg and l.level.lower() == 'info']
     assert len(op_start_log) == 1
     op_start_log = op_start_log[0]
 
-    op_end_log = [l for l in logs if inputs['op_end'] in l.msg and l.level.lower() == 'info']
+    op_end_log = [l for l in logs if inputs['op_end'] in l.msg and l.level.lower() == 'debug']
     assert len(op_end_log) == 1
     op_end_log = op_end_log[0]
 
-    wf_start_log = [l for l in logs if wf_start in l.msg and l.level.lower() == 'info']
-    assert len(wf_start_log) == 1
-    wf_start_log = wf_start_log[0]
+    assert op_start_log.created_at < op_end_log.created_at
 
-    wf_end_log = [l for l in logs if wf_end in l.msg and l.level.lower() == 'info']
-    assert len(wf_end_log) == 1
-    wf_end_log = wf_end_log[0]
+    with open(tmp_file, 'r') as f:
+        logs = [l.strip() for l in f.readlines()]
 
-    assert (wf_start_log.created_at <
-            wf_end_log.created_at <
-            op_start_log.created_at <
-            op_end_log.created_at)
+    assert inputs['op_start'] in logs
+    assert inputs['op_end'] in logs
 
 
-@operation
+class MockLogHandler(logging.Handler):
+    def emit(self, record):
+        with open(tmp_file, 'a+') as f:
+            f.write(record.msg + '\n')
+
+
+@operation(logging_handlers=[MockLogHandler()])
 def logged_operation(ctx, **_):
     ctx.logger.info(ctx.task.inputs['op_start'])
-    ctx.logger.info(ctx.task.inputs['op_end'])
+    ctx.logger.debug(ctx.task.inputs['op_end'])
 
 
 @operation

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 953efb2..64dfb66 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -83,7 +83,7 @@ class MockException(Exception):
 class MockContext(object):
 
     def __init__(self, *args, **kwargs):
-        pass
+        self.logger = logging.getLogger()
 
     def __getattr__(self, item):
         if item == 'serialization_dict':
@@ -95,6 +95,10 @@ class MockContext(object):
     def deserialize_from_dict(cls, **kwargs):
         return cls()
 
+    @contextmanager
+    def self_logging(self):
+        yield self.logger
+
 
 class MockTask(object):
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/tests/test_logger.py
----------------------------------------------------------------------
diff --git a/tests/test_logger.py b/tests/test_logger.py
index 70f08bb..1ad055c 100644
--- a/tests/test_logger.py
+++ b/tests/test_logger.py
@@ -53,7 +53,8 @@ def test_create_console_log_handler(capsys):
     logger.info(info_test_string)
     logger.debug(debug_test_string)
     _, err = capsys.readouterr()
-    assert err.count('DEBUG: {test_string}'.format(test_string=debug_test_string)) == 1
+
+    assert err.count('[DEBUG] {test_string}'.format(test_string=debug_test_string))
     assert err.count(info_test_string) == 1
 
     # Custom handler