You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/13 13:38:11 UTC

incubator-ariatosca git commit: test fixes, moved close executor to be handled inside the executor

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-278-Remove-core-tasks 62fa985c6 -> 8044a035f


test fixes, moved close executor to be handled inside the executor


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/8044a035
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/8044a035
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/8044a035

Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: 8044a035fdb7f88e871e3a643b47b3e42c43da67
Parents: 62fa985
Author: max-orlov <ma...@gigaspaces.com>
Authored: Tue Jun 13 15:56:44 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Tue Jun 13 15:56:44 2017 +0300

----------------------------------------------------------------------
 aria/modeling/orchestration.py                  |  6 ++---
 aria/orchestrator/workflow_runner.py            |  9 +++++--
 aria/orchestrator/workflows/core/engine.py      |  4 +--
 aria/orchestrator/workflows/core/translation.py |  5 +---
 aria/orchestrator/workflows/executor/base.py    | 15 +++++++----
 tests/modeling/test_mixins.py                   |  8 ------
 tests/orchestrator/context/test_operation.py    | 28 +++++---------------
 tests/orchestrator/context/test_serialize.py    | 14 ++++++----
 tests/orchestrator/context/test_toolbelt.py     |  7 +----
 tests/orchestrator/test_workflow_runner.py      |  2 +-
 10 files changed, 41 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8044a035/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 9ac885d..37aa431 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -461,7 +461,7 @@ class TaskBase(mixins.ModelMixin):
 
         return cls(**instantiation_kwargs)
 
-    def execute(self, ctx):
+    def execute(self, ctx, executor_kwargs=None):
         from aria.orchestrator.context import operation
         context_cls = self._context_cls or operation.BaseOperationContext
         op_ctx = context_cls(
@@ -474,10 +474,10 @@ class TaskBase(mixins.ModelMixin):
             execution_id=self.execution.id,
             name=self.name
         )
-        executor = self._executor(**(self._executor_kwargs or {}))
+        executor = self._executor(**dict(self._executor_kwargs or {}, **(executor_kwargs or {})))
         try:
             return executor.execute(op_ctx)
