You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/03/07 10:16:09 UTC
incubator-ariatosca git commit: initial commit
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-117-Log-model-should-have-an-Task-field c0d76adaf -> b4896a504
initial commit
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/b4896a50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/b4896a50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/b4896a50
Branch: refs/heads/ARIA-117-Log-model-should-have-an-Task-field
Commit: b4896a504d2d267e89c6c0a6882bc40b40d9315f
Parents: c0d76ad
Author: max-orlov <ma...@gigaspaces.com>
Authored: Mon Mar 6 16:43:58 2017 +0200
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Tue Mar 7 12:00:17 2017 +0200
----------------------------------------------------------------------
aria/logger.py | 1 +
aria/orchestrator/context/common.py | 11 ++++---
aria/orchestrator/context/operation.py | 2 +-
aria/orchestrator/workflows/executor/thread.py | 4 +++
aria/storage/modeling/orchestrator_elements.py | 8 +++++
tests/orchestrator/context/test_operation.py | 36 +++++++++++++++------
6 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b4896a50/aria/logger.py
----------------------------------------------------------------------
diff --git a/aria/logger.py b/aria/logger.py
index 6f0b84a..42e3679 100644
--- a/aria/logger.py
+++ b/aria/logger.py
@@ -164,6 +164,7 @@ class _SQLAlchemyHandler(logging.Handler):
'%Y-%m-%d %H:%M:%S,%f')
log = self._cls(
execution_fk=self._execution_id,
+ task_fk=record.task_id,
actor=record.prefix,
level=record.levelname,
msg=record.msg,
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b4896a50/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 1d228b6..bb9d839 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -36,13 +36,15 @@ class BaseContext(object):
"""
class PrefixedLogger(object):
- def __init__(self, logger, prefix=''):
+ def __init__(self, logger, prefix='', task_id=None):
self._logger = logger
self._prefix = prefix
+ self._task_id = task_id
def __getattr__(self, item):
if item.upper() in logging._levelNames:
- return partial(getattr(self._logger, item), extra={'prefix': self._prefix})
+ return partial(getattr(self._logger, item),
+ extra={'prefix': self._prefix, 'task_id': self._task_id})
else:
return getattr(self._logger, item)
@@ -74,9 +76,10 @@ class BaseContext(object):
self.model.execution.put(execution)
return execution.id
- def _register_logger(self, logger_name=None, level=None):
+ def _register_logger(self, logger_name=None, level=None, task_id=None):
self.logger = self.PrefixedLogger(logging.getLogger(logger_name or self.__class__.__name__),
- self.logging_id)
+ self.logging_id,
+ task_id=task_id)
self.logger.addHandler(aria_logger.create_console_log_handler())
self.logger.addHandler(self._get_sqla_handler())
self.logger.setLevel(level or logging.DEBUG)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b4896a50/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index cbaa462..ed0791c 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -46,7 +46,7 @@ class BaseOperationContext(BaseContext):
self._actor_id = actor_id
self._task = None
self._execution_id = execution_id
- self._register_logger()
+ self._register_logger(task_id=self.task.id)
def __repr__(self):
details = 'implementation={task.implementation}; ' \
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b4896a50/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 6c59986..16b22e3 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -17,6 +17,7 @@
Thread based executor
"""
+import time
import Queue
import threading
@@ -58,6 +59,9 @@ class ThreadExecutor(BaseExecutor):
self._task_started(task)
try:
task_func = imports.load_attribute(task.implementation)
+ # Some of the changes (mainly the logs) fail to propagate if not enough time
+ # is given.
+ time.sleep(0.1)
task_func(ctx=task.context, **task.inputs)
self._task_succeeded(task)
except BaseException as e:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b4896a50/aria/storage/modeling/orchestrator_elements.py
----------------------------------------------------------------------
diff --git a/aria/storage/modeling/orchestrator_elements.py b/aria/storage/modeling/orchestrator_elements.py
index d06b5d0..ef773ed 100644
--- a/aria/storage/modeling/orchestrator_elements.py
+++ b/aria/storage/modeling/orchestrator_elements.py
@@ -479,6 +479,14 @@ class LogBase(ModelMixin):
def execution(cls):
return cls.many_to_one_relationship('execution')
+ @declared_attr
+ def task_fk(cls):
+ return cls.foreign_key('task', nullable=True)
+
+ @declared_attr
+ def task(cls):
+ return cls.many_to_one_relationship('task')
+
level = Column(String)
msg = Column(String)
created_at = Column(DateTime, index=True)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b4896a50/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index 8db2bc6..b49b1cb 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -49,12 +49,21 @@ def ctx(tmpdir):
@pytest.fixture
+def process_executor():
+ ex = process.ProcessExecutor(**dict(python_path=tests.ROOT_DIR))
+ try:
+ yield ex
+ finally:
+ ex.close()
+
+
+@pytest.fixture
def thread_executor():
- result = thread.ThreadExecutor()
+ ex = thread.ThreadExecutor()
try:
- yield result
+ yield ex
finally:
- result.close()
+ ex.close()
def test_node_operation_task_execution(ctx, thread_executor):
@@ -213,16 +222,16 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir):
@pytest.fixture(params=[
- # (thread.ThreadExecutor, dict()),
- (process.ProcessExecutor, dict(python_path=tests.ROOT_DIR))
+ (thread.ThreadExecutor, {}),
+ (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}),
])
def executor(request):
- ex_cls, kwargs = request.param
- ex = ex_cls(**kwargs)
+ executor_cls, executor_kwargs = request.param
+ result = executor_cls(**executor_kwargs)
try:
- yield ex
+ yield result
finally:
- ex.close()
+ result.close()
def test_node_operation_logging(ctx, executor):
@@ -295,10 +304,17 @@ def _assert_loggins(ctx, inputs):
assert len(executions) == 1
execution = executions[0]
+ tasks = ctx.model.task.list()
+ assert len(tasks) == 1
+ task = tasks[0]
+ assert task.logs.count() == 4
+
logs = ctx.model.log.list()
assert len(logs) == execution.logs.count() == 6
- assert all(l in logs for l in execution.logs)
+ assert set(logs) == set(execution.logs)
+
assert all(l.execution == execution for l in logs)
+ assert all(l in logs and l.task == task for l in task.logs)
op_start_log = [l for l in logs if inputs['op_start'] in l.msg and l.level.lower() == 'info']
assert len(op_start_log) == 1