You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/04/05 16:15:16 UTC

incubator-ariatosca git commit: wip

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/logger_task [created] 167080780


wip


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/16708078
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/16708078
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/16708078

Branch: refs/heads/logger_task
Commit: 16708078066f69733c5c2838c658e7977b29ae4e
Parents: 9df203e
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed Apr 5 19:15:04 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Wed Apr 5 19:15:04 2017 +0300

----------------------------------------------------------------------
 aria/cli/commands/executions.py                 | 12 +++-
 aria/cli/logger.py                              | 66 +++++++++++++++++++-
 aria/logger.py                                  |  1 -
 aria/modeling/orchestration.py                  |  4 +-
 aria/orchestrator/context/common.py             | 14 +----
 aria/orchestrator/context/operation.py          | 13 ----
 aria/orchestrator/context/workflow.py           |  4 +-
 aria/orchestrator/workflow_runner.py            |  6 +-
 aria/orchestrator/workflows/events_logging.py   | 19 +++---
 aria/orchestrator/workflows/executor/process.py |  6 +-
 10 files changed, 100 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/16708078/aria/cli/commands/executions.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py
index cc8bf6c..dd62e94 100644
--- a/aria/cli/commands/executions.py
+++ b/aria/cli/commands/executions.py
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+from aria.cli.logger import LogConsumer
 from .. import utils
 from ..table import print_data
 from ..cli import aria
