You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/04/10 09:51:26 UTC
[17/18] incubator-ariatosca git commit: logger wip
logger wip
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/03103f6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/03103f6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/03103f6b
Branch: refs/heads/logger_task
Commit: 03103f6b97d81bf559892d6388f8ba177926a08d
Parents: 5b245b4
Author: max-orlov <ma...@gigaspaces.com>
Authored: Thu Apr 6 11:54:44 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Mon Apr 10 12:50:58 2017 +0300
----------------------------------------------------------------------
aria/cli/commands/executions.py | 12 +++-
aria/cli/logger.py | 66 +++++++++++++++++++-
aria/logger.py | 1 -
aria/modeling/orchestration.py | 4 +-
aria/orchestrator/context/common.py | 11 +---
aria/orchestrator/context/operation.py | 13 ----
aria/orchestrator/context/workflow.py | 4 +-
aria/orchestrator/workflow_runner.py | 6 +-
aria/orchestrator/workflows/events_logging.py | 19 +++---
aria/orchestrator/workflows/executor/process.py | 6 +-
10 files changed, 99 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03103f6b/aria/cli/commands/executions.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py
index 396985a..a84a8a2 100644
--- a/aria/cli/commands/executions.py
+++ b/aria/cli/commands/executions.py
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+from aria.cli.logger import LogConsumer
from .. import utils
from ..table import print_data
from ..cli import aria
@@ -139,13 +139,19 @@ def start(workflow_name,
logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
execution_thread.start()
+
+ log_consumer = LogConsumer(model_storage, workflow_runner.execution_id)
try:
while execution_thread.is_alive():
- # using join without a timeout blocks and ignores KeyboardInterrupt
- execution_thread.join(1)
+ for log in log_consumer:
+ logger.log(log)
+
except KeyboardInterrupt:
_cancel_execution(workflow_runner, execution_thread, logger)
+ for log in log_consumer:
+ logger.log(log)
+
# raise any errors from the execution thread (note these are not workflow execution errors)
execution_thread.raise_error_if_exists()
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03103f6b/aria/cli/logger.py
----------------------------------------------------------------------
diff --git a/aria/cli/logger.py b/aria/cli/logger.py
index 289dbd3..bfafbfa 100644
--- a/aria/cli/logger.py
+++ b/aria/cli/logger.py
@@ -51,6 +51,42 @@ DEFAULT_LOGGER_CONFIG = {
}
+class _ModelLogLogger(object):
+ def __init__(self, logger, level, formats):
+ self._logger = logger
+ self._level = level
+ self._formats = formats
+
+ def log(self, item):
+ kwargs = dict(item=item)
+ formats = self._formats[self.level]
+ if 'created_at' in formats:
+ kwargs['created_at'] = item.created_at.strftime(formats['created_at'])
+ if 'level' in formats:
+ kwargs['level'] = formats['level'].format(item.level)
+ if 'msg' in formats:
+ kwargs['msg'] = formats['msg'].format(item.msg)
+
+ if 'actor' in formats and item.task:
+ kwargs['actor'] = formats['actor'].format(item.task.actor)
+ if 'execution' in formats:
+ kwargs['execution'] = formats['execution'].format(item.execution)
+
+ msg = formats['main_msg'].format(**kwargs)
+ return getattr(self._logger, item.level.lower())(msg)
+
+ @property
+ def level(self):
+ return logging.INFO if self._level is NO_VERBOSE else logging.DEBUG
+
+ @level.setter
+ def level(self, level):
+ self._level = level
+
+ def __getattr__(self, item):
+ return getattr(self._logger, item)
+
+
class Logging(object):
def __init__(self, config):
@@ -58,7 +94,18 @@ class Logging(object):
self._verbosity_level = NO_VERBOSE
self._all_loggers = []
self._configure_loggers(config)
- self._lgr = logging.getLogger('aria.cli.main')
+
+ self._lgr = _ModelLogLogger(
+ logging.getLogger('aria.cli.main'),
+ self._verbosity_level,
+ {logging.INFO: {
+ 'main_msg': '{item.msg}',
+ },
+ logging.DEBUG: {
+ 'main_msg': '{created_at} | {item.level[0]} | {item.msg}',
+ 'created_at': '%H:%M:%S'
+ }
+ })
@property
def logger(self):
@@ -77,7 +124,7 @@ class Logging(object):
@verbosity_level.setter
def verbosity_level(self, level):
- self._verbosity_level = level
+ self._verbosity_level = self._lgr.level = level
if self.is_high_verbose_level():
for logger_name in self._all_loggers:
logging.getLogger(logger_name).setLevel(logging.DEBUG)
@@ -111,3 +158,18 @@ class Logging(object):
self._all_loggers.append(logger_name)
logging.config.dictConfig(logger_dict)
+
+
+class LogConsumer(object):
+
+ def __init__(self, model_storage, execution_id):
+ self._last_visited_id = 0
+ self._model_storage = model_storage
+ self._execution_id = execution_id
+
+ def __iter__(self):
+
+ for log in self._model_storage.log.iter(filters=dict(
+ execution_fk=self._execution_id, id=dict(gt=self._last_visited_id) )):
+ self._last_visited_id = log.id
+ yield log
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03103f6b/aria/logger.py
----------------------------------------------------------------------
diff --git a/aria/logger.py b/aria/logger.py
index bbb6c7a..040ab3f 100644
--- a/aria/logger.py
+++ b/aria/logger.py
@@ -168,7 +168,6 @@ class _SQLAlchemyHandler(logging.Handler):
log = self._cls(
execution_fk=self._execution_id,
task_fk=record.task_id,
- actor=record.prefix,
level=record.levelname,
msg=str(record.msg),
created_at=created_at,
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03103f6b/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 12a56f6..f880d31 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -393,7 +393,6 @@ class LogBase(ModelMixin):
level = Column(String)
msg = Column(String)
created_at = Column(DateTime, index=True)
- actor = Column(String)
# region foreign keys
@@ -408,5 +407,4 @@ class LogBase(ModelMixin):
# endregion
def __repr__(self):
- return "<{self.created_at}: [{self.level}] @{self.actor}> {msg}".format(
- self=self, msg=self.msg[:50])
+ return "{msg}".format(msg=self.msg)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03103f6b/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 11b5eb9..c9ca53b 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -39,15 +39,13 @@ class BaseContext(object):
"""
class PrefixedLogger(object):
- def __init__(self, logger, prefix='', task_id=None):
+ def __init__(self, logger, task_id=None):
self._logger = logger
- self._prefix = prefix
self._task_id = task_id
def __getattr__(self, item):
if item.upper() in logging._levelNames:
- return partial(getattr(self._logger, item),
- extra={'prefix': self._prefix, 'task_id': self._task_id})
+ return partial(getattr(self._logger, item), extra=dict(task_id=self._task_id))
else:
return getattr(self._logger, item)
@@ -73,7 +71,6 @@ class BaseContext(object):
logging.getLogger(logger.TASK_LOGGER_NAME), self.logging_id, task_id=task_id)
self.logger.setLevel(level or logging.DEBUG)
if not self.logger.handlers:
- self.logger.addHandler(aria_logger.create_console_log_handler())
self.logger.addHandler(self._get_sqla_handler())
def _get_sqla_handler(self):
@@ -103,10 +100,6 @@ class BaseContext(object):
self.logger.removeHandler(handler)
@property
- def logging_id(self):
- raise NotImplementedError
-
- @property
def model(self):
"""
Access to the model storage
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03103f6b/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index cbd186c..6aed377 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -57,10 +57,6 @@ class BaseOperationContext(BaseContext):
return '{name}({0})'.format(details, name=self.name)
@property
- def logging_id(self):
- raise NotImplementedError
-
- @property
def task(self):
"""
The task in the model storage
@@ -121,10 +117,6 @@ class NodeOperationContext(BaseOperationContext):
"""
@property
- def logging_id(self):
- return self.node.name or self.node.id
-
- @property
def node_template(self):
"""
the node of the current operation
@@ -147,11 +139,6 @@ class RelationshipOperationContext(BaseOperationContext):
"""
@property
- def logging_id(self):
- return '{0}->{1}'.format(self.source_node.name or self.source_node.id,
- self.target_node.name or self.target_node.id)
-
- @property
def source_node_template(self):
"""
The source node
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03103f6b/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index ad4a2ff..9d8c0d1 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -52,8 +52,8 @@ class WorkflowContext(BaseContext):
name=self.__class__.__name__, self=self))
@property
- def logging_id(self):
- return '{0}[{1}]'.format(self._workflow_name, self._execution_id)
+ def workflow_name(self):
+ return self._workflow_name
@property
def execution(self):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03103f6b/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 1cdf1de..6c18baf 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -92,8 +92,12 @@ class WorkflowRunner(object):
tasks_graph=self._tasks_graph)
@property
+ def execution_id(self):
+ return self._execution_id
+
+ @property
def execution(self):
- return self._model_storage.execution.get(self._execution_id)
+ return self._model_storage.execution.get(self.execution_id)
@property
def service(self):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03103f6b/aria/orchestrator/workflows/events_logging.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py
index e831bfe..07981e9 100644
--- a/aria/orchestrator/workflows/events_logging.py
+++ b/aria/orchestrator/workflows/events_logging.py
@@ -26,16 +26,21 @@ from .. import events
@events.start_task_signal.connect
def _start_task_handler(task, **kwargs):
- task.context.logger.debug('Event: Starting task: {task.name}'.format(task=task))
+ task.context.logger.debug('{actor.name} {task.interface_name}.{task.operation_name} started...'
+ .format(actor=task.actor, task=task))
@events.on_success_task_signal.connect
def _success_task_handler(task, **kwargs):
- task.context.logger.debug('Event: Task success: {task.name}'.format(task=task))
+ task.context.logger.info('{actor.name} {task.interface_name}.{task.operation_name} successful'
+ .format(actor=task.actor, task=task))
@events.on_failure_task_signal.connect
def _failure_operation_handler(task, exception, **kwargs):
+ # todo: add full support for exceptions and errors logging
+ task.context.logger.info('{actor.name} {task.interface_name}.{task.operation_name} failed'
+ .format(actor=task.actor, task=task))
error = '{0}: {1}'.format(type(exception).__name__, exception)
task.context.logger.error('Event: Task failure: {task.name} [{error}]'.format(
task=task, error=error))
@@ -43,24 +48,24 @@ def _failure_operation_handler(task, exception, **kwargs):
@events.start_workflow_signal.connect
def _start_workflow_handler(context, **kwargs):
- context.logger.debug('Event: Starting workflow: {context.name}'.format(context=context))
+ context.logger.info("Starting '{ctx.workflow_name}' workflow execution".format(ctx=context))
@events.on_failure_workflow_signal.connect
def _failure_workflow_handler(context, **kwargs):
- context.logger.debug('Event: Workflow failure: {context.name}'.format(context=context))
+ context.logger.info("'{ctx.workflow_name}' workflow execution failed".format(ctx=context))
@events.on_success_workflow_signal.connect
def _success_workflow_handler(context, **kwargs):
- context.logger.debug('Event: Workflow success: {context.name}'.format(context=context))
+ context.logger.info("'{ctx.workflow_name}' workflow execution succeeded".format(ctx=context))
@events.on_cancelled_workflow_signal.connect
def _cancel_workflow_handler(context, **kwargs):
- context.logger.debug('Event: Workflow cancelled: {context.name}'.format(context=context))
+ context.logger.info("'{ctx.workflow_name}' workflow execution canceled".format(ctx=context))
@events.on_cancelling_workflow_signal.connect
def _cancelling_workflow_handler(context, **kwargs):
- context.logger.debug('Event: Workflow cancelling: {context.name}'.format(context=context))
+ context.logger.info("Cancelling '{ctx.workflow_name}' workflow execution".format(ctx=context))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/03103f6b/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index dc369ab..83d8b55 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -148,8 +148,10 @@ class ProcessExecutor(base.BaseExecutor):
def _create_arguments_dict(self, task):
return {
'task_id': task.id,
- 'implementation': task.implementation,
- 'operation_inputs': Parameter.unwrap_dict(task.inputs),
+ # 'implementation': task.implementation,
+ 'implementation': 'aria.orchestrator.execution_plugin.operations.run_script_locally',
+ # 'operation_inputs': Parameter.unwrap_dict(task.inputs),
+ 'operation_inputs': dict(script_path=task.implementation),
'port': self._server_port,
'context': task.context.serialization_dict,
}