You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by da...@apache.org on 2016/12/21 17:39:04 UTC
incubator-ariatosca git commit: ARIA-41 Provide (initial) means for serializing an operation context object [Forced Update!]
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-41-limited-context-serialization d86214caf -> d143772d1 (forced update)
ARIA-41 Provide (initial) means for serializing an operation context object
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/d143772d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/d143772d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/d143772d
Branch: refs/heads/ARIA-41-limited-context-serialization
Commit: d143772d1497a54e447c1739083ed030328a7a0b
Parents: 5cf84ee
Author: Dan Kilman <da...@gigaspaces.com>
Authored: Mon Dec 19 17:05:54 2016 +0200
Committer: Dan Kilman <da...@gigaspaces.com>
Committed: Wed Dec 21 19:38:59 2016 +0200
----------------------------------------------------------------------
aria/__init__.py | 26 ++---
aria/orchestrator/context/common.py | 11 +-
aria/orchestrator/context/operation.py | 21 ++--
aria/orchestrator/context/serialization.py | 94 +++++++++++++++++
aria/orchestrator/context/workflow.py | 13 ++-
aria/orchestrator/workflows/core/task.py | 11 +-
aria/orchestrator/workflows/events_logging.py | 6 +-
aria/orchestrator/workflows/executor/process.py | 22 ++--
aria/storage/filesystem_rapi.py | 5 +-
tests/__init__.py | 4 +
tests/mock/context.py | 18 +++-
tests/orchestrator/context/test_serialize.py | 101 +++++++++++++++++++
tests/orchestrator/workflows/core/test_task.py | 1 +
.../workflows/executor/test_executor.py | 19 +++-
.../workflows/executor/test_process_executor.py | 11 +-
15 files changed, 303 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/__init__.py
----------------------------------------------------------------------
diff --git a/aria/__init__.py b/aria/__init__.py
index 0f7bec6..cc362c0 100644
--- a/aria/__init__.py
+++ b/aria/__init__.py
@@ -41,8 +41,6 @@ __all__ = (
'operation',
)
-_resource_storage = {}
-
def install_aria_extensions():
"""
@@ -61,7 +59,7 @@ def install_aria_extensions():
def application_model_storage(api, api_kwargs=None):
"""
- Initiate model storage for the supplied storage driver
+ Initiate model storage
"""
models = [
storage.models.Plugin,
@@ -85,17 +83,15 @@ def application_model_storage(api, api_kwargs=None):
return storage.ModelStorage(api, items=models, api_kwargs=api_kwargs or {})
-def application_resource_storage(driver):
+def application_resource_storage(api, api_kwargs=None):
"""
- Initiate resource storage for the supplied storage driver
+ Initiate resource storage
"""
- if driver not in _resource_storage:
- _resource_storage[driver] = storage.ResourceStorage(
- driver,
- resources=[
- 'blueprint',
- 'deployment',
- 'plugin',
- 'snapshot',
- ])
- return _resource_storage[driver]
+ return storage.ResourceStorage(
+ api,
+ api_kwargs=api_kwargs or {},
+ items=[
+ 'blueprint',
+ 'deployment',
+ 'plugin',
+ ])
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 14efd9d..fdbe152 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -29,13 +29,9 @@ class BaseContext(logger.LoggerMixin):
def __init__(
self,
name,
+ deployment_id,
model_storage,
resource_storage,
- deployment_id,
- workflow_name,
- task_max_attempts=1,
- task_retry_interval=0,
- task_ignore_failure=False,
**kwargs):
super(BaseContext, self).__init__(**kwargs)
self._name = name
@@ -43,16 +39,11 @@ class BaseContext(logger.LoggerMixin):
self._model = model_storage
self._resource = resource_storage
self._deployment_id = deployment_id
- self._workflow_name = workflow_name
- self._task_max_attempts = task_max_attempts
- self._task_retry_interval = task_retry_interval
- self._task_ignore_failure = task_ignore_failure
def __repr__(self):
return (
'{name}(name={self.name}, '
'deployment_id={self._deployment_id}, '
- 'workflow_name={self._workflow_name}, '
.format(name=self.__class__.__name__, self=self))
@property
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index a73bad1..19bb73a 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -26,17 +26,22 @@ class BaseOperationContext(BaseContext):
Context object used during operation creation and execution
"""
- def __init__(self, name, workflow_context, task, actor, **kwargs):
+ def __init__(self,
+ name,
+ model_storage,
+ resource_storage,
+ deployment_id,
+ task_id,
+ actor_id,
+ **kwargs):
super(BaseOperationContext, self).__init__(
name=name,
- model_storage=workflow_context.model,
- resource_storage=workflow_context.resource,
- deployment_id=workflow_context._deployment_id,
- workflow_name=workflow_context._workflow_name,
+ model_storage=model_storage,
+ resource_storage=resource_storage,
+ deployment_id=deployment_id,
**kwargs)
- self._task_model = task
- self._task_id = task.id
- self._actor_id = actor.id
+ self._task_id = task_id
+ self._actor_id = actor_id
def __repr__(self):
details = 'operation_mapping={task.operation_mapping}; ' \
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/orchestrator/context/serialization.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/serialization.py b/aria/orchestrator/context/serialization.py
new file mode 100644
index 0000000..93cb38a
--- /dev/null
+++ b/aria/orchestrator/context/serialization.py
@@ -0,0 +1,94 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sqlalchemy.orm
+import sqlalchemy.pool
+
+import aria
+
+
+def operation_context_to_dict(context):
+ context_cls = context.__class__
+ context_dict = {
+ 'name': context.name,
+ 'deployment_id': context._deployment_id,
+ 'task_id': context._task_id,
+ 'actor_id': context._actor_id,
+ }
+ if context.model:
+ model = context.model
+ context_dict['model_storage'] = {
+ 'api_cls': model.api,
+ 'api_kwargs': _serialize_sql_mapi_kwargs(model)
+ }
+ else:
+ context_dict['model_storage'] = None
+ if context.resource:
+ resource = context.resource
+ context_dict['resource_storage'] = {
+ 'api_cls': resource.api,
+ 'api_kwargs': _serialize_file_rapi_kwargs(resource)
+ }
+ else:
+ context_dict['resource_storage'] = None
+ return {
+ 'context_cls': context_cls,
+ 'context': context_dict
+ }
+
+
+def operation_context_from_dict(context_dict):
+ context_cls = context_dict['context_cls']
+ context = context_dict['context']
+
+ model_storage = context['model_storage']
+ if model_storage:
+ api_cls = model_storage['api_cls']
+ api_kwargs = _deserialize_sql_mapi_kwargs(model_storage.get('api_kwargs', {}))
+ context['model_storage'] = aria.application_model_storage(api=api_cls,
+ api_kwargs=api_kwargs)
+
+ resource_storage = context['resource_storage']
+ if resource_storage:
+ api_cls = resource_storage['api_cls']
+ api_kwargs = _deserialize_file_rapi_kwargs(resource_storage.get('api_kwargs', {}))
+ context['resource_storage'] = aria.application_resource_storage(api=api_cls,
+ api_kwargs=api_kwargs)
+
+ return context_cls(**context)
+
+
+def _serialize_sql_mapi_kwargs(model):
+ engine_url = str(model._api_kwargs['engine'].url)
+ assert ':memory:' not in engine_url
+ return {'engine_url': engine_url}
+
+
+def _deserialize_sql_mapi_kwargs(api_kwargs):
+ engine_url = api_kwargs.get('engine_url')
+ if not engine_url:
+ return {}
+ engine = sqlalchemy.create_engine(engine_url)
+ session_factory = sqlalchemy.orm.sessionmaker(bind=engine)
+ session = sqlalchemy.orm.scoped_session(session_factory=session_factory)
+ return {'session': session, 'engine': engine}
+
+
+def _serialize_file_rapi_kwargs(resource):
+ return {'directory': resource._api_kwargs['directory']}
+
+
+def _deserialize_file_rapi_kwargs(api_kwargs):
+ return api_kwargs
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index e2e8e25..e3be2d5 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -29,9 +29,20 @@ class WorkflowContext(BaseContext):
"""
Context object used during workflow creation and execution
"""
- def __init__(self, parameters=None, execution_id=None, *args, **kwargs):
+ def __init__(self,
+ workflow_name,
+ parameters=None,
+ execution_id=None,
+ task_max_attempts=1,
+ task_retry_interval=0,
+ task_ignore_failure=False,
+ *args, **kwargs):
super(WorkflowContext, self).__init__(*args, **kwargs)
+ self._workflow_name = workflow_name
self.parameters = parameters or {}
+ self._task_max_attempts = task_max_attempts
+ self._task_retry_interval = task_retry_interval
+ self._task_ignore_failure = task_ignore_failure
# TODO: execution creation should happen somewhere else
# should be moved there, when such logical place exists
self._execution_id = self._create_execution() if execution_id is None else execution_id
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index 08cf26e..663eeac 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -134,14 +134,17 @@ class OperationTask(BaseTask):
max_attempts=api_task.max_attempts,
retry_interval=api_task.retry_interval,
ignore_failure=api_task.ignore_failure,
- plugin_id=plugin_id
+ plugin_id=plugin_id,
+ execution_id=self._workflow_context.execution.id
)
self._workflow_context.model.task.put(operation_task)
self._ctx = context_class(name=api_task.name,
- workflow_context=self._workflow_context,
- task=operation_task,
- actor=operation_task.actor)
+ model_storage=self._workflow_context.model,
+ resource_storage=self._workflow_context.resource,
+ deployment_id=self._workflow_context._deployment_id,
+ task_id=operation_task.id,
+ actor_id=api_task.actor.id)
self._task_id = operation_task.id
self._update_fields = None
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/orchestrator/workflows/events_logging.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py
index 409ce0a..142ef74 100644
--- a/aria/orchestrator/workflows/events_logging.py
+++ b/aria/orchestrator/workflows/events_logging.py
@@ -35,9 +35,9 @@ def _success_task_handler(task, **kwargs):
@events.on_failure_task_signal.connect
-def _failure_operation_handler(task, **kwargs):
- task.logger.error('Event: Task failure: {task.name}'.format(task=task),
- exc_info=kwargs.get('exception', True))
+def _failure_operation_handler(task, exception, **kwargs):
+ error = '{0}: {1}'.format(type(exception).__name__, exception)
+ task.logger.error('Event: Task failure: {task.name} [{error}]'.format(task=task, error=error))
@events.start_workflow_signal.connect
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 1a47d4c..5da03dd 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -41,6 +41,7 @@ import jsonpickle
from aria.utils import imports
from aria.orchestrator.workflows.executor import base
+from aria.orchestrator.context import serialization
_IS_WIN = os.name == 'nt'
@@ -106,12 +107,7 @@ class ProcessExecutor(base.BaseExecutor):
file_descriptor, arguments_json_path = tempfile.mkstemp(prefix='executor-', suffix='.json')
os.close(file_descriptor)
with open(arguments_json_path, 'wb') as f:
- f.write(jsonpickle.dumps({
- 'task_id': task.id,
- 'operation_mapping': task.operation_mapping,
- 'operation_inputs': task.inputs,
- 'port': self._server_port
- }))
+ f.write(jsonpickle.dumps(self._create_arguments_dict(task)))
env = os.environ.copy()
# See _update_env for plugin_prefix usage
@@ -176,6 +172,15 @@ class ProcessExecutor(base.BaseExecutor):
if self._stopped:
raise RuntimeError('Executor closed')
+ def _create_arguments_dict(self, task):
+ return {
+ 'task_id': task.id,
+ 'operation_mapping': task.operation_mapping,
+ 'operation_inputs': task.inputs,
+ 'port': self._server_port,
+ 'context': serialization.operation_context_to_dict(task.context),
+ }
+
def _update_env(self, env, plugin_prefix):
pythonpath_dirs = []
# If this is a plugin operation, plugin prefix will point to where
@@ -261,12 +266,13 @@ def _main():
task_id = arguments['task_id']
port = arguments['port']
messenger = _Messenger(task_id=task_id, port=port)
+ messenger.started()
operation_mapping = arguments['operation_mapping']
operation_inputs = arguments['operation_inputs']
- ctx = None
- messenger.started()
+ context_dict = arguments['context']
try:
+ ctx = serialization.operation_context_from_dict(context_dict)
task_func = imports.load_attribute(operation_mapping)
task_func(ctx=ctx, **operation_inputs)
messenger.succeeded()
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/storage/filesystem_rapi.py
----------------------------------------------------------------------
diff --git a/aria/storage/filesystem_rapi.py b/aria/storage/filesystem_rapi.py
index f810f58..c6b3a81 100644
--- a/aria/storage/filesystem_rapi.py
+++ b/aria/storage/filesystem_rapi.py
@@ -87,7 +87,10 @@ class FileSystemResourceAPI(api.ResourceAPI):
os.makedirs(self.directory)
except (OSError, IOError):
pass
- os.makedirs(self.base_path)
+ try:
+ os.makedirs(self.base_path)
+ except (OSError, IOError):
+ pass
def read(self, entry_id, path=None, **_):
"""
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/tests/__init__.py
----------------------------------------------------------------------
diff --git a/tests/__init__.py b/tests/__init__.py
index ae1e83e..d2858d2 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -12,3 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+import os
+
+ROOT_DIR = os.path.dirname(os.path.dirname(__file__))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/tests/mock/context.py
----------------------------------------------------------------------
diff --git a/tests/mock/context.py b/tests/mock/context.py
index 1904140..5559675 100644
--- a/tests/mock/context.py
+++ b/tests/mock/context.py
@@ -13,15 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from aria import application_model_storage
+import aria
from aria.orchestrator import context
+from aria.storage.filesystem_rapi import FileSystemResourceAPI
from aria.storage.sql_mapi import SQLAlchemyModelAPI
from . import models
-def simple(api_kwargs, **kwargs):
- model_storage = application_model_storage(SQLAlchemyModelAPI, api_kwargs=api_kwargs)
+def simple(mapi_kwargs, resources_dir=None, **kwargs):
+ model_storage = aria.application_model_storage(SQLAlchemyModelAPI, api_kwargs=mapi_kwargs)
blueprint = models.get_blueprint()
model_storage.blueprint.put(blueprint)
deployment = models.get_deployment(blueprint)
@@ -56,10 +57,19 @@ def simple(api_kwargs, **kwargs):
)
model_storage.relationship_instance.put(relationship_instance)
+ # pytest tmpdir
+ if resources_dir:
+ resource_storage = aria.application_resource_storage(
+ FileSystemResourceAPI,
+ api_kwargs={'directory': resources_dir}
+ )
+ else:
+ resource_storage = None
+
final_kwargs = dict(
name='simple_context',
model_storage=model_storage,
- resource_storage=None,
+ resource_storage=resource_storage,
deployment_id=deployment.id,
workflow_name=models.WORKFLOW_NAME,
task_max_attempts=models.TASK_MAX_ATTEMPTS,
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
new file mode 100644
index 0000000..ed0afcd
--- /dev/null
+++ b/tests/orchestrator/context/test_serialize.py
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+import aria
+from aria.storage.sql_mapi import SQLAlchemyModelAPI
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator import workflow, operation
+from aria.orchestrator.context import serialization
+
+import tests
+from tests import mock
+from tests import storage
+
+TEST_FILE_CONTENT = 'CONTENT'
+TEST_FILE_ENTRY_ID = 'entry'
+TEST_FILE_NAME = 'test_file'
+
+
+def test_serialize_operation_context(context, executor, tmpdir):
+ test_file = tmpdir.join(TEST_FILE_NAME)
+ test_file.write(TEST_FILE_CONTENT)
+ resource = context.resource
+ resource.blueprint.upload(TEST_FILE_ENTRY_ID, str(test_file))
+ graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
+ eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
+ eng.execute()
+
+
+def test_illegal_serialize_of_memory_model_storage(memory_model_storage):
+ with pytest.raises(AssertionError):
+ serialization._serialize_sql_mapi_kwargs(memory_model_storage)
+
+
+@workflow
+def _mock_workflow(ctx, graph):
+ node_instance = ctx.model.node_instance.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
+ node_instance.node.operations['test.op'] = {'operation': _operation_mapping()}
+ task = api.task.OperationTask.node_instance(instance=node_instance, name='test.op')
+ graph.add_tasks(task)
+ return graph
+
+
+@operation
+def _mock_operation(ctx):
+ # We test several things in this operation
+ # ctx.task, ctx.node, etc... tell us that the model storage was properly re-created
+ # a correct ctx.task.operation_mapping tells us we kept the correct task_id
+ assert ctx.task.operation_mapping == _operation_mapping()
+ # a correct ctx.node.name tells us we kept the correct actor_id
+ assert ctx.node.name == mock.models.DEPENDENCY_NODE_NAME
+ # a correct ctx.name tells us we kept the correct name
+ assert ctx.name is not None
+ assert ctx.name == ctx.task.name
+ # a correct ctx.deployment.name tells us we kept the correct deployment_id
+ assert ctx.deployment.name == mock.models.DEPLOYMENT_NAME
+ # Here we test that the resource storage was properly re-created
+ test_file_content = ctx.resource.blueprint.read(TEST_FILE_ENTRY_ID, TEST_FILE_NAME)
+ assert test_file_content == TEST_FILE_CONTENT
+
+
+def _operation_mapping():
+ return '{name}.{func.__name__}'.format(name=__name__, func=_mock_operation)
+
+
+@pytest.fixture
+def executor():
+ result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
+ yield result
+ result.close()
+
+
+@pytest.fixture
+def context(tmpdir):
+ result = mock.context.simple(storage.get_sqlite_api_kwargs(str(tmpdir)),
+ resources_dir=str(tmpdir.join('resources')))
+ yield result
+ storage.release_sqlite_storage(result.model)
+
+
+@pytest.fixture
+def memory_model_storage():
+ result = aria.application_model_storage(
+ SQLAlchemyModelAPI, api_kwargs=storage.get_sqlite_api_kwargs())
+ yield result
+ storage.release_sqlite_storage(result)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/tests/orchestrator/workflows/core/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py
index 6c3825c..fc11548 100644
--- a/tests/orchestrator/workflows/core/test_task.py
+++ b/tests/orchestrator/workflows/core/test_task.py
@@ -60,6 +60,7 @@ class TestOperationTask(object):
node.operations['aria.interfaces.lifecycle.create'] = {'plugin': 'plugin1'}
api_task, core_task = self._create_operation_task(ctx, node_instance)
storage_task = ctx.model.task.get_by_name(core_task.name)
+ assert storage_task.execution_id == ctx.execution.id
assert core_task.model_task == storage_task
assert core_task.name == api_task.name
assert core_task.operation_mapping == api_task.operation_mapping
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 7a11524..5ded4fb 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -14,7 +14,6 @@
# limitations under the License.
import logging
-import os
import uuid
from contextlib import contextmanager
@@ -29,7 +28,6 @@ except ImportError:
_celery = None
app = None
-import aria
from aria.storage import models
from aria.orchestrator import events
from aria.orchestrator.workflows.executor import (
@@ -38,6 +36,8 @@ from aria.orchestrator.workflows.executor import (
# celery
)
+import tests
+
def test_execute(executor):
expected_value = 'value'
@@ -80,11 +80,20 @@ class MockException(Exception):
pass
+class MockContext(object):
+
+ def __init__(self, *args, **kwargs):
+ pass
+
+ def __getattr__(self, item):
+ return None
+
+
class MockTask(object):
INFINITE_RETRIES = models.Task.INFINITE_RETRIES
- def __init__(self, func, inputs=None, ctx=None):
+ def __init__(self, func, inputs=None):
self.states = []
self.exception = None
self.id = str(uuid.uuid4())
@@ -94,7 +103,7 @@ class MockTask(object):
self.logger = logging.getLogger()
self.name = name
self.inputs = inputs or {}
- self.context = ctx
+ self.context = MockContext()
self.retry_count = 0
self.max_attempts = 1
self.plugin_id = None
@@ -112,7 +121,7 @@ class MockTask(object):
(thread.ThreadExecutor, {'pool_size': 2}),
# subprocess needs to load a tests module so we explicitly add the root directory as if
# the project has been installed in editable mode
- (process.ProcessExecutor, {'python_path': [os.path.dirname(os.path.dirname(aria.__file__))]}),
+ (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}),
# (celery.CeleryExecutor, {'app': app})
])
def executor(request):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index 364d354..0098f30 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -106,6 +106,15 @@ def mock_plugin(plugin_manager, tmpdir):
return plugin_manager.install(source=plugin_path)
+class MockContext(object):
+
+ def __init__(self, *args, **kwargs):
+ pass
+
+ def __getattr__(self, item):
+ return None
+
+
class MockTask(object):
INFINITE_RETRIES = models.Task.INFINITE_RETRIES
@@ -116,7 +125,7 @@ class MockTask(object):
self.logger = logging.getLogger()
self.name = operation
self.inputs = {}
- self.context = None
+ self.context = MockContext()
self.retry_count = 0
self.max_attempts = 1
self.plugin_id = plugin.id