You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/03/07 10:16:09 UTC

incubator-ariatosca git commit: initial commit

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-117-Log-model-should-have-an-Task-field c0d76adaf -> b4896a504


initial commit


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/b4896a50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/b4896a50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/b4896a50

Branch: refs/heads/ARIA-117-Log-model-should-have-an-Task-field
Commit: b4896a504d2d267e89c6c0a6882bc40b40d9315f
Parents: c0d76ad
Author: max-orlov <ma...@gigaspaces.com>
Authored: Mon Mar 6 16:43:58 2017 +0200
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Tue Mar 7 12:00:17 2017 +0200

----------------------------------------------------------------------
 aria/logger.py                                 |  1 +
 aria/orchestrator/context/common.py            | 11 ++++---
 aria/orchestrator/context/operation.py         |  2 +-
 aria/orchestrator/workflows/executor/thread.py |  4 +++
 aria/storage/modeling/orchestrator_elements.py |  8 +++++
 tests/orchestrator/context/test_operation.py   | 36 +++++++++++++++------
 6 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b4896a50/aria/logger.py
----------------------------------------------------------------------
diff --git a/aria/logger.py b/aria/logger.py
index 6f0b84a..42e3679 100644
--- a/aria/logger.py
+++ b/aria/logger.py
@@ -164,6 +164,7 @@ class _SQLAlchemyHandler(logging.Handler):
                                        '%Y-%m-%d %H:%M:%S,%f')
         log = self._cls(
             execution_fk=self._execution_id,
+            task_fk=record.task_id,
             actor=record.prefix,
             level=record.levelname,
             msg=record.msg,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b4896a50/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 1d228b6..bb9d839 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -36,13 +36,15 @@ class BaseContext(object):
     """
 
     class PrefixedLogger(object):
-        def __init__(self, logger, prefix=''):
+        def __init__(self, logger, prefix='', task_id=None):
             self._logger = logger
             self._prefix = prefix
+            self._task_id = task_id
 
         def __getattr__(self, item):
             if item.upper() in logging._levelNames:
-                return partial(getattr(self._logger, item), extra={'prefix': self._prefix})
+                return partial(getattr(self._logger, item),
+                               extra={'prefix': self._prefix, 'task_id': self._task_id})
             else:
                 return getattr(self._logger, item)
 
@@ -74,9 +76,10 @@ class BaseContext(object):
         self.model.execution.put(execution)
         return execution.id
 
-    def _register_logger(self, logger_name=None, level=None):
+    def _register_logger(self, logger_name=None, level=None, task_id=None):
         self.logger = self.PrefixedLogger(logging.getLogger(logger_name or self.__class__.__name__),
-                                          self.logging_id)
+                                          self.logging_id,
+                                          task_id=task_id)
         self.logger.addHandler(aria_logger.create_console_log_handler())
         self.logger.addHandler(self._get_sqla_handler())
         self.logger.setLevel(level or logging.DEBUG)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b4896a50/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index cbaa462..ed0791c 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -46,7 +46,7 @@ class BaseOperationContext(BaseContext):
         self._actor_id = actor_id
         self._task = None
         self._execution_id = execution_id
-        self._register_logger()
+        self._register_logger(task_id=self.task.id)
 
     def __repr__(self):
         details = 'implementation={task.implementation}; ' \

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b4896a50/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 6c59986..16b22e3 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -17,6 +17,7 @@
 Thread based executor
 """
 
+import time
 import Queue
 import threading
 
@@ -58,6 +59,9 @@ class ThreadExecutor(BaseExecutor):
                 self._task_started(task)
                 try:
                     task_func = imports.load_attribute(task.implementation)
+                    # Some of the changes (mainly the logs fail to propagate if not enough time
+                    # is given
+                    time.sleep(0.1)
                     task_func(ctx=task.context, **task.inputs)
                     self._task_succeeded(task)
                 except BaseException as e:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b4896a50/aria/storage/modeling/orchestrator_elements.py
----------------------------------------------------------------------
diff --git a/aria/storage/modeling/orchestrator_elements.py b/aria/storage/modeling/orchestrator_elements.py
index d06b5d0..ef773ed 100644
--- a/aria/storage/modeling/orchestrator_elements.py
+++ b/aria/storage/modeling/orchestrator_elements.py
@@ -479,6 +479,14 @@ class LogBase(ModelMixin):
     def execution(cls):
         return cls.many_to_one_relationship('execution')
 
+    @declared_attr
+    def task_fk(cls):
+        return cls.foreign_key('task', nullable=True)
+
+    @declared_attr
+    def task(cls):
+        return cls.many_to_one_relationship('task')
+
     level = Column(String)
     msg = Column(String)
     created_at = Column(DateTime, index=True)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b4896a50/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index 8db2bc6..b49b1cb 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -49,12 +49,21 @@ def ctx(tmpdir):
 
 
 @pytest.fixture
+def process_executor():
+    ex = process.ProcessExecutor(**dict(python_path=tests.ROOT_DIR))
+    try:
+        yield ex
+    finally:
+        ex.close()
+
+
+@pytest.fixture
 def thread_executor():
-    result = thread.ThreadExecutor()
+    ex = thread.ThreadExecutor()
     try:
-        yield result
+        yield ex
     finally:
-        result.close()
+        ex.close()
 
 
 def test_node_operation_task_execution(ctx, thread_executor):
@@ -213,16 +222,16 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir):
 
 
 @pytest.fixture(params=[
-    # (thread.ThreadExecutor, dict()),
-    (process.ProcessExecutor, dict(python_path=tests.ROOT_DIR))
+    (thread.ThreadExecutor, {}),
+    (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}),
 ])
 def executor(request):
-    ex_cls, kwargs = request.param
-    ex = ex_cls(**kwargs)
+    executor_cls, executor_kwargs = request.param
+    result = executor_cls(**executor_kwargs)
     try:
-        yield ex
+        yield result
     finally:
-        ex.close()
+        result.close()
 
 
 def test_node_operation_logging(ctx, executor):
@@ -295,10 +304,17 @@ def _assert_loggins(ctx, inputs):
     assert len(executions) == 1
     execution = executions[0]
 
+    tasks = ctx.model.task.list()
+    assert len(tasks) == 1
+    task = tasks[0]
+    assert task.logs.count() == 4
+
     logs = ctx.model.log.list()
     assert len(logs) == execution.logs.count() == 6
-    assert all(l in logs for l in execution.logs)
+    assert set(logs) == set(execution.logs)
+
     assert all(l.execution == execution for l in logs)
+    assert all(l in logs and l.task == task for l in task.logs)
 
     op_start_log = [l for l in logs if inputs['op_start'] in l.msg and l.level.lower() == 'info']
     assert len(op_start_log) == 1