-        finally:
+        except BaseException:
             executor.close()
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8044a035/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 848c59b..961796a 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -39,7 +39,7 @@ class WorkflowRunner(object):
 
     def __init__(self, workflow_name, service_id, inputs,
                  model_storage, resource_storage, plugin_manager,
-                 executor=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
+                 executor=None, executor_kwargs=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
                  task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
         """
         Manages a single workflow execution on a given service.
@@ -84,9 +84,14 @@ class WorkflowRunner(object):
         execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values())
         self._tasks_graph = workflow_fn(ctx=workflow_context, **execution_inputs_dict)
 
-        executor = executor or ProcessExecutor(plugin_manager=plugin_manager)
+        # Set default executor and kwargs
+        if executor is None:
+            executor = ProcessExecutor
+            executor_kwargs = dict(plugin_manager=plugin_manager)
+
         self._engine = Engine(
             executor=executor,
+            executor_kwargs=executor_kwargs,
             workflow_context=workflow_context,
             tasks_graph=self._tasks_graph)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8044a035/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 02749b7..826a7a2 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -41,10 +41,10 @@ class Engine(logger.LoggerMixin):
         super(Engine, self).__init__(**kwargs)
         self._workflow_context = workflow_context
         self._execution_graph = networkx.DiGraph()
+        self._executor_kwargs = executor_kwargs
         translation.build_execution_graph(task_graph=tasks_graph,
                                           execution_graph=self._execution_graph,
                                           default_executor=executor,
-                                          executor_kwargs=executor_kwargs,
                                           execution=workflow_context.execution)
 
         # Flush changes
@@ -115,7 +115,7 @@ class Engine(logger.LoggerMixin):
     def _handle_executable_task(self, task):
         if not task.stub_type:
             events.sent_task_signal.send(task)
-        task.execute(self._workflow_context)
+        task.execute(self._workflow_context, self._executor_kwargs)
 
     def _handle_ended_tasks(self, task):
         if task.status == models.Task.FAILED and not task.ignore_failure:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8044a035/aria/orchestrator/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py
index 93c173e..be4e34d 100644
--- a/aria/orchestrator/workflows/core/translation.py
+++ b/aria/orchestrator/workflows/core/translation.py
@@ -26,7 +26,6 @@ def build_execution_graph(
         task_graph,
         execution_graph,
         default_executor,
-        executor_kwargs,
         execution,
         start_stub_type=models.Task.START_WORKFLOW,
         end_stub_type=models.Task.END_WORKFLOW,
@@ -54,8 +53,7 @@ def build_execution_graph(
 
         if isinstance(api_task, api.task.OperationTask):
             operation_task = models.Task.from_api_task(api_task=api_task,
-                                                       executor=default_executor,
-                                                       executor_kwargs=executor_kwargs)
+                                                       executor=default_executor)
             _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies)
         elif isinstance(api_task, api.task.WorkflowTask):
             # Build the graph recursively while adding start and end markers
@@ -63,7 +61,6 @@ def build_execution_graph(
                 task_graph=api_task,
                 execution_graph=execution_graph,
                 default_executor=default_executor,
-                executor_kwargs=executor_kwargs,
                 execution=execution,
                 start_stub_type=models.Task.START_SUBWROFKLOW,
                 end_stub_type=models.Task.END_SUBWORKFLOW,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8044a035/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index 69e4a39..089126d 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -55,17 +55,22 @@ class BaseExecutor(logger.LoggerMixin):
         events.start_task_signal.send(ctx)
         ctx.model.task.update(ctx.task)
 
-    @staticmethod
-    def _task_failed(ctx, exception, traceback=None):
+    def _task_failed(self, ctx, exception, traceback=None):
         events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback)
         ctx.model.task.update(ctx.task)
+        self.close()
 
-    @staticmethod
-    def _task_succeeded(ctx):
+    def _task_succeeded(self, ctx):
         events.on_success_task_signal.send(ctx)
         ctx.model.task.update(ctx.task)
+        self.close()
 
 
 class StubTaskExecutor(BaseExecutor):                                                               # pylint: disable=abstract-method
-    def execute(self, ctx):
+
+    def __init__(self, *args, **kwargs):
+        # TODO: the executor kwargs delivery system is bad, so we need to aps the kwargs each time (and they are not persisted - this is bad!)
+        super(StubTaskExecutor, self).__init__()
+
+    def execute(self, ctx, *args, **kwargs):
         ctx.task.status = ctx.task.SUCCESS

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8044a035/tests/modeling/test_mixins.py
----------------------------------------------------------------------
diff --git a/tests/modeling/test_mixins.py b/tests/modeling/test_mixins.py
index 4101fff..2c91a4b 100644
--- a/tests/modeling/test_mixins.py
+++ b/tests/modeling/test_mixins.py
@@ -216,11 +216,3 @@ def test_strict_list():
     assert_strict(strict_class)
     with pytest.raises(ValueFormatException):
         strict_class.strict_list[0] = 1
-
-
-def test_task():
-    from aria.orchestrator.workflows.executor import process
-    task = modeling.models.StubTask()
-    task.executor = process.ProcessExecutor
-    e = task.executor
-    pass
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8044a035/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index 1cb8f65..e8f1a63 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -50,21 +50,8 @@ def ctx(tmpdir):
 
 
 @pytest.fixture
-def process_executor():
-    ex = process.ProcessExecutor(**dict(python_path=tests.ROOT_DIR))
-    try:
-        yield ex
-    finally:
-        ex.close()
-
-
-@pytest.fixture
 def thread_executor():
-    ex = thread.ThreadExecutor()
-    try:
-        yield ex
-    finally:
-        ex.close()
+    return thread.ThreadExecutor
 
 
 @pytest.fixture
@@ -263,7 +250,7 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir):
 
 @pytest.fixture(params=[
     (thread.ThreadExecutor, {}),
-    # (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}),
+    (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}),
 ])
 def executor(request):
     return request.param
@@ -299,14 +286,13 @@ def test_node_operation_logging(ctx, executor):
                 arguments=arguments
             )
         )
-
     execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor[0],
-            executor_kwargs=dict(ctx=ctx))
+            executor_kwargs=executor[1])
     _assert_loggins(ctx, arguments)
 
 
 def test_relationship_operation_logging(ctx, executor):
-    interface_name, operation_name = mock.operations.RELATIONHIP_OPERATIONS_INSTALL[0]
+    interface_name, operation_name = mock.operations.RELATIONSHIP_OPERATIONS_INSTALL[0]
 
     relationship = ctx.model.relationship.list()[0]
     arguments = {
@@ -334,7 +320,7 @@ def test_relationship_operation_logging(ctx, executor):
             )
         )
 
-    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor[0], executor_kwargs=executor[1])
     _assert_loggins(ctx, arguments)
 
 
@@ -391,7 +377,7 @@ def test_attribute_consumption(ctx, executor, dataholder):
             )
         )
 
-    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor[0], executor_kwargs=executor[1])
     target_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
 
     assert len(source_node.attributes) == len(target_node.attributes) == 2
@@ -414,7 +400,7 @@ def _assert_loggins(ctx, arguments):
     assert len(executions) == 1
     execution = executions[0]
 
-    tasks = ctx.model.task.list()
+    tasks = ctx.model.task.list(filters={'stub_type': None})
     assert len(tasks) == 1
     task = tasks[0]
     assert len(task.logs) == 4

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8044a035/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index 4db7bf4..f7e441e 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -28,13 +28,14 @@ TEST_FILE_ENTRY_ID = 'entry'
 TEST_FILE_NAME = 'test_file'
 
 
-def test_serialize_operation_context(context, executor, tmpdir):
+def test_serialize_operation_context(context, executor, executor_kwargs, tmpdir):
     test_file = tmpdir.join(TEST_FILE_NAME)
     test_file.write(TEST_FILE_CONTENT)
     resource = context.resource
     resource.service_template.upload(TEST_FILE_ENTRY_ID, str(test_file))
     graph = _mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
-    eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
+    eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph,
+                        executor_kwargs=executor_kwargs)
     eng.execute()
 
 
@@ -82,9 +83,12 @@ def _operation_mapping():
 
 @pytest.fixture
 def executor():
-    result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
-    yield result
-    result.close()
+    return process.ProcessExecutor
+
+
+@pytest.fixture
+def executor_kwargs():
+    return dict(python_path=[tests.ROOT_DIR])
 
 
 @pytest.fixture

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8044a035/tests/orchestrator/context/test_toolbelt.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_toolbelt.py b/tests/orchestrator/context/test_toolbelt.py
index 4de9e55..df54d9d 100644
--- a/tests/orchestrator/context/test_toolbelt.py
+++ b/tests/orchestrator/context/test_toolbelt.py
@@ -41,12 +41,7 @@ def workflow_context(tmpdir):
 
 @pytest.fixture
 def executor():
-    result = thread.ThreadExecutor()
-    try:
-        yield result
-    finally:
-        result.close()
-
+    return thread.ThreadExecutor
 
 @pytest.fixture
 def dataholder(tmpdir):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8044a035/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index c5a62ae..baf45d7 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -99,7 +99,7 @@ def test_default_executor(request):
     with mock.patch('aria.orchestrator.workflow_runner.Engine') as mock_engine_cls:
         _create_workflow_runner(request, mock_workflow)
         _, engine_kwargs = mock_engine_cls.call_args
-        assert isinstance(engine_kwargs.get('executor'), ProcessExecutor)
+        assert engine_kwargs.get('executor') == ProcessExecutor
 
 
 def test_custom_executor(request):