You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/13 13:38:11 UTC
incubator-ariatosca git commit: test fixes,
moved close executor to be handled inside the executor
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-278-Remove-core-tasks 62fa985c6 -> 8044a035f
test fixes, moved close executor to be handled inside the executor
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/8044a035
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/8044a035
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/8044a035
Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: 8044a035fdb7f88e871e3a643b47b3e42c43da67
Parents: 62fa985
Author: max-orlov <ma...@gigaspaces.com>
Authored: Tue Jun 13 15:56:44 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Tue Jun 13 15:56:44 2017 +0300
----------------------------------------------------------------------
aria/modeling/orchestration.py | 6 ++---
aria/orchestrator/workflow_runner.py | 9 +++++--
aria/orchestrator/workflows/core/engine.py | 4 +--
aria/orchestrator/workflows/core/translation.py | 5 +---
aria/orchestrator/workflows/executor/base.py | 15 +++++++----
tests/modeling/test_mixins.py | 8 ------
tests/orchestrator/context/test_operation.py | 28 +++++---------------
tests/orchestrator/context/test_serialize.py | 14 ++++++----
tests/orchestrator/context/test_toolbelt.py | 7 +----
tests/orchestrator/test_workflow_runner.py | 2 +-
10 files changed, 41 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8044a035/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 9ac885d..37aa431 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -461,7 +461,7 @@ class TaskBase(mixins.ModelMixin):
return cls(**instantiation_kwargs)
- def execute(self, ctx):
+ def execute(self, ctx, executor_kwargs=None):
from aria.orchestrator.context import operation
context_cls = self._context_cls or operation.BaseOperationContext
op_ctx = context_cls(
@@ -474,10 +474,10 @@ class TaskBase(mixins.ModelMixin):
execution_id=self.execution.id,
name=self.name
)
- executor = self._executor(**(self._executor_kwargs or {}))
+ executor = self._executor(**dict(self._executor_kwargs or {}, **(executor_kwargs or {})))
try:
return executor.execute(op_ctx)
- finally:
+ except BaseException:
executor.close()
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8044a035/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 848c59b..961796a 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -39,7 +39,7 @@ class WorkflowRunner(object):
def __init__(self, workflow_name, service_id, inputs,
model_storage, resource_storage, plugin_manager,
- executor=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
+ executor=None, executor_kwargs=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
"""
Manages a single workflow execution on a given service.
@@ -84,9 +84,14 @@ class WorkflowRunner(object):
execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values())
self._tasks_graph = workflow_fn(ctx=workflow_context, **execution_inputs_dict)
- executor = executor or ProcessExecutor(plugin_manager=plugin_manager)
+ # Set default executor and kwargs
+ if executor is None:
+ executor = ProcessExecutor
+ executor_kwargs = dict(plugin_manager=plugin_manager)
+
self._engine = Engine(
executor=executor,
+ executor_kwargs=executor_kwargs,
workflow_context=workflow_context,
tasks_graph=self._tasks_graph)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8044a035/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 02749b7..826a7a2 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -41,10 +41,10 @@ class Engine(logger.LoggerMixin):
super(Engine, self).__init__(**kwargs)
self._workflow_context = workflow_context
self._execution_graph = networkx.DiGraph()
+ self._executor_kwargs = executor_kwargs
translation.build_execution_graph(task_graph=tasks_graph,
execution_graph=self._execution_graph,
default_executor=executor,
- executor_kwargs=executor_kwargs,
execution=workflow_context.execution)
# Flush changes
@@ -115,7 +115,7 @@ class Engine(logger.LoggerMixin):
def _handle_executable_task(self, task):
if not task.stub_type:
events.sent_task_signal.send(task)
- task.execute(self._workflow_context)
+ task.execute(self._workflow_context, self._executor_kwargs)
def _handle_ended_tasks(self, task):
if task.status == models.Task.FAILED and not task.ignore_failure:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8044a035/aria/orchestrator/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py
index 93c173e..be4e34d 100644
--- a/aria/orchestrator/workflows/core/translation.py
+++ b/aria/orchestrator/workflows/core/translation.py
@@ -26,7 +26,6 @@ def build_execution_graph(
task_graph,
execution_graph,
default_executor,
- executor_kwargs,
execution,
start_stub_type=models.Task.START_WORKFLOW,
end_stub_type=models.Task.END_WORKFLOW,
@@ -54,8 +53,7 @@ def build_execution_graph(
if isinstance(api_task, api.task.OperationTask):
operation_task = models.Task.from_api_task(api_task=api_task,
- executor=default_executor,
- executor_kwargs=executor_kwargs)
+ executor=default_executor)
_add_task_and_dependencies(execution_graph, operation_task, operation_dependencies)
elif isinstance(api_task, api.task.WorkflowTask):
# Build the graph recursively while adding start and end markers
@@ -63,7 +61,6 @@ def build_execution_graph(
task_graph=api_task,
execution_graph=execution_graph,
default_executor=default_executor,
- executor_kwargs=executor_kwargs,
execution=execution,
start_stub_type=models.Task.START_SUBWROFKLOW,
end_stub_type=models.Task.END_SUBWORKFLOW,
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8044a035/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index 69e4a39..089126d 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -55,17 +55,22 @@ class BaseExecutor(logger.LoggerMixin):
events.start_task_signal.send(ctx)
ctx.model.task.update(ctx.task)
- @staticmethod
- def _task_failed(ctx, exception, traceback=None):
+ def _task_failed(self, ctx, exception, traceback=None):
events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback)
ctx.model.task.update(ctx.task)
+ self.close()
- @staticmethod
- def _task_succeeded(ctx):
+ def _task_succeeded(self, ctx):
events.on_success_task_signal.send(ctx)
ctx.model.task.update(ctx.task)
+ self.close()
class StubTaskExecutor(BaseExecutor): # pylint: disable=abstract-method
- def execute(self, ctx):
+
+ def __init__(self, *args, **kwargs):
+ # TODO: the executor kwargs delivery system is bad, so we need to pass the kwargs each time (and they are not persisted - this is bad!)
+ super(StubTaskExecutor, self).__init__()
+
+ def execute(self, ctx, *args, **kwargs):
ctx.task.status = ctx.task.SUCCESS
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8044a035/tests/modeling/test_mixins.py
----------------------------------------------------------------------
diff --git a/tests/modeling/test_mixins.py b/tests/modeling/test_mixins.py
index 4101fff..2c91a4b 100644
--- a/tests/modeling/test_mixins.py
+++ b/tests/modeling/test_mixins.py
@@ -216,11 +216,3 @@ def test_strict_list():
assert_strict(strict_class)
with pytest.raises(ValueFormatException):
strict_class.strict_list[0] = 1
-
-
-def test_task():
- from aria.orchestrator.workflows.executor import process
- task = modeling.models.StubTask()
- task.executor = process.ProcessExecutor
- e = task.executor
- pass
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8044a035/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index 1cb8f65..e8f1a63 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -50,21 +50,8 @@ def ctx(tmpdir):
@pytest.fixture
-def process_executor():
- ex = process.ProcessExecutor(**dict(python_path=tests.ROOT_DIR))
- try:
- yield ex
- finally:
- ex.close()
-
-
-@pytest.fixture
def thread_executor():
- ex = thread.ThreadExecutor()
- try:
- yield ex
- finally:
- ex.close()
+ return thread.ThreadExecutor
@pytest.fixture
@@ -263,7 +250,7 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir):
@pytest.fixture(params=[
(thread.ThreadExecutor, {}),
- # (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}),
+ (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}),
])
def executor(request):
return request.param
@@ -299,14 +286,13 @@ def test_node_operation_logging(ctx, executor):
arguments=arguments
)
)
-
execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor[0],
- executor_kwargs=dict(ctx=ctx))
+ executor_kwargs=executor[1])
_assert_loggins(ctx, arguments)
def test_relationship_operation_logging(ctx, executor):
- interface_name, operation_name = mock.operations.RELATIONHIP_OPERATIONS_INSTALL[0]
+ interface_name, operation_name = mock.operations.RELATIONSHIP_OPERATIONS_INSTALL[0]
relationship = ctx.model.relationship.list()[0]
arguments = {
@@ -334,7 +320,7 @@ def test_relationship_operation_logging(ctx, executor):
)
)
- execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+ execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor[0], executor_kwargs=executor[1])
_assert_loggins(ctx, arguments)
@@ -391,7 +377,7 @@ def test_attribute_consumption(ctx, executor, dataholder):
)
)
- execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+ execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor[0], executor_kwargs=executor[1])
target_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
assert len(source_node.attributes) == len(target_node.attributes) == 2
@@ -414,7 +400,7 @@ def _assert_loggins(ctx, arguments):
assert len(executions) == 1
execution = executions[0]
- tasks = ctx.model.task.list()
+ tasks = ctx.model.task.list(filters={'stub_type': None})
assert len(tasks) == 1
task = tasks[0]
assert len(task.logs) == 4
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8044a035/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index 4db7bf4..f7e441e 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -28,13 +28,14 @@ TEST_FILE_ENTRY_ID = 'entry'
TEST_FILE_NAME = 'test_file'
-def test_serialize_operation_context(context, executor, tmpdir):
+def test_serialize_operation_context(context, executor, executor_kwargs, tmpdir):
test_file = tmpdir.join(TEST_FILE_NAME)
test_file.write(TEST_FILE_CONTENT)
resource = context.resource
resource.service_template.upload(TEST_FILE_ENTRY_ID, str(test_file))
graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
- eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
+ eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph,
+ executor_kwargs=executor_kwargs)
eng.execute()
@@ -82,9 +83,12 @@ def _operation_mapping():
@pytest.fixture
def executor():
- result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
- yield result
- result.close()
+ return process.ProcessExecutor
+
+
+@pytest.fixture
+def executor_kwargs():
+ return dict(python_path=[tests.ROOT_DIR])
@pytest.fixture
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8044a035/tests/orchestrator/context/test_toolbelt.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_toolbelt.py b/tests/orchestrator/context/test_toolbelt.py
index 4de9e55..df54d9d 100644
--- a/tests/orchestrator/context/test_toolbelt.py
+++ b/tests/orchestrator/context/test_toolbelt.py
@@ -41,12 +41,7 @@ def workflow_context(tmpdir):
@pytest.fixture
def executor():
- result = thread.ThreadExecutor()
- try:
- yield result
- finally:
- result.close()
-
+ return thread.ThreadExecutor
@pytest.fixture
def dataholder(tmpdir):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8044a035/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index c5a62ae..baf45d7 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -99,7 +99,7 @@ def test_default_executor(request):
with mock.patch('aria.orchestrator.workflow_runner.Engine') as mock_engine_cls:
_create_workflow_runner(request, mock_workflow)
_, engine_kwargs = mock_engine_cls.call_args
- assert isinstance(engine_kwargs.get('executor'), ProcessExecutor)
+ assert engine_kwargs.get('executor') == ProcessExecutor
def test_custom_executor(request):