You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by em...@apache.org on 2017/03/17 19:38:34 UTC

[06/18] incubator-ariatosca git commit: ARIA-117-Log-model-should-have-an-Task-field

ARIA-117-Log-model-should-have-an-Task-field


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/95177d0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/95177d0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/95177d0f

Branch: refs/heads/ARIA-105-integrate-modeling
Commit: 95177d0f7fdcedf9c32421e2557ddf965683525a
Parents: c0d76ad
Author: max-orlov <ma...@gigaspaces.com>
Authored: Mon Mar 6 16:43:58 2017 +0200
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Mon Mar 13 13:18:08 2017 +0200

----------------------------------------------------------------------
 aria/logger.py                                  |  1 +
 aria/orchestrator/context/common.py             | 11 +++---
 aria/orchestrator/context/operation.py          | 16 ++++++---
 aria/orchestrator/execution_plugin/common.py    |  8 ++---
 .../execution_plugin/ctx_proxy/server.py        |  6 +++-
 aria/orchestrator/execution_plugin/local.py     |  3 +-
 .../execution_plugin/ssh/operations.py          |  3 +-
 aria/storage/modeling/orchestrator_elements.py  |  8 +++++
 tests/orchestrator/context/test_operation.py    | 36 ++++++++++++++------
 .../orchestrator/execution_plugin/test_local.py |  3 +-
 10 files changed, 65 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/logger.py
----------------------------------------------------------------------
diff --git a/aria/logger.py b/aria/logger.py
index 6f0b84a..42e3679 100644
--- a/aria/logger.py
+++ b/aria/logger.py
@@ -164,6 +164,7 @@ class _SQLAlchemyHandler(logging.Handler):
                                        '%Y-%m-%d %H:%M:%S,%f')
         log = self._cls(
             execution_fk=self._execution_id,
+            task_fk=record.task_id,
             actor=record.prefix,
             level=record.levelname,
             msg=record.msg,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 1d228b6..bb9d839 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -36,13 +36,15 @@ class BaseContext(object):
     """
 
     class PrefixedLogger(object):
-        def __init__(self, logger, prefix=''):
+        def __init__(self, logger, prefix='', task_id=None):
             self._logger = logger
             self._prefix = prefix
+            self._task_id = task_id
 
         def __getattr__(self, item):
             if item.upper() in logging._levelNames:
-                return partial(getattr(self._logger, item), extra={'prefix': self._prefix})
+                return partial(getattr(self._logger, item),
+                               extra={'prefix': self._prefix, 'task_id': self._task_id})
             else:
                 return getattr(self._logger, item)
 
@@ -74,9 +76,10 @@ class BaseContext(object):
         self.model.execution.put(execution)
         return execution.id
 
-    def _register_logger(self, logger_name=None, level=None):
+    def _register_logger(self, logger_name=None, level=None, task_id=None):
         self.logger = self.PrefixedLogger(logging.getLogger(logger_name or self.__class__.__name__),
-                                          self.logging_id)
+                                          self.logging_id,
+                                          task_id=task_id)
         self.logger.addHandler(aria_logger.create_console_log_handler())
         self.logger.addHandler(self._get_sqla_handler())
         self.logger.setLevel(level or logging.DEBUG)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index cbaa462..d2716e8 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -17,6 +17,8 @@
 Workflow and operation contexts
 """
 
+import threading
+
 import aria
 from aria.utils import file
 from .common import BaseContext
@@ -44,9 +46,9 @@ class BaseOperationContext(BaseContext):
             **kwargs)
         self._task_id = task_id
         self._actor_id = actor_id
-        self._task = None
+        self._thread_local = threading.local()
         self._execution_id = execution_id
-        self._register_logger()
+        self._register_logger(task_id=self.task.id)
 
     def __repr__(self):
         details = 'implementation={task.implementation}; ' \
@@ -64,9 +66,13 @@ class BaseOperationContext(BaseContext):
         The task in the model storage
         :return: Task model
         """
-        if not self._task:
-            self._task = self.model.task.get(self._task_id)
-        return self._task
+        # SQLAlchemy prevents access to an object that was created on a different thread,
+        # so we retrieve the object from the storage if the current thread isn't the same as the
+        # original thread.
+
+        if not hasattr(self._thread_local, 'task'):
+            self._thread_local.task = self.model.task.get(self._task_id)
+        return self._thread_local.task
 
     @property
     def plugin_workdir(self):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/orchestrator/execution_plugin/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/common.py b/aria/orchestrator/execution_plugin/common.py
index 47cb631..7915c47 100644
--- a/aria/orchestrator/execution_plugin/common.py
+++ b/aria/orchestrator/execution_plugin/common.py
@@ -106,8 +106,6 @@ def create_process_config(script_path, process, operation_kwargs, quote_json_env
 def patch_ctx(ctx):
     ctx._error = None
     task = ctx.task
-    task._original_abort = task.abort
-    task._original_retry = task.retry
 
     def _validate_legal_action():
         if ctx._error is not None:
@@ -133,14 +131,14 @@ def check_error(ctx, error_check_func=None, reraise=False):
     _error = ctx._error
     # this happens when a script calls task.abort/task.retry more than once
     if isinstance(_error, RuntimeError):
-        ctx.task._original_abort(str(_error))
+        ctx.task.abort(str(_error))
     # ScriptException is populated by the ctx proxy server when task.abort or task.retry
     # are called
     elif isinstance(_error, exceptions.ScriptException):
         if _error.retry:
-            ctx.task._original_retry(_error.message, _error.retry_interval)
+            ctx.task.retry(_error.message, _error.retry_interval)
         else:
-            ctx.task._original_abort(_error.message)
+            ctx.task.abort(_error.message)
     # local and ssh operations may pass an additional logic check for errors here
     if error_check_func:
         error_check_func()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/orchestrator/execution_plugin/ctx_proxy/server.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
index 2782ae3..817d064 100644
--- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py
+++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
@@ -30,8 +30,9 @@ from .. import exceptions
 
 class CtxProxy(object):
 
-    def __init__(self, ctx):
+    def __init__(self, ctx, ctx_patcher=(lambda *args, **kwargs: None)):
         self.ctx = ctx
+        self._ctx_patcher = ctx_patcher
         self.port = _get_unused_port()
         self.socket_url = 'http://localhost:{0}'.format(self.port)
         self.server = None
@@ -69,6 +70,9 @@ class CtxProxy(object):
                 server.serve_forever(poll_interval=0.1)
 
         def serve():
+            # Since task is a thread_local object, we need to patch it inside the server thread.
+            self._ctx_patcher(self.ctx)
+
             bottle_app = bottle.Bottle()
             bottle_app.post('/', callback=self._request_handler)
             bottle.run(

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/orchestrator/execution_plugin/local.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/local.py b/aria/orchestrator/execution_plugin/local.py
index bc2d661..121e582 100644
--- a/aria/orchestrator/execution_plugin/local.py
+++ b/aria/orchestrator/execution_plugin/local.py
@@ -75,8 +75,7 @@ def _execute_func(script_path, ctx, process, operation_kwargs):
     env = os.environ.copy()
     env.update(process['env'])
     ctx.logger.info('Executing: {0}'.format(command))
-    common.patch_ctx(ctx)
-    with ctx_proxy.server.CtxProxy(ctx) as proxy:
+    with ctx_proxy.server.CtxProxy(ctx, common.patch_ctx) as proxy:
         env[ctx_proxy.client.CTX_SOCKET_URL] = proxy.socket_url
         running_process = subprocess.Popen(
             command,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/orchestrator/execution_plugin/ssh/operations.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ssh/operations.py b/aria/orchestrator/execution_plugin/ssh/operations.py
index 7589d42..f240beb 100644
--- a/aria/orchestrator/execution_plugin/ssh/operations.py
+++ b/aria/orchestrator/execution_plugin/ssh/operations.py
@@ -70,14 +70,13 @@ def run_script(ctx, script_path, fabric_env, process, use_sudo, hide_output, **k
                                                                  paths.remote_work_dir))
             # this file has to be present before using ctx
             fabric.api.put(_PROXY_CLIENT_PATH, paths.remote_ctx_path)
-        _patch_ctx(ctx)
         process = common.create_process_config(
             script_path=paths.remote_script_path,
             process=process,
             operation_kwargs=kwargs,
             quote_json_env_vars=True)
         fabric.api.put(paths.local_script_path, paths.remote_script_path)
-        with ctx_proxy.server.CtxProxy(ctx) as proxy:
+        with ctx_proxy.server.CtxProxy(ctx, _patch_ctx) as proxy:
             local_port = proxy.port
             with fabric.context_managers.cd(process.get('cwd', paths.remote_work_dir)):  # pylint: disable=not-context-manager
                 with tunnel.remote(ctx, local_port=local_port) as remote_port:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/storage/modeling/orchestrator_elements.py
----------------------------------------------------------------------
diff --git a/aria/storage/modeling/orchestrator_elements.py b/aria/storage/modeling/orchestrator_elements.py
index d06b5d0..ef773ed 100644
--- a/aria/storage/modeling/orchestrator_elements.py
+++ b/aria/storage/modeling/orchestrator_elements.py
@@ -479,6 +479,14 @@ class LogBase(ModelMixin):
     def execution(cls):
         return cls.many_to_one_relationship('execution')
 
+    @declared_attr
+    def task_fk(cls):
+        return cls.foreign_key('task', nullable=True)
+
+    @declared_attr
+    def task(cls):
+        return cls.many_to_one_relationship('task')
+
     level = Column(String)
     msg = Column(String)
     created_at = Column(DateTime, index=True)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index 8db2bc6..b49b1cb 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -49,12 +49,21 @@ def ctx(tmpdir):
 
 
 @pytest.fixture
+def process_executor():
+    ex = process.ProcessExecutor(**dict(python_path=tests.ROOT_DIR))
+    try:
+        yield ex
+    finally:
+        ex.close()
+
+
+@pytest.fixture
 def thread_executor():
-    result = thread.ThreadExecutor()
+    ex = thread.ThreadExecutor()
     try:
-        yield result
+        yield ex
     finally:
-        result.close()
+        ex.close()
 
 
 def test_node_operation_task_execution(ctx, thread_executor):
@@ -213,16 +222,16 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir):
 
 
 @pytest.fixture(params=[
-    # (thread.ThreadExecutor, dict()),
-    (process.ProcessExecutor, dict(python_path=tests.ROOT_DIR))
+    (thread.ThreadExecutor, {}),
+    (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}),
 ])
 def executor(request):
-    ex_cls, kwargs = request.param
-    ex = ex_cls(**kwargs)
+    executor_cls, executor_kwargs = request.param
+    result = executor_cls(**executor_kwargs)
     try:
-        yield ex
+        yield result
     finally:
-        ex.close()
+        result.close()
 
 
 def test_node_operation_logging(ctx, executor):
@@ -295,10 +304,17 @@ def _assert_loggins(ctx, inputs):
     assert len(executions) == 1
     execution = executions[0]
 
+    tasks = ctx.model.task.list()
+    assert len(tasks) == 1
+    task = tasks[0]
+    assert task.logs.count() == 4
+
     logs = ctx.model.log.list()
     assert len(logs) == execution.logs.count() == 6
-    assert all(l in logs for l in execution.logs)
+    assert set(logs) == set(execution.logs)
+
     assert all(l.execution == execution for l in logs)
+    assert all(l in logs and l.task == task for l in task.logs)
 
     op_start_log = [l for l in logs if inputs['op_start'] in l.msg and l.level.lower() == 'info']
     assert len(op_start_log) == 1

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 5224496..a94fc83 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -30,7 +30,8 @@ from aria.orchestrator.execution_plugin import constants
 from aria.orchestrator.workflows.executor import process
 from aria.orchestrator.workflows.core import engine
 
-from tests import mock, storage
+from tests import mock
+from tests import storage
 from tests.orchestrator.workflows.helpers import events_collector
 
 IS_WINDOWS = os.name == 'nt'