@@ -139,13 +139,19 @@ def start(workflow_name,
 
     logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
     execution_thread.start()
+
+    log_consumer = LogConsumer(model_storage, workflow_runner.execution_id)
     try:
         while execution_thread.is_alive():
-            # using join without a timeout blocks and ignores KeyboardInterrupt
-            execution_thread.join(1)
+            for log in log_consumer:
+                logger.log(log)
+
     except KeyboardInterrupt:
         _cancel_execution(workflow_runner, execution_thread, logger)
 
+    for log in log_consumer:
+        logger.log(log)
+
     # raise any errors from the execution thread (note these are not workflow execution errors)
     execution_thread.raise_error_if_exists()
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/16708078/aria/cli/logger.py
----------------------------------------------------------------------
diff --git a/aria/cli/logger.py b/aria/cli/logger.py
index 289dbd3..bfafbfa 100644
--- a/aria/cli/logger.py
+++ b/aria/cli/logger.py
@@ -51,6 +51,42 @@ DEFAULT_LOGGER_CONFIG = {
 }
 
 
+class _ModelLogLogger(object):
+    def __init__(self, logger, level, formats):
+        self._logger = logger
+        self._level = level
+        self._formats = formats
+
+    def log(self, item):
+        kwargs = dict(item=item)
+        formats = self._formats[self.level]
+        if 'created_at' in formats:
+            kwargs['created_at'] = item.created_at.strftime(formats['created_at'])
+        if 'level' in formats:
+            kwargs['level'] = formats['level'].format(item.level)
+        if 'msg' in formats:
+            kwargs['msg'] = formats['msg'].format(item.msg)
+
+        if 'actor' in formats and item.task:
+            kwargs['actor'] = formats['actor'].format(item.task.actor)
+        if 'execution' in formats:
+            kwargs['execution'] = formats['execution'].format(item.execution)
+
+        msg = formats['main_msg'].format(**kwargs)
+        return getattr(self._logger, item.level.lower())(msg)
+
+    @property
+    def level(self):
+        return logging.INFO if self._level is NO_VERBOSE else logging.DEBUG
+
+    @level.setter
+    def level(self, level):
+        self._level = level
+
+    def __getattr__(self, item):
+        return getattr(self._logger, item)
+
+
 class Logging(object):
 
     def __init__(self, config):
@@ -58,7 +94,18 @@ class Logging(object):
         self._verbosity_level = NO_VERBOSE
         self._all_loggers = []
         self._configure_loggers(config)
-        self._lgr = logging.getLogger('aria.cli.main')
+
+        self._lgr = _ModelLogLogger(
+            logging.getLogger('aria.cli.main'),
+            self._verbosity_level,
+            {logging.INFO: {
+                'main_msg': '{item.msg}',
+            },
+             logging.DEBUG: {
+                'main_msg': '{created_at} | {item.level[0]} | {item.msg}',
+                'created_at': '%H:%M:%S'
+            }
+        })
 
     @property
     def logger(self):
@@ -77,7 +124,7 @@ class Logging(object):
 
     @verbosity_level.setter
     def verbosity_level(self, level):
-        self._verbosity_level = level
+        self._verbosity_level = self._lgr.level = level
         if self.is_high_verbose_level():
             for logger_name in self._all_loggers:
                 logging.getLogger(logger_name).setLevel(logging.DEBUG)
@@ -111,3 +158,18 @@ class Logging(object):
             self._all_loggers.append(logger_name)
 
         logging.config.dictConfig(logger_dict)
+
+
+class LogConsumer(object):
+
+    def __init__(self, model_storage, execution_id):
+        self._last_visited_id = 0
+        self._model_storage = model_storage
+        self._execution_id = execution_id
+
+    def __iter__(self):
+
+        for log in self._model_storage.log.iter(filters=dict(
+                execution_fk=self._execution_id, id=dict(gt=self._last_visited_id)  )):
+            self._last_visited_id = log.id
+            yield log

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/16708078/aria/logger.py
----------------------------------------------------------------------
diff --git a/aria/logger.py b/aria/logger.py
index e3039f5..26ba1e3 100644
--- a/aria/logger.py
+++ b/aria/logger.py
@@ -165,7 +165,6 @@ class _SQLAlchemyHandler(logging.Handler):
         log = self._cls(
             execution_fk=self._execution_id,
             task_fk=record.task_id,
-            actor=record.prefix,
             level=record.levelname,
             msg=str(record.msg),
             created_at=created_at,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/16708078/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 3ad6b58..bf7012f 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -394,7 +394,6 @@ class LogBase(ModelMixin):
     level = Column(String)
     msg = Column(String)
     created_at = Column(DateTime, index=True)
-    actor = Column(String)
 
     # region foreign keys
 
@@ -409,5 +408,4 @@ class LogBase(ModelMixin):
     # endregion
 
     def __repr__(self):
-        return "<{self.created_at}: [{self.level}] @{self.actor}> {msg}".format(
-            self=self, msg=self.msg[:50])
+        return "{msg}".format(msg=self.msg)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/16708078/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 2e33d77..452f5d0 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -38,15 +38,13 @@ class BaseContext(object):
     """
 
     class PrefixedLogger(object):
-        def __init__(self, logger, prefix='', task_id=None):
+        def __init__(self, logger, task_id=None):
             self._logger = logger
-            self._prefix = prefix
             self._task_id = task_id
 
         def __getattr__(self, item):
             if item.upper() in logging._levelNames:
-                return partial(getattr(self._logger, item),
-                               extra={'prefix': self._prefix, 'task_id': self._task_id})
+                return partial(getattr(self._logger, item), extra=dict(task_id=self._task_id))
             else:
                 return getattr(self._logger, item)
 
@@ -68,11 +66,9 @@ class BaseContext(object):
         self.logger = None
 
     def _register_logger(self, level=None, task_id=None):
-        self.logger = self.PrefixedLogger(
-            logging.getLogger('aria.executions.task'), self.logging_id, task_id=task_id)
+        self.logger = self.PrefixedLogger(logging.getLogger('aria.orchestrator'), task_id=task_id)
         self.logger.setLevel(level or logging.DEBUG)
         if not self.logger.handlers:
-            self.logger.addHandler(aria_logger.create_console_log_handler())
             self.logger.addHandler(self._get_sqla_handler())
 
     def _get_sqla_handler(self):
@@ -102,10 +98,6 @@ class BaseContext(object):
                 self.logger.removeHandler(handler)
 
     @property
-    def logging_id(self):
-        raise NotImplementedError
-
-    @property
     def model(self):
         """
         Access to the model storage

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/16708078/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index cbd186c..6aed377 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -57,10 +57,6 @@ class BaseOperationContext(BaseContext):
         return '{name}({0})'.format(details, name=self.name)
 
     @property
-    def logging_id(self):
-        raise NotImplementedError
-
-    @property
     def task(self):
         """
         The task in the model storage
@@ -121,10 +117,6 @@ class NodeOperationContext(BaseOperationContext):
     """
 
     @property
-    def logging_id(self):
-        return self.node.name or self.node.id
-
-    @property
     def node_template(self):
         """
         the node of the current operation
@@ -147,11 +139,6 @@ class RelationshipOperationContext(BaseOperationContext):
     """
 
     @property
-    def logging_id(self):
-        return '{0}->{1}'.format(self.source_node.name or self.source_node.id,
-                                 self.target_node.name or self.target_node.id)
-
-    @property
     def source_node_template(self):
         """
         The source node

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/16708078/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index ad4a2ff..9d8c0d1 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -52,8 +52,8 @@ class WorkflowContext(BaseContext):
                 name=self.__class__.__name__, self=self))
 
     @property
-    def logging_id(self):
-        return '{0}[{1}]'.format(self._workflow_name, self._execution_id)
+    def workflow_name(self):
+        return self._workflow_name
 
     @property
     def execution(self):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/16708078/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 65c0d4c..eff5347 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -90,8 +90,12 @@ class WorkflowRunner(object):
             tasks_graph=self._tasks_graph)
 
     @property
+    def execution_id(self):
+        return self._execution_id
+
+    @property
     def execution(self):
-        return self._model_storage.execution.get(self._execution_id)
+        return self._model_storage.execution.get(self.execution_id)
 
     @property
     def service(self):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/16708078/aria/orchestrator/workflows/events_logging.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py
index e831bfe..07981e9 100644
--- a/aria/orchestrator/workflows/events_logging.py
+++ b/aria/orchestrator/workflows/events_logging.py
@@ -26,16 +26,21 @@ from .. import events
 
 @events.start_task_signal.connect
 def _start_task_handler(task, **kwargs):
-    task.context.logger.debug('Event: Starting task: {task.name}'.format(task=task))
+    task.context.logger.debug('{actor.name} {task.interface_name}.{task.operation_name} started...'
+                             .format(actor=task.actor, task=task))
 
 
 @events.on_success_task_signal.connect
 def _success_task_handler(task, **kwargs):
-    task.context.logger.debug('Event: Task success: {task.name}'.format(task=task))
+    task.context.logger.info('{actor.name} {task.interface_name}.{task.operation_name} successful'
+                             .format(actor=task.actor, task=task))
 
 
 @events.on_failure_task_signal.connect
 def _failure_operation_handler(task, exception, **kwargs):
+    # todo: add full support for exceptions and errors logging
+    task.context.logger.info('{actor.name} {task.interface_name}.{task.operation_name} failed'
+                             .format(actor=task.actor, task=task))
     error = '{0}: {1}'.format(type(exception).__name__, exception)
     task.context.logger.error('Event: Task failure: {task.name} [{error}]'.format(
         task=task, error=error))
@@ -43,24 +48,24 @@ def _failure_operation_handler(task, exception, **kwargs):
 
 @events.start_workflow_signal.connect
 def _start_workflow_handler(context, **kwargs):
-    context.logger.debug('Event: Starting workflow: {context.name}'.format(context=context))
+    context.logger.info("Starting '{ctx.workflow_name}' workflow execution".format(ctx=context))
 
 
 @events.on_failure_workflow_signal.connect
 def _failure_workflow_handler(context, **kwargs):
-    context.logger.debug('Event: Workflow failure: {context.name}'.format(context=context))
+    context.logger.info("'{ctx.workflow_name}' workflow execution failed".format(ctx=context))
 
 
 @events.on_success_workflow_signal.connect
 def _success_workflow_handler(context, **kwargs):
-    context.logger.debug('Event: Workflow success: {context.name}'.format(context=context))
+    context.logger.info("'{ctx.workflow_name}' workflow execution succeeded".format(ctx=context))
 
 
 @events.on_cancelled_workflow_signal.connect
 def _cancel_workflow_handler(context, **kwargs):
-    context.logger.debug('Event: Workflow cancelled: {context.name}'.format(context=context))
+    context.logger.info("'{ctx.workflow_name}' workflow execution canceled".format(ctx=context))
 
 
 @events.on_cancelling_workflow_signal.connect
 def _cancelling_workflow_handler(context, **kwargs):
-    context.logger.debug('Event: Workflow cancelling: {context.name}'.format(context=context))
+    context.logger.info("Cancelling '{ctx.workflow_name}' workflow execution".format(ctx=context))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/16708078/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index dc369ab..83d8b55 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -148,8 +148,10 @@ class ProcessExecutor(base.BaseExecutor):
     def _create_arguments_dict(self, task):
         return {
             'task_id': task.id,
-            'implementation': task.implementation,
-            'operation_inputs': Parameter.unwrap_dict(task.inputs),
+            # 'implementation': task.implementation,
+            'implementation': 'aria.orchestrator.execution_plugin.operations.run_script_locally',
+            # 'operation_inputs': Parameter.unwrap_dict(task.inputs),
+            'operation_inputs': dict(script_path=task.implementation),
             'port': self._server_port,
             'context': task.context.serialization_dict,
         }