You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/02/21 14:24:34 UTC
incubator-ariatosca git commit: finalizing [Forced Update!]
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-106-Create-sqla-logging-handler 4e3fb0074 -> 4995bc307 (forced update)
finalizing
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/4995bc30
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/4995bc30
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/4995bc30
Branch: refs/heads/ARIA-106-Create-sqla-logging-handler
Commit: 4995bc3074540431e2039aa6af23081001133d59
Parents: eba3e87
Author: mxmrlv <mx...@gmail.com>
Authored: Tue Feb 21 14:29:53 2017 +0200
Committer: mxmrlv <mx...@gmail.com>
Committed: Tue Feb 21 16:24:08 2017 +0200
----------------------------------------------------------------------
aria/logger.py | 29 ++++++---
aria/orchestrator/context/common.py | 27 ++++----
aria/orchestrator/context/operation.py | 4 ++
aria/orchestrator/decorators.py | 16 +++--
.../orchestrator/execution_plugin/operations.py | 4 +-
aria/orchestrator/workflows/executor/process.py | 7 +-
aria/orchestrator/workflows/executor/thread.py | 15 ++---
aria/storage/__init__.py | 3 -
aria/storage/core.py | 4 +-
aria/storage/modeling/orchestrator_elements.py | 2 +-
tests/orchestrator/context/test_operation.py | 68 ++++++++++++--------
.../workflows/executor/test_executor.py | 6 +-
tests/test_logger.py | 3 +-
13 files changed, 109 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/aria/logger.py
----------------------------------------------------------------------
diff --git a/aria/logger.py b/aria/logger.py
index 9bb62d7..ae6d4a6 100644
--- a/aria/logger.py
+++ b/aria/logger.py
@@ -99,6 +99,10 @@ def create_console_log_handler(level=logging.DEBUG, formatter=None):
return console
+def create_sqla_log_handler(session, engine, level=logging.DEBUG):
+ return _SQLAlchemyHandler(session=session, engine=engine, level=level)
+
+
class _DefaultConsoleFormat(logging.Formatter):
"""
_DefaultConsoleFormat class
@@ -108,10 +112,11 @@ class _DefaultConsoleFormat(logging.Formatter):
"""
def format(self, record):
try:
- if record.levelno == logging.INFO:
- self._fmt = '%(message)s'
+ if hasattr(record, 'prefix'):
+ self._fmt = '%(asctime)s: [%(levelname)s] @%(prefix)s ->%(message)s'
else:
- self._fmt = '%(levelname)s: %(message)s'
+ self._fmt = '%(asctime)s: [%(levelname)s] %(message)s'
+
except AttributeError:
return record.message
return logging.Formatter.format(self, record)
@@ -137,24 +142,28 @@ def create_file_log_handler(
return rotating_file
-class SQLAlchemyHandler(logging.Handler):
+class _SQLAlchemyHandler(logging.Handler):
- def __init__(self, session, engine, log_cls, **kwargs):
+ def __init__(self, session, engine, **kwargs):
logging.Handler.__init__(self, **kwargs)
self._session = session
self._engine = engine
- self._cls = log_cls
- def emit(self, record):
- # CHECK: why is this needed?
- self._cls.__table__.create(bind=self._engine, checkfirst=True)
+ # Cyclic dependency
+ from aria.storage.modeling.model import Log
+ self._cls = Log
+ def emit(self, record):
+ # pylint fails to recognize that this class does indeed have __table__
+ self._cls.__table__.create(bind=self._engine, checkfirst=True) # pylint: disable=no-member
+ created_at = datetime.strptime(logging.Formatter('%(asctime)s').formatTime(record),
+ '%Y-%m-%d %H:%M:%S,%f')
log = self._cls(
prefix=record.prefix,
logger=record.name,
level=record.levelname,
msg=record.msg,
- created_at=datetime.utcnow(),
+ created_at=created_at,
)
self._session.add(log)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 0c8c2f0..902598e 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -23,7 +23,7 @@ import logging
import jinja2
from aria import logger as aria_logger
-from aria.storage import exceptions, modeling
+from aria.storage import exceptions
class BaseContext(aria_logger.LoggerMixin):
@@ -62,7 +62,7 @@ class BaseContext(aria_logger.LoggerMixin):
def _get_sqla_handler(self):
api_kwargs = self._model._initiator(**self._model._initiator_kwargs)
api_kwargs.update(**self._model._api_kwargs)
- return aria_logger.SQLAlchemyHandler(log_cls=modeling.model.Log, **api_kwargs)
+ return aria_logger.create_sqla_log_handler(**api_kwargs)
def __repr__(self):
return (
@@ -71,21 +71,18 @@ class BaseContext(aria_logger.LoggerMixin):
.format(name=self.__class__.__name__, self=self))
@contextmanager
- def self_logging(self, logger=None):
- if not logger:
- sqla_handler = self._get_sqla_handler()
- console_handler = aria_logger.create_console_log_handler()
- if sqla_handler not in self.logger.handlers:
- # self.logger.addHandler(aria_logger.create_console_log_handler())
- self.logger.addHandler(sqla_handler)
+ def self_logging(self, handlers=None):
+ handlers = set(handlers) if handlers else set()
+ try:
+ handlers.add(aria_logger.create_console_log_handler())
+ handlers.add(self._get_sqla_handler())
+ for handler in handlers:
+ self.logger.addHandler(handler)
self.logger = self.PrefixedLogger(self.logger, self.logging_id)
yield self.logger
- sqla_handler._session.close()
- self.logger.removeHandler(sqla_handler)
- self.logger.removeHandler(console_handler)
- else:
- self.logger = logger
- yield self.logger
+ finally:
+ for handler in handlers:
+ self.logger.removeHandler(handler)
@property
def logging_id(self):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index 766ebb5..97a09aa 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -52,6 +52,10 @@ class BaseOperationContext(BaseContext):
return '{name}({0})'.format(details, name=self.name)
@property
+ def logging_id(self):
+ raise NotImplementedError
+
+ @property
def task(self):
"""
The task in the model storage
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/aria/orchestrator/decorators.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/decorators.py b/aria/orchestrator/decorators.py
index 1cc97db..f915813 100644
--- a/aria/orchestrator/decorators.py
+++ b/aria/orchestrator/decorators.py
@@ -29,7 +29,7 @@ from .workflows.api import task_graph
WORKFLOW_DECORATOR_RESERVED_ARGUMENTS = ('ctx', 'graph')
-def workflow(func=None, suffix_template='', logger=None):
+def workflow(func=None, suffix_template=''):
"""
Workflow decorator
"""
@@ -49,18 +49,21 @@ def workflow(func=None, suffix_template='', logger=None):
workflow_parameters.setdefault('graph', task_graph.TaskGraph(workflow_name))
validate_function_arguments(func, workflow_parameters)
with context.workflow.current.push(ctx):
- with ctx.self_logging(logger):
- func(**workflow_parameters)
+ func(**workflow_parameters)
return workflow_parameters['graph']
return _wrapper
-def operation(func=None, toolbelt=False, suffix_template='', logger=False):
+def operation(func=None, toolbelt=False, suffix_template='', logging_handlers=None):
"""
Operation decorator
"""
+
if func is None:
- return partial(operation, suffix_template=suffix_template, toolbelt=toolbelt)
+ return partial(operation,
+ suffix_template=suffix_template,
+ toolbelt=toolbelt,
+ logging_handlers=logging_handlers)
@wraps(func)
def _wrapper(**func_kwargs):
@@ -68,7 +71,8 @@ def operation(func=None, toolbelt=False, suffix_template='', logger=False):
operation_toolbelt = context.toolbelt(func_kwargs['ctx'])
func_kwargs.setdefault('toolbelt', operation_toolbelt)
validate_function_arguments(func, func_kwargs)
- with func_kwargs['ctx'].self_logging(logger):
+
+ with func_kwargs['ctx'].self_logging(handlers=logging_handlers):
return func(**func_kwargs)
return _wrapper
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/aria/orchestrator/execution_plugin/operations.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/operations.py b/aria/orchestrator/execution_plugin/operations.py
index c86f697..5effa8a 100644
--- a/aria/orchestrator/execution_plugin/operations.py
+++ b/aria/orchestrator/execution_plugin/operations.py
@@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
-
from aria.orchestrator import operation
from . import local as local_operations
from .ssh import operations as ssh_operations
@@ -50,7 +48,7 @@ def run_script_with_ssh(ctx,
**kwargs)
-@operation(logger=logging.getLogger())
+@operation
def run_commands_with_ssh(ctx,
commands,
fabric_env=None,
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 5f8a813..991fec6 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -200,7 +200,9 @@ class ProcessExecutor(base.BaseExecutor):
request_handler = self._request_handlers.get(request_type)
if not request_handler:
raise RuntimeError('Invalid request type: {0}'.format(request_type))
- request_handler(task_id=request['task_id'], request=request, response=response)
+ task_id = request['task_id']
+ # with self._tasks[task_id].self_logging():
+ request_handler(task_id=task_id, request=request, response=response)
except BaseException as e:
self.logger.debug('Error in process executor listener: {0}'.format(e))
@@ -252,7 +254,8 @@ class ProcessExecutor(base.BaseExecutor):
def _send_message(connection, message):
- # Packing the length of the entire msg using struct.pack. This enables later reading of the content.
+ # Packing the length of the entire msg using struct.pack.
+ # This enables later reading of the content.
def _pack(data):
return struct.pack(_INT_FMT, len(data))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 359513e..6c59986 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -55,14 +55,13 @@ class ThreadExecutor(BaseExecutor):
while not self._stopped:
try:
task = self._queue.get(timeout=1)
- with task.self_logging():
- self._task_started(task)
- try:
- task_func = imports.load_attribute(task.implementation)
- task_func(ctx=task.context, **task.inputs)
- self._task_succeeded(task)
- except BaseException as e:
- self._task_failed(task, exception=e)
+ self._task_started(task)
+ try:
+ task_func = imports.load_attribute(task.implementation)
+ task_func(ctx=task.context, **task.inputs)
+ self._task_succeeded(task)
+ except BaseException as e:
+ self._task_failed(task, exception=e)
# Daemon threads
except BaseException as e:
pass
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/aria/storage/__init__.py
----------------------------------------------------------------------
diff --git a/aria/storage/__init__.py b/aria/storage/__init__.py
index 5500358..45af1be 100644
--- a/aria/storage/__init__.py
+++ b/aria/storage/__init__.py
@@ -52,13 +52,10 @@ from . import (
__all__ = (
'exceptions',
- 'structure',
'Storage',
'ModelStorage',
'ResourceStorage',
'filesystem_rapi',
'sql_mapi',
'api',
- 'model',
- 'type',
)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/aria/storage/core.py
----------------------------------------------------------------------
diff --git a/aria/storage/core.py b/aria/storage/core.py
index eaf2bac..7d70070 100644
--- a/aria/storage/core.py
+++ b/aria/storage/core.py
@@ -141,7 +141,7 @@ class ModelStorage(Storage):
kwargs['initiator'] = sql_mapi.init_storage
super(ModelStorage, self).__init__(*args, **kwargs)
- def register(self, model_cls, create_all=True):
+ def register(self, model_cls):
"""
Register the model into the model storage.
:param model_cls: the model to register.
@@ -155,7 +155,7 @@ class ModelStorage(Storage):
self.registered[model_name] = self.api(name=model_name,
model_cls=model_cls,
**self.all_api_kwargs)
- self.registered[model_name].create(create_all=create_all)
+ self.registered[model_name].create()
self.logger.debug('setup {name} in storage {self!r}'.format(name=model_name, self=self))
def drop(self):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/aria/storage/modeling/orchestrator_elements.py
----------------------------------------------------------------------
diff --git a/aria/storage/modeling/orchestrator_elements.py b/aria/storage/modeling/orchestrator_elements.py
index 366defc..47fe49f 100644
--- a/aria/storage/modeling/orchestrator_elements.py
+++ b/aria/storage/modeling/orchestrator_elements.py
@@ -479,5 +479,5 @@ class LogBase(ModelMixin):
description = Column(String)
def __repr__(self):
- return "<[{self.level}] {self.created_at}@{self.prefix}> {msg}".format(
+ return "{self.created_at}: [{self.level}] @{self.prefix} ->{msg}".format(
self=self, msg=self.msg[:50])
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index f6cc499..c2f5fd0 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -15,7 +15,11 @@
import os
+import logging
+import tempfile
+
import pytest
+
from aria.orchestrator.workflows.executor import process, thread
from aria import (
@@ -25,6 +29,7 @@ from aria import (
from aria.orchestrator import context
from aria.orchestrator.workflows import api
+import tests
from tests import mock, storage
from . import (
op_path,
@@ -45,6 +50,18 @@ def ctx(tmpdir):
storage.release_sqlite_storage(context.model)
+tmp_file = os.path.join(tempfile.gettempdir(), 'op_testing_const_file_name')
+
+
+@pytest.fixture(autouse=True)
+def tmpfile_cleanup():
+ try:
+ yield
+ finally:
+ if os.path.isfile(tmp_file):
+ os.remove(tmp_file)
+
+
@pytest.fixture
def thread_executor():
result = thread.ThreadExecutor()
@@ -54,13 +71,14 @@ def thread_executor():
result.close()
-@pytest.fixture
-def process_executor():
- result = process.ProcessExecutor()
+@pytest.fixture(params=[(thread.ThreadExecutor()),
+ (process.ProcessExecutor(python_path=tests.ROOT_DIR))])
+def executor(request):
+ ex = request.param
try:
- yield result
+ yield ex
finally:
- result.close()
+ ex.close()
def test_node_operation_task_execution(ctx, thread_executor):
@@ -218,7 +236,7 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir):
assert expected_file.read() == content
-def test_operation_logging(ctx, process_executor):
+def test_operation_logging(ctx, executor):
operation_name = 'aria.interfaces.lifecycle.create'
node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
@@ -229,17 +247,13 @@ def test_operation_logging(ctx, process_executor):
node.interfaces = [interface]
ctx.model.node.update(node)
- wf_start = 'wf_start'
- wf_end = 'wf_end'
-
inputs = {
'op_start': 'op_start',
'op_end': 'op_end',
}
@workflow
- def basic_workflow(graph, ctx, **_):
- ctx.logger.info(wf_start)
+ def basic_workflow(graph, **_):
graph.add_tasks(
api.task.OperationTask.node(
name=operation_name,
@@ -247,40 +261,40 @@ def test_operation_logging(ctx, process_executor):
inputs=inputs
)
)
- ctx.logger.info(wf_end)
- execute(workflow_func=basic_workflow, workflow_context=ctx, executor=process_executor)
+ execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
logs = ctx.model.log.list()
- assert len(logs) == 4
+ assert len(logs) == 2
op_start_log = [l for l in logs if inputs['op_start'] in l.msg and l.level.lower() == 'info']
assert len(op_start_log) == 1
op_start_log = op_start_log[0]
- op_end_log = [l for l in logs if inputs['op_end'] in l.msg and l.level.lower() == 'info']
+ op_end_log = [l for l in logs if inputs['op_end'] in l.msg and l.level.lower() == 'debug']
assert len(op_end_log) == 1
op_end_log = op_end_log[0]
- wf_start_log = [l for l in logs if wf_start in l.msg and l.level.lower() == 'info']
- assert len(wf_start_log) == 1
- wf_start_log = wf_start_log[0]
+ assert op_start_log.created_at < op_end_log.created_at
- wf_end_log = [l for l in logs if wf_end in l.msg and l.level.lower() == 'info']
- assert len(wf_end_log) == 1
- wf_end_log = wf_end_log[0]
+ with open(tmp_file, 'r') as f:
+ logs = [l.strip() for l in f.readlines()]
- assert (wf_start_log.created_at <
- wf_end_log.created_at <
- op_start_log.created_at <
- op_end_log.created_at)
+ assert inputs['op_start'] in logs
+ assert inputs['op_end'] in logs
-@operation
+class MockLogHandler(logging.Handler):
+ def emit(self, record):
+ with open(tmp_file, 'a+') as f:
+ f.write(record.msg + '\n')
+
+
+@operation(logging_handlers=[MockLogHandler()])
def logged_operation(ctx, **_):
ctx.logger.info(ctx.task.inputs['op_start'])
- ctx.logger.info(ctx.task.inputs['op_end'])
+ ctx.logger.debug(ctx.task.inputs['op_end'])
@operation
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 953efb2..64dfb66 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -83,7 +83,7 @@ class MockException(Exception):
class MockContext(object):
def __init__(self, *args, **kwargs):
- pass
+ self.logger = logging.getLogger()
def __getattr__(self, item):
if item == 'serialization_dict':
@@ -95,6 +95,10 @@ class MockContext(object):
def deserialize_from_dict(cls, **kwargs):
return cls()
+ @contextmanager
+ def self_logging(self):
+ yield self.logger
+
class MockTask(object):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4995bc30/tests/test_logger.py
----------------------------------------------------------------------
diff --git a/tests/test_logger.py b/tests/test_logger.py
index 70f08bb..1ad055c 100644
--- a/tests/test_logger.py
+++ b/tests/test_logger.py
@@ -53,7 +53,8 @@ def test_create_console_log_handler(capsys):
logger.info(info_test_string)
logger.debug(debug_test_string)
_, err = capsys.readouterr()
- assert err.count('DEBUG: {test_string}'.format(test_string=debug_test_string)) == 1
+
+ assert err.count('[DEBUG] {test_string}'.format(test_string=debug_test_string))
assert err.count(info_test_string) == 1
# Custom handler