You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/04/19 12:15:20 UTC
[10/10] incubator-ariatosca git commit:
ARIA-138-Make-logging-more-informative
ARIA-138-Make-logging-more-informative
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/aaac6f60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/aaac6f60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/aaac6f60
Branch: refs/heads/ARIA-138-Make-logging-more-informative
Commit: aaac6f6018c878bdec6036dd2f9ab5e0662140c6
Parents: 1cbd81b
Author: max-orlov <ma...@gigaspaces.com>
Authored: Mon Apr 17 18:50:58 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Wed Apr 19 13:59:56 2017 +0300
----------------------------------------------------------------------
aria/cli/commands/__init__.py | 1 +
aria/cli/commands/executions.py | 12 +++-
aria/cli/commands/logs.py | 26 ++++----
aria/cli/execution_logging.py | 66 ++++++++++++++++++++
aria/cli/logger.py | 18 ++++++
aria/logger.py | 4 +-
aria/modeling/orchestration.py | 11 +++-
aria/orchestrator/context/common.py | 39 ++++++------
aria/orchestrator/context/operation.py | 29 +--------
aria/orchestrator/context/workflow.py | 4 +-
aria/orchestrator/workflow_runner.py | 6 +-
aria/orchestrator/workflows/core/engine.py | 2 +
aria/orchestrator/workflows/events_logging.py | 26 ++++----
aria/orchestrator/workflows/executor/base.py | 4 +-
aria/orchestrator/workflows/executor/process.py | 15 +++--
aria/orchestrator/workflows/executor/thread.py | 8 ++-
tests/.pylintrc | 2 +-
.../orchestrator/workflows/executor/__init__.py | 51 +++++++++++++++
.../workflows/executor/test_executor.py | 64 +++----------------
.../workflows/executor/test_process_executor.py | 37 +----------
20 files changed, 243 insertions(+), 182 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/aaac6f60/aria/cli/commands/__init__.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/__init__.py b/aria/cli/commands/__init__.py
index a01a029..5d76ea6 100644
--- a/aria/cli/commands/__init__.py
+++ b/aria/cli/commands/__init__.py
@@ -1,3 +1,4 @@
+
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/aaac6f60/aria/cli/commands/executions.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py
index adec56b..b8f78d4 100644
--- a/aria/cli/commands/executions.py
+++ b/aria/cli/commands/executions.py
@@ -16,8 +16,10 @@
import os
from .. import helptexts
+from aria.cli import execution_logging
from .. import utils
from ..core import aria
+from .. import logger as cli_logger
from ..table import print_data
from ...modeling.models import Execution
from ...orchestrator.workflow_runner import WorkflowRunner
@@ -141,13 +143,19 @@ def start(workflow_name,
logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
execution_thread.start()
+
+ log_consumer = cli_logger.ModelLogConsumer(model_storage, workflow_runner.execution_id)
try:
while execution_thread.is_alive():
- # using join without a timeout blocks and ignores KeyboardInterrupt
- execution_thread.join(1)
+ for log in log_consumer:
+ execution_logging.load(log).log()
+
except KeyboardInterrupt:
_cancel_execution(workflow_runner, execution_thread, logger)
+ for log in log_consumer:
+ execution_logging.load(log).log()
+
# raise any errors from the execution thread (note these are not workflow execution errors)
execution_thread.raise_error_if_exists()
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/aaac6f60/aria/cli/commands/logs.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/logs.py b/aria/cli/commands/logs.py
index 8888fef..e87ee3b 100644
--- a/aria/cli/commands/logs.py
+++ b/aria/cli/commands/logs.py
@@ -12,13 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-from .. import utils
-from ..core import aria
+from ..logger import ModelLogConsumer
+from ..cli import aria
+from .. import execution_logging
@aria.group(name='logs')
-@aria.options.verbose()
def logs():
"""Show logs from workflow executions
"""
@@ -31,19 +30,18 @@ def logs():
@aria.options.verbose()
@aria.pass_model_storage
@aria.pass_logger
-def list(execution_id,
- model_storage,
- logger):
+def list(execution_id, model_storage, logger):
"""Display logs for an execution
"""
logger.info('Listing logs for execution id {0}'.format(execution_id))
- logs_list = model_storage.log.list(filters=dict(execution_fk=execution_id),
- sort=utils.storage_sort_param('created_at', False))
- # TODO: print logs nicely
- if logs_list:
- for log in logs_list:
- print log
- else:
+ log_consumer = ModelLogConsumer(model_storage, execution_id)
+ any_logs = False
+
+ for log in log_consumer:
+ execution_logging.load(log).log()
+ any_logs = True
+
+ if not any_logs:
logger.info('\tNo logs')
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/aaac6f60/aria/cli/execution_logging.py
----------------------------------------------------------------------
diff --git a/aria/cli/execution_logging.py b/aria/cli/execution_logging.py
new file mode 100644
index 0000000..16a7d1a
--- /dev/null
+++ b/aria/cli/execution_logging.py
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+from . import logger
+from .env import env
+
+DEFAULT_FORMATTING = {
+ logger.NO_VERBOSE: {'main_msg': '{item.msg}'},
+ logger.LOW_VERBOSE: {
+ 'main_msg': '{created_at} | {item.level[0]} | {item.msg}',
+ 'created_at': '%H:%M:%S'
+ }
+}
+
+
+class load(object):
+
+ def __init__(self, item, formats=None):
+ self._item = item
+ self._formats = formats or DEFAULT_FORMATTING
+
+ def __repr__(self):
+ # Only NO_VERBOSE and LOW_VERBOSE are configurable formats. configuring
+ # the low verbose level should affect any higher level.
+ formats = self._formats[min(env.logging.verbosity_level, logger.LOW_VERBOSE)]
+
+ kwargs = dict(item=self._item)
+ if 'created_at' in formats:
+ kwargs['created_at'] = self._item.created_at.strftime(formats['created_at'])
+ if 'level' in formats:
+ kwargs['level'] = formats['level'].format(self._item.level)
+ if 'msg' in formats:
+ kwargs['msg'] = formats['msg'].format(self._item.msg)
+
+ if 'actor' in formats and self._item.task:
+ kwargs['actor'] = formats['actor'].format(self._item.task.actor)
+ if 'execution' in formats:
+ kwargs['execution'] = formats['execution'].format(self._item.execution)
+
+ # If no format was supplied just print out the original msg.
+ msg = formats.get('main_msg', '{item.msg}').format(**kwargs)
+
+ # Add the exception and the error msg.
+ if self._item.traceback and env.logging.verbosity_level >= logger.MEDIUM_VERBOSE:
+ msg += os.linesep + '------>'
+ for line in self._item.traceback.splitlines(True):
+ msg += '\t' + '|' + line
+
+ return msg
+
+ def log(self, *args, **kwargs):
+ return getattr(env.logging.logger, self._item.level.lower())(self, *args, **kwargs)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/aaac6f60/aria/cli/logger.py
----------------------------------------------------------------------
diff --git a/aria/cli/logger.py b/aria/cli/logger.py
index 1ffa918..f130616 100644
--- a/aria/cli/logger.py
+++ b/aria/cli/logger.py
@@ -112,3 +112,21 @@ class Logging(object):
log.setLevel(level)
dictconfig.dictConfig(logger_dict)
+
+
+class ModelLogConsumer(object):
+
+ def __init__(self, model_storage, execution_id, filters=None, sort=None):
+ self._last_visited_id = 0
+ self._model_storage = model_storage
+ self._execution_id = execution_id
+ self._additional_filters = filters or {}
+ self._sort = sort or {}
+
+ def __iter__(self):
+ filters = dict(execution_fk=self._execution_id, id=dict(gt=self._last_visited_id))
+ filters.update(self._additional_filters)
+
+ for log in self._model_storage.log.iter(filters=filters, sort=self._sort):
+ self._last_visited_id = log.id
+ yield log
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/aaac6f60/aria/logger.py
----------------------------------------------------------------------
diff --git a/aria/logger.py b/aria/logger.py
index dd54264..d6a06d0 100644
--- a/aria/logger.py
+++ b/aria/logger.py
@@ -177,10 +177,12 @@ class _SQLAlchemyHandler(logging.Handler):
log = self._cls(
execution_fk=self._execution_id,
task_fk=record.task_id,
- actor=record.prefix,
level=record.levelname,
msg=str(record.msg),
created_at=created_at,
+
+ # Not mandatory.
+ traceback=getattr(record, 'traceback', None)
)
self._session.add(log)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/aaac6f60/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index a2f041b..c2cf6b1 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -413,7 +413,9 @@ class LogBase(ModelMixin):
level = Column(String)
msg = Column(String)
created_at = Column(DateTime, index=True)
- actor = Column(String)
+
+ # In case of failed execution
+ traceback = Column(Text)
# region foreign keys
@@ -427,6 +429,9 @@ class LogBase(ModelMixin):
# endregion
+ def __str__(self):
+ return self.msg
+
def __repr__(self):
- return "<{self.created_at}: [{self.level}] @{self.actor}> {msg}".format(
- self=self, msg=self.msg[:50])
+ name = (self.task.actor if self.task else self.execution).name
+ return '{name}: {self.msg}'.format(name=name, self=self)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/aaac6f60/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 15843db..67dffcd 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -38,43 +38,44 @@ class BaseContext(object):
"""
class PrefixedLogger(object):
- def __init__(self, logger, prefix='', task_id=None):
- self._logger = logger
- self._prefix = prefix
+ def __init__(self, base_logger, task_id=None):
+ self._logger = base_logger
self._task_id = task_id
def __getattr__(self, item):
if item.upper() in logging._levelNames:
- return partial(getattr(self._logger, item),
- extra={'prefix': self._prefix, 'task_id': self._task_id})
+ return partial(self._logger_with_task_id, _level=item)
else:
return getattr(self._logger, item)
- def __init__(
- self,
- name,
- service_id,
- execution_id,
- model_storage,
- resource_storage,
- workdir=None,
- **kwargs):
+ def _logger_with_task_id(self, *args, **kwargs):
+ level = kwargs.pop('_level')
+ kwargs.setdefault('extra', {})['task_id'] = self._task_id
+ return getattr(self._logger, level)(*args, **kwargs)
+
+ def __init__(self,
+ name,
+ service_id,
+ model_storage,
+ resource_storage,
+ execution_id,
+ workdir=None,
+ **kwargs):
super(BaseContext, self).__init__(**kwargs)
self._name = name
self._id = generate_uuid(variant='uuid')
self._model = model_storage
self._resource = resource_storage
self._service_id = service_id
- self._execution_id = execution_id
self._workdir = workdir
+ self._execution_id = execution_id
self.logger = None
def _register_logger(self, level=None, task_id=None):
self.logger = self.PrefixedLogger(
- logging.getLogger(aria_logger.TASK_LOGGER_NAME), self.logging_id, task_id=task_id)
+ logging.getLogger(aria_logger.TASK_LOGGER_NAME), task_id=task_id)
self.logger.setLevel(level or logging.DEBUG)
if not self.logger.handlers:
- self.logger.addHandler(aria_logger.create_console_log_handler())
self.logger.addHandler(self._get_sqla_handler())
def _get_sqla_handler(self):
@@ -104,10 +105,6 @@ class BaseContext(object):
self.logger.removeHandler(handler)
@property
- def logging_id(self):
- raise NotImplementedError
-
- @property
def model(self):
"""
Access to the model storage
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/aaac6f60/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index c7d8246..c383958 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -29,20 +29,8 @@ class BaseOperationContext(BaseContext):
Context object used during operation creation and execution
"""
- def __init__(self,
- name,
- model_storage,
- resource_storage,
- service_id,
- task_id,
- actor_id,
- **kwargs):
- super(BaseOperationContext, self).__init__(
- name=name,
- model_storage=model_storage,
- resource_storage=resource_storage,
- service_id=service_id,
- **kwargs)
+ def __init__(self, task_id, actor_id, **kwargs):
+ super(BaseOperationContext, self).__init__(**kwargs)
self._task_id = task_id
self._actor_id = actor_id
self._thread_local = threading.local()
@@ -55,10 +43,6 @@ class BaseOperationContext(BaseContext):
return '{name}({0})'.format(details, name=self.name)
@property
- def logging_id(self):
- raise NotImplementedError
-
- @property
def task(self):
"""
The task in the model storage
@@ -119,10 +103,6 @@ class NodeOperationContext(BaseOperationContext):
"""
@property
- def logging_id(self):
- return self.node.name or self.node.id
-
- @property
def node_template(self):
"""
the node of the current operation
@@ -145,11 +125,6 @@ class RelationshipOperationContext(BaseOperationContext):
"""
@property
- def logging_id(self):
- return '{0}->{1}'.format(self.source_node.name or self.source_node.id,
- self.target_node.name or self.target_node.id)
-
- @property
def source_node_template(self):
"""
The source node
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/aaac6f60/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index 667d22f..920b237 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -50,8 +50,8 @@ class WorkflowContext(BaseContext):
name=self.__class__.__name__, self=self))
@property
- def logging_id(self):
- return '{0}[{1}]'.format(self._workflow_name, self._execution_id)
+ def workflow_name(self):
+ return self._workflow_name
@property
def execution(self):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/aaac6f60/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 8779f06..c2b781a 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -90,8 +90,12 @@ class WorkflowRunner(object):
tasks_graph=self._tasks_graph)
@property
+ def execution_id(self):
+ return self._execution_id
+
+ @property
def execution(self):
- return self._model_storage.execution.get(self._execution_id)
+ return self._model_storage.execution.get(self.execution_id)
@property
def service(self):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/aaac6f60/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 155d0ee..a111247 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -20,6 +20,7 @@ The workflow engine. Executes workflows
import time
from datetime import datetime
+import logging
import networkx
from aria import logger
@@ -40,6 +41,7 @@ class Engine(logger.LoggerMixin):
def __init__(self, executor, workflow_context, tasks_graph, **kwargs):
super(Engine, self).__init__(**kwargs)
+ self.logger.addHandler(logging.NullHandler())
self._workflow_context = workflow_context
self._execution_graph = networkx.DiGraph()
self._executor = executor
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/aaac6f60/aria/orchestrator/workflows/events_logging.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py
index e831bfe..1bc5a1e 100644
--- a/aria/orchestrator/workflows/events_logging.py
+++ b/aria/orchestrator/workflows/events_logging.py
@@ -26,41 +26,43 @@ from .. import events
@events.start_task_signal.connect
def _start_task_handler(task, **kwargs):
- task.context.logger.debug('Event: Starting task: {task.name}'.format(task=task))
+ task.context.logger.info('{actor.name} {task.interface_name}.{task.operation_name} started...'
+ .format(actor=task.actor, task=task))
@events.on_success_task_signal.connect
def _success_task_handler(task, **kwargs):
- task.context.logger.debug('Event: Task success: {task.name}'.format(task=task))
+ task.context.logger.info('{actor.name} {task.interface_name}.{task.operation_name} successful'
+ .format(actor=task.actor, task=task))
@events.on_failure_task_signal.connect
-def _failure_operation_handler(task, exception, **kwargs):
- error = '{0}: {1}'.format(type(exception).__name__, exception)
- task.context.logger.error('Event: Task failure: {task.name} [{error}]'.format(
- task=task, error=error))
-
+def _failure_operation_handler(task, traceback, **kwargs):
+ task.context.logger.error(
+ '{actor.name} {task.interface_name}.{task.operation_name} failed'
+ .format(actor=task.actor, task=task), extra=dict(traceback=traceback)
+ )
@events.start_workflow_signal.connect
def _start_workflow_handler(context, **kwargs):
- context.logger.debug('Event: Starting workflow: {context.name}'.format(context=context))
+ context.logger.info("Starting '{ctx.workflow_name}' workflow execution".format(ctx=context))
@events.on_failure_workflow_signal.connect
def _failure_workflow_handler(context, **kwargs):
- context.logger.debug('Event: Workflow failure: {context.name}'.format(context=context))
+ context.logger.info("'{ctx.workflow_name}' workflow execution failed".format(ctx=context))
@events.on_success_workflow_signal.connect
def _success_workflow_handler(context, **kwargs):
- context.logger.debug('Event: Workflow success: {context.name}'.format(context=context))
+ context.logger.info("'{ctx.workflow_name}' workflow execution succeeded".format(ctx=context))
@events.on_cancelled_workflow_signal.connect
def _cancel_workflow_handler(context, **kwargs):
- context.logger.debug('Event: Workflow cancelled: {context.name}'.format(context=context))
+ context.logger.info("'{ctx.workflow_name}' workflow execution canceled".format(ctx=context))
@events.on_cancelling_workflow_signal.connect
def _cancelling_workflow_handler(context, **kwargs):
- context.logger.debug('Event: Workflow cancelling: {context.name}'.format(context=context))
+ context.logger.info("Cancelling '{ctx.workflow_name}' workflow execution".format(ctx=context))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/aaac6f60/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index 4ae046d..39becef 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -44,8 +44,8 @@ class BaseExecutor(logger.LoggerMixin):
events.start_task_signal.send(task)
@staticmethod
- def _task_failed(task, exception):
- events.on_failure_task_signal.send(task, exception=exception)
+ def _task_failed(task, exception, traceback=None):
+ events.on_failure_task_signal.send(task, exception=exception, traceback=traceback)
@staticmethod
def _task_succeeded(task):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/aaac6f60/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 851d78e..2378e0a 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -42,13 +42,16 @@ import pickle
import jsonpickle
import aria
-from aria.extension import process_executor
-from aria.utils import imports
-from aria.utils import exceptions
from aria.orchestrator.workflows.executor import base
from aria.storage import instrumentation
+from aria.extension import process_executor
+from aria.utils import (
+ imports,
+ exceptions
+)
from aria.modeling import types as modeling_types
+
_IS_WIN = os.name == 'nt'
_INT_FMT = 'I'
@@ -233,9 +236,10 @@ class ProcessExecutor(base.BaseExecutor):
except BaseException as e:
e.message += 'Task failed due to {0}.'.format(request['exception']) + \
UPDATE_TRACKED_CHANGES_FAILED_STR
- self._task_failed(task, exception=e)
+ self._task_failed(
+ task, exception=e, traceback=exceptions.get_exception_as_string(*sys.exc_info()))
else:
- self._task_failed(task, exception=request['exception'])
+ self._task_failed(task, exception=request['exception'], traceback=request['traceback'])
def _handle_apply_tracked_changes_request(self, task_id, request, response):
task = self._tasks[task_id]
@@ -319,6 +323,7 @@ class _Messenger(object):
'type': type,
'task_id': self.task_id,
'exception': exceptions.wrap_if_needed(exception),
+ 'traceback': exceptions.get_exception_as_string(*sys.exc_info()),
'tracked_changes': tracked_changes
})
response = _recv_message(sock)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/aaac6f60/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index f422592..836b2bf 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -20,7 +20,9 @@ Thread based executor
import Queue
import threading
-from aria.utils import imports
+import sys
+
+from aria.utils import imports, exceptions
from .base import BaseExecutor
@@ -63,7 +65,9 @@ class ThreadExecutor(BaseExecutor):
task_func(ctx=task.context, **inputs)
self._task_succeeded(task)
except BaseException as e:
- self._task_failed(task, exception=e)
+ self._task_failed(task,
+ exception=e,
+ traceback=exceptions.get_exception_as_string(*sys.exc_info()))
# Daemon threads
except BaseException as e:
pass
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/aaac6f60/tests/.pylintrc
----------------------------------------------------------------------
diff --git a/tests/.pylintrc b/tests/.pylintrc
index eead6e8..9795bfc 100644
--- a/tests/.pylintrc
+++ b/tests/.pylintrc
@@ -369,7 +369,7 @@ max-statements=50
max-parents=7
# Maximum number of attributes for a class (see R0902).
-max-attributes=15
+max-attributes=25
# Minimum number of public methods for a class (see R0903).
min-public-methods=0
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/aaac6f60/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index ae1e83e..c05831a 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -12,3 +12,54 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import uuid
+import logging
+from collections import namedtuple
+from contextlib import contextmanager
+
+from aria.modeling import models
+
+
+class MockTask(object):
+
+ INFINITE_RETRIES = models.Task.INFINITE_RETRIES
+
+ def __init__(self, implementation, inputs=None, plugin=None):
+ self.implementation = self.name = implementation
+ self.plugin_fk = plugin.id if plugin else None
+ self.plugin = plugin or None
+ self.inputs = inputs or {}
+ self.states = []
+ self.exception = None
+ self.id = str(uuid.uuid4())
+ self.logger = logging.getLogger()
+ self.context = MockContext()
+ self.retry_count = 0
+ self.max_attempts = 1
+ self.ignore_failure = False
+ self.interface_name = 'interface_name'
+ self.operation_name = 'operation_name'
+ self.actor = namedtuple('actor', 'name')(name='actor_name')
+ self.model_task = None
+
+ for state in models.Task.STATES:
+ setattr(self, state.upper(), state)
+
+ @contextmanager
+ def _update(self):
+ yield self
+
+
+class MockContext(object):
+
+ def __init__(self):
+ self.logger = logging.getLogger('mock_logger')
+ self.task = type('SubprocessMockTask', (object, ), {'plugin': None})
+ self.serialization_dict = {'context_cls': self.__class__, 'context': {}}
+
+ def __getattr__(self, item):
+ return None
+
+ @classmethod
+ def deserialize_from_dict(cls, **kwargs):
+ return cls()
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/aaac6f60/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index a7619de..d4482ae 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -13,9 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
-import uuid
-from contextlib import contextmanager
import pytest
import retrying
@@ -37,14 +34,19 @@ from aria.orchestrator.workflows.executor import (
)
import tests
+from . import MockTask
+
+
+def _get_implementation(func):
+ return '{module}.{func.__name__}'.format(module=__name__, func=func)
def test_execute(executor):
expected_value = 'value'
- successful_task = MockTask(mock_successful_task)
- failing_task = MockTask(mock_failing_task)
- task_with_inputs = MockTask(mock_task_with_input, inputs={'input': models.Parameter.wrap(
- 'input', 'value')})
+ successful_task = MockTask(_get_implementation(mock_successful_task))
+ failing_task = MockTask(_get_implementation(mock_failing_task))
+ task_with_inputs = MockTask(_get_implementation(mock_task_with_input),
+ inputs={'input': models.Parameter.wrap('input', 'value')})
for task in [successful_task, failing_task, task_with_inputs]:
executor.execute(task)
@@ -81,54 +83,6 @@ class MockException(Exception):
pass
-class MockContext(object):
-
- def __init__(self, *args, **kwargs):
- self.logger = logging.getLogger()
- self.task = type('SubprocessMockTask', (object, ), {'plugin': None})
- self.serialization_dict = {'context_cls': self.__class__, 'context': {}}
-
- def __getattr__(self, item):
- return None
-
- @classmethod
- def deserialize_from_dict(cls, **kwargs):
- return cls()
-
-
-class MockTask(object):
-
- INFINITE_RETRIES = models.Task.INFINITE_RETRIES
-
- def __init__(self, func, inputs=None):
- self.states = []
- self.exception = None
- self.id = str(uuid.uuid4())
- name = func.__name__
- implementation = '{module}.{name}'.format(
- module=__name__,
- name=name)
- self.implementation = implementation
- self.logger = logging.getLogger()
- self.name = name
- self.inputs = inputs or {}
- self.context = MockContext()
- self.retry_count = 0
- self.max_attempts = 1
- self.plugin_fk = None
- self.ignore_failure = False
- self.interface_name = 'interface_name'
- self.operation_name = 'operation_name'
- self.model_task = None
-
- for state in models.Task.STATES:
- setattr(self, state.upper(), state)
-
- @contextmanager
- def _update(self):
- yield self
-
-
@pytest.fixture(params=[
(thread.ThreadExecutor, {'pool_size': 1}),
(thread.ThreadExecutor, {'pool_size': 2}),
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/aaac6f60/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index 839b9f1..b353518 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -15,13 +15,10 @@
import logging
import os
-import uuid
import Queue
-from contextlib import contextmanager
import pytest
-from aria.modeling import models as aria_models
from aria.orchestrator import events
from aria.utils.plugin import create as create_plugin
from aria.orchestrator.workflows.executor import process
@@ -33,17 +30,17 @@ from tests.fixtures import ( # pylint: disable=unused-import
plugin_manager,
fs_model as model
)
+from . import MockTask
class TestProcessExecutor(object):
def test_plugin_execution(self, executor, mock_plugin):
- task = MockTask(plugin=mock_plugin,
- implementation='mock_plugin1.operation')
+ task = MockTask('mock_plugin1.operation', plugin=mock_plugin)
queue = Queue.Queue()
- def handler(_, exception=None):
+ def handler(_, exception=None, **kwargs):
queue.put(exception)
events.on_success_task_signal.connect(handler)
@@ -100,31 +97,3 @@ class MockContext(object):
@classmethod
def deserialize_from_dict(cls, **kwargs):
return cls()
-
-
-class MockTask(object):
-
- INFINITE_RETRIES = aria_models.Task.INFINITE_RETRIES
-
- def __init__(self, plugin, implementation):
- self.id = str(uuid.uuid4())
- self.implementation = implementation
- self.logger = logging.getLogger()
- self.name = implementation
- self.inputs = {}
- self.context = MockContext()
- self.retry_count = 0
- self.max_attempts = 1
- self.plugin_fk = plugin.id
- self.plugin = plugin
- self.ignore_failure = False
- self.interface_name = 'interface_name'
- self.operation_name = 'operation_name'
- self.model_task = None
-
- for state in aria_models.Task.STATES:
- setattr(self, state.upper(), state)
-
- @contextmanager
- def _update(self):
- yield self