You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by em...@apache.org on 2017/03/17 19:38:34 UTC
[06/18] incubator-ariatosca git commit:
ARIA-117-Log-model-should-have-an-Task-field
ARIA-117-Log-model-should-have-an-Task-field
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/95177d0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/95177d0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/95177d0f
Branch: refs/heads/ARIA-105-integrate-modeling
Commit: 95177d0f7fdcedf9c32421e2557ddf965683525a
Parents: c0d76ad
Author: max-orlov <ma...@gigaspaces.com>
Authored: Mon Mar 6 16:43:58 2017 +0200
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Mon Mar 13 13:18:08 2017 +0200
----------------------------------------------------------------------
aria/logger.py | 1 +
aria/orchestrator/context/common.py | 11 +++---
aria/orchestrator/context/operation.py | 16 ++++++---
aria/orchestrator/execution_plugin/common.py | 8 ++---
.../execution_plugin/ctx_proxy/server.py | 6 +++-
aria/orchestrator/execution_plugin/local.py | 3 +-
.../execution_plugin/ssh/operations.py | 3 +-
aria/storage/modeling/orchestrator_elements.py | 8 +++++
tests/orchestrator/context/test_operation.py | 36 ++++++++++++++------
.../orchestrator/execution_plugin/test_local.py | 3 +-
10 files changed, 65 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/logger.py
----------------------------------------------------------------------
diff --git a/aria/logger.py b/aria/logger.py
index 6f0b84a..42e3679 100644
--- a/aria/logger.py
+++ b/aria/logger.py
@@ -164,6 +164,7 @@ class _SQLAlchemyHandler(logging.Handler):
'%Y-%m-%d %H:%M:%S,%f')
log = self._cls(
execution_fk=self._execution_id,
+ task_fk=record.task_id,
actor=record.prefix,
level=record.levelname,
msg=record.msg,
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 1d228b6..bb9d839 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -36,13 +36,15 @@ class BaseContext(object):
"""
class PrefixedLogger(object):
- def __init__(self, logger, prefix=''):
+ def __init__(self, logger, prefix='', task_id=None):
self._logger = logger
self._prefix = prefix
+ self._task_id = task_id
def __getattr__(self, item):
if item.upper() in logging._levelNames:
- return partial(getattr(self._logger, item), extra={'prefix': self._prefix})
+ return partial(getattr(self._logger, item),
+ extra={'prefix': self._prefix, 'task_id': self._task_id})
else:
return getattr(self._logger, item)
@@ -74,9 +76,10 @@ class BaseContext(object):
self.model.execution.put(execution)
return execution.id
- def _register_logger(self, logger_name=None, level=None):
+ def _register_logger(self, logger_name=None, level=None, task_id=None):
self.logger = self.PrefixedLogger(logging.getLogger(logger_name or self.__class__.__name__),
- self.logging_id)
+ self.logging_id,
+ task_id=task_id)
self.logger.addHandler(aria_logger.create_console_log_handler())
self.logger.addHandler(self._get_sqla_handler())
self.logger.setLevel(level or logging.DEBUG)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index cbaa462..d2716e8 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -17,6 +17,8 @@
Workflow and operation contexts
"""
+import threading
+
import aria
from aria.utils import file
from .common import BaseContext
@@ -44,9 +46,9 @@ class BaseOperationContext(BaseContext):
**kwargs)
self._task_id = task_id
self._actor_id = actor_id
- self._task = None
+ self._thread_local = threading.local()
self._execution_id = execution_id
- self._register_logger()
+ self._register_logger(task_id=self.task.id)
def __repr__(self):
details = 'implementation={task.implementation}; ' \
@@ -64,9 +66,13 @@ class BaseOperationContext(BaseContext):
The task in the model storage
:return: Task model
"""
- if not self._task:
- self._task = self.model.task.get(self._task_id)
- return self._task
+ # SQLAlchemy does not allow accessing an object that was created on a different thread,
+ # so the task is cached per thread: each thread retrieves its own copy from the storage
+ # the first time it accesses this property.
+
+ if not hasattr(self._thread_local, 'task'):
+ self._thread_local.task = self.model.task.get(self._task_id)
+ return self._thread_local.task
@property
def plugin_workdir(self):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/orchestrator/execution_plugin/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/common.py b/aria/orchestrator/execution_plugin/common.py
index 47cb631..7915c47 100644
--- a/aria/orchestrator/execution_plugin/common.py
+++ b/aria/orchestrator/execution_plugin/common.py
@@ -106,8 +106,6 @@ def create_process_config(script_path, process, operation_kwargs, quote_json_env
def patch_ctx(ctx):
ctx._error = None
task = ctx.task
- task._original_abort = task.abort
- task._original_retry = task.retry
def _validate_legal_action():
if ctx._error is not None:
@@ -133,14 +131,14 @@ def check_error(ctx, error_check_func=None, reraise=False):
_error = ctx._error
# this happens when a script calls task.abort/task.retry more than once
if isinstance(_error, RuntimeError):
- ctx.task._original_abort(str(_error))
+ ctx.task.abort(str(_error))
# ScriptException is populated by the ctx proxy server when task.abort or task.retry
# are called
elif isinstance(_error, exceptions.ScriptException):
if _error.retry:
- ctx.task._original_retry(_error.message, _error.retry_interval)
+ ctx.task.retry(_error.message, _error.retry_interval)
else:
- ctx.task._original_abort(_error.message)
+ ctx.task.abort(_error.message)
# local and ssh operations may pass an additional logic check for errors here
if error_check_func:
error_check_func()
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/orchestrator/execution_plugin/ctx_proxy/server.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
index 2782ae3..817d064 100644
--- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py
+++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
@@ -30,8 +30,9 @@ from .. import exceptions
class CtxProxy(object):
- def __init__(self, ctx):
+ def __init__(self, ctx, ctx_patcher=(lambda *args, **kwargs: None)):
self.ctx = ctx
+ self._ctx_patcher = ctx_patcher
self.port = _get_unused_port()
self.socket_url = 'http://localhost:{0}'.format(self.port)
self.server = None
@@ -69,6 +70,9 @@ class CtxProxy(object):
server.serve_forever(poll_interval=0.1)
def serve():
+ # Since task is a thread_local object, we need to patch it inside the server thread.
+ self._ctx_patcher(self.ctx)
+
bottle_app = bottle.Bottle()
bottle_app.post('/', callback=self._request_handler)
bottle.run(
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/orchestrator/execution_plugin/local.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/local.py b/aria/orchestrator/execution_plugin/local.py
index bc2d661..121e582 100644
--- a/aria/orchestrator/execution_plugin/local.py
+++ b/aria/orchestrator/execution_plugin/local.py
@@ -75,8 +75,7 @@ def _execute_func(script_path, ctx, process, operation_kwargs):
env = os.environ.copy()
env.update(process['env'])
ctx.logger.info('Executing: {0}'.format(command))
- common.patch_ctx(ctx)
- with ctx_proxy.server.CtxProxy(ctx) as proxy:
+ with ctx_proxy.server.CtxProxy(ctx, common.patch_ctx) as proxy:
env[ctx_proxy.client.CTX_SOCKET_URL] = proxy.socket_url
running_process = subprocess.Popen(
command,
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/orchestrator/execution_plugin/ssh/operations.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ssh/operations.py b/aria/orchestrator/execution_plugin/ssh/operations.py
index 7589d42..f240beb 100644
--- a/aria/orchestrator/execution_plugin/ssh/operations.py
+++ b/aria/orchestrator/execution_plugin/ssh/operations.py
@@ -70,14 +70,13 @@ def run_script(ctx, script_path, fabric_env, process, use_sudo, hide_output, **k
paths.remote_work_dir))
# this file has to be present before using ctx
fabric.api.put(_PROXY_CLIENT_PATH, paths.remote_ctx_path)
- _patch_ctx(ctx)
process = common.create_process_config(
script_path=paths.remote_script_path,
process=process,
operation_kwargs=kwargs,
quote_json_env_vars=True)
fabric.api.put(paths.local_script_path, paths.remote_script_path)
- with ctx_proxy.server.CtxProxy(ctx) as proxy:
+ with ctx_proxy.server.CtxProxy(ctx, _patch_ctx) as proxy:
local_port = proxy.port
with fabric.context_managers.cd(process.get('cwd', paths.remote_work_dir)): # pylint: disable=not-context-manager
with tunnel.remote(ctx, local_port=local_port) as remote_port:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/storage/modeling/orchestrator_elements.py
----------------------------------------------------------------------
diff --git a/aria/storage/modeling/orchestrator_elements.py b/aria/storage/modeling/orchestrator_elements.py
index d06b5d0..ef773ed 100644
--- a/aria/storage/modeling/orchestrator_elements.py
+++ b/aria/storage/modeling/orchestrator_elements.py
@@ -479,6 +479,14 @@ class LogBase(ModelMixin):
def execution(cls):
return cls.many_to_one_relationship('execution')
+ @declared_attr
+ def task_fk(cls):
+ return cls.foreign_key('task', nullable=True)
+
+ @declared_attr
+ def task(cls):
+ return cls.many_to_one_relationship('task')
+
level = Column(String)
msg = Column(String)
created_at = Column(DateTime, index=True)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index 8db2bc6..b49b1cb 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -49,12 +49,21 @@ def ctx(tmpdir):
@pytest.fixture
+def process_executor():
+ ex = process.ProcessExecutor(**dict(python_path=tests.ROOT_DIR))
+ try:
+ yield ex
+ finally:
+ ex.close()
+
+
+@pytest.fixture
def thread_executor():
- result = thread.ThreadExecutor()
+ ex = thread.ThreadExecutor()
try:
- yield result
+ yield ex
finally:
- result.close()
+ ex.close()
def test_node_operation_task_execution(ctx, thread_executor):
@@ -213,16 +222,16 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir):
@pytest.fixture(params=[
- # (thread.ThreadExecutor, dict()),
- (process.ProcessExecutor, dict(python_path=tests.ROOT_DIR))
+ (thread.ThreadExecutor, {}),
+ (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}),
])
def executor(request):
- ex_cls, kwargs = request.param
- ex = ex_cls(**kwargs)
+ executor_cls, executor_kwargs = request.param
+ result = executor_cls(**executor_kwargs)
try:
- yield ex
+ yield result
finally:
- ex.close()
+ result.close()
def test_node_operation_logging(ctx, executor):
@@ -295,10 +304,17 @@ def _assert_loggins(ctx, inputs):
assert len(executions) == 1
execution = executions[0]
+ tasks = ctx.model.task.list()
+ assert len(tasks) == 1
+ task = tasks[0]
+ assert task.logs.count() == 4
+
logs = ctx.model.log.list()
assert len(logs) == execution.logs.count() == 6
- assert all(l in logs for l in execution.logs)
+ assert set(logs) == set(execution.logs)
+
assert all(l.execution == execution for l in logs)
+ assert all(l in logs and l.task == task for l in task.logs)
op_start_log = [l for l in logs if inputs['op_start'] in l.msg and l.level.lower() == 'info']
assert len(op_start_log) == 1
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 5224496..a94fc83 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -30,7 +30,8 @@ from aria.orchestrator.execution_plugin import constants
from aria.orchestrator.workflows.executor import process
from aria.orchestrator.workflows.core import engine
-from tests import mock, storage
+from tests import mock
+from tests import storage
from tests.orchestrator.workflows.helpers import events_collector
IS_WINDOWS = os.name == 'nt'