You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ariatosca.apache.org by mx...@apache.org on 2017/10/19 12:01:17 UTC
[07/10] incubator-ariatosca git commit: ARIA-299 Resuming canceled
execution with frozen task fails
ARIA-299 Resuming canceled execution with frozen task fails
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/c63059a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/c63059a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/c63059a2
Branch: refs/heads/new_wagon_setuptools
Commit: c63059a2061927c10cc32571c161ea3faf9f933a
Parents: 9de6bf5
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed Jul 5 16:16:39 2017 +0300
Committer: Ran Ziv <ra...@gigaspaces.com>
Committed: Mon Jul 10 17:00:46 2017 +0300
----------------------------------------------------------------------
.../workflows/core/events_handler.py | 4 +
aria/orchestrator/workflows/executor/base.py | 2 +-
aria/orchestrator/workflows/executor/thread.py | 8 +-
tests/orchestrator/context/test_serialize.py | 6 +-
.../orchestrator/execution_plugin/test_local.py | 6 +-
tests/orchestrator/execution_plugin/test_ssh.py | 6 +-
tests/orchestrator/test_workflow_runner.py | 206 ++++++++++++++++---
.../workflows/executor/test_process_executor.py | 66 +++---
...process_executor_concurrent_modifications.py | 6 +-
.../executor/test_process_executor_extension.py | 6 +-
.../test_process_executor_tracked_changes.py | 6 +-
11 files changed, 251 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c63059a2/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 769c1a8..37801de 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -123,6 +123,10 @@ def _workflow_resume(workflow_context, *args, **kwargs):
with workflow_context.persist_changes:
execution = workflow_context.execution
execution.status = execution.PENDING
+ # Any task that has not ended yet is put back into the pending state
+ for task in execution.tasks:
+ if not task.has_ended():
+ task.status = task.PENDING
@events.on_cancelling_workflow_signal.connect
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c63059a2/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index ec1a0c7..e7d03ea 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -49,7 +49,7 @@ class BaseExecutor(logger.LoggerMixin):
"""
pass
- def terminate(self, ctx):
+ def terminate(self, task_id):
"""
Terminate the executing task
:return:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c63059a2/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index d9dcdf8..6cef2c0 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -36,9 +36,10 @@ class ThreadExecutor(BaseExecutor):
Note: This executor is incapable of running plugin operations.
"""
- def __init__(self, pool_size=1, *args, **kwargs):
+ def __init__(self, pool_size=1, close_timeout=5, *args, **kwargs):
super(ThreadExecutor, self).__init__(*args, **kwargs)
self._stopped = False
+ self._close_timeout = close_timeout
self._queue = Queue.Queue()
self._pool = []
for i in range(pool_size):
@@ -54,7 +55,10 @@ class ThreadExecutor(BaseExecutor):
def close(self):
self._stopped = True
for thread in self._pool:
- thread.join()
+ if self._close_timeout is None:
+ thread.join()
+ else:
+ thread.join(self._close_timeout)
def _processor(self):
while not self._stopped:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c63059a2/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index 6046a16..091e23c 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -87,8 +87,10 @@ def _operation_mapping():
@pytest.fixture
def executor():
result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
- yield result
- result.close()
+ try:
+ yield result
+ finally:
+ result.close()
@pytest.fixture
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c63059a2/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 5b94917..e64e998 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -509,8 +509,10 @@ if __name__ == '__main__':
@pytest.fixture
def executor(self):
result = process.ProcessExecutor()
- yield result
- result.close()
+ try:
+ yield result
+ finally:
+ result.close()
@pytest.fixture
def workflow_context(self, tmpdir):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c63059a2/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index 4fa8184..a96c91d 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -277,8 +277,10 @@ class TestWithActualSSHServer(object):
@pytest.fixture
def executor(self):
result = process.ProcessExecutor()
- yield result
- result.close()
+ try:
+ yield result
+ finally:
+ result.close()
@pytest.fixture
def workflow_context(self, tmpdir):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c63059a2/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index e640c7d..a77d727 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -14,6 +14,7 @@
# limitations under the License.
import json
+import time
from threading import Thread, Event
from datetime import datetime
@@ -23,7 +24,7 @@ import pytest
from aria.modeling import exceptions as modeling_exceptions
from aria.modeling import models
from aria.orchestrator import exceptions
-from aria.orchestrator.events import on_cancelled_workflow_signal
+from aria.orchestrator import events
from aria.orchestrator.workflow_runner import WorkflowRunner
from aria.orchestrator.workflows.executor.process import ProcessExecutor
from aria.orchestrator.workflows import api
@@ -46,9 +47,10 @@ from ..fixtures import ( # pylint: disable=unused-import
resource_storage as resource
)
-events = {
+custom_events = {
'is_resumed': Event(),
'is_active': Event(),
+ 'execution_cancelled': Event(),
'execution_ended': Event()
}
@@ -57,6 +59,10 @@ class TimeoutError(BaseException):
pass
+class FailingTask(BaseException):
+ pass
+
+
def test_undeclared_workflow(request):
# validating a proper error is raised when the workflow is not declared in the service
with pytest.raises(exceptions.UndeclaredWorkflowError):
@@ -318,43 +324,57 @@ def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
class TestResumableWorkflows(object):
- def test_resume_workflow(self, workflow_context, executor):
- node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
- node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
- self._create_interface(workflow_context, node, mock_resuming_task)
+ def _create_initial_workflow_runner(
+ self, workflow_context, workflow, executor, inputs=None):
service = workflow_context.service
service.workflows['custom_workflow'] = tests_mock.models.create_operation(
'custom_workflow',
- operation_kwargs={'function': '{0}.{1}'.format(__name__, mock_workflow.__name__)}
+ operation_kwargs={
+ 'function': '{0}.{1}'.format(__name__, workflow.__name__),
+ 'inputs': dict((k, models.Input.wrap(k, v)) for k, v in (inputs or {}).items())
+ }
)
workflow_context.model.service.update(service)
wf_runner = WorkflowRunner(
service_id=workflow_context.service.id,
- inputs={},
+ inputs=inputs or {},
model_storage=workflow_context.model,
resource_storage=workflow_context.resource,
plugin_manager=None,
workflow_name='custom_workflow',
executor=executor)
+ return wf_runner
+
+ @staticmethod
+ def _wait_for_active_and_cancel(workflow_runner):
+ if custom_events['is_active'].wait(60) is False:
+ raise TimeoutError("is_active wasn't set to True")
+ workflow_runner.cancel()
+ if custom_events['execution_cancelled'].wait(60) is False:
+ raise TimeoutError("Execution did not end")
+
+ def test_resume_workflow(self, workflow_context, thread_executor):
+ node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+ node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+ self._create_interface(workflow_context, node, mock_resuming_task)
+
+ wf_runner = self._create_initial_workflow_runner(
+ workflow_context, mock_parallel_workflow, thread_executor)
+
wf_thread = Thread(target=wf_runner.execute)
wf_thread.daemon = True
wf_thread.start()
# Wait for the execution to start
- if events['is_active'].wait(5) is False:
- raise TimeoutError("is_active wasn't set to True")
- wf_runner.cancel()
-
- if events['execution_ended'].wait(60) is False:
- raise TimeoutError("Execution did not end")
+ self._wait_for_active_and_cancel(wf_runner)
tasks = workflow_context.model.task.list(filters={'_stub_type': None})
assert any(task.status == task.SUCCESS for task in tasks)
- assert any(task.status in (task.FAILED, task.RETRYING) for task in tasks)
- events['is_resumed'].set()
- assert any(task.status in (task.FAILED, task.RETRYING) for task in tasks)
+ assert any(task.status == task.RETRYING for task in tasks)
+ custom_events['is_resumed'].set()
+ assert any(task.status == task.RETRYING for task in tasks)
# Create a new workflow runner, with an existing execution id. This would cause
# the old execution to restart.
@@ -365,7 +385,7 @@ class TestResumableWorkflows(object):
resource_storage=workflow_context.resource,
plugin_manager=None,
execution_id=wf_runner.execution.id,
- executor=executor)
+ executor=thread_executor)
new_wf_runner.execute()
@@ -374,9 +394,93 @@ class TestResumableWorkflows(object):
assert node.attributes['invocations'].value == 3
assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+ def test_resume_started_task(self, workflow_context, thread_executor):
+ node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+ node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+ self._create_interface(workflow_context, node, mock_stuck_task)
+
+ wf_runner = self._create_initial_workflow_runner(
+ workflow_context, mock_single_task_workflow, thread_executor)
+
+ wf_thread = Thread(target=wf_runner.execute)
+ wf_thread.daemon = True
+ wf_thread.start()
+
+ self._wait_for_active_and_cancel(wf_runner)
+ task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
+ assert node.attributes['invocations'].value == 1
+ assert task.status == task.STARTED
+ assert wf_runner.execution.status in (wf_runner.execution.CANCELLED,
+ wf_runner.execution.CANCELLING)
+ custom_events['is_resumed'].set()
+
+ new_thread_executor = thread.ThreadExecutor()
+ try:
+ new_wf_runner = WorkflowRunner(
+ service_id=wf_runner.service.id,
+ inputs={},
+ model_storage=workflow_context.model,
+ resource_storage=workflow_context.resource,
+ plugin_manager=None,
+ execution_id=wf_runner.execution.id,
+ executor=new_thread_executor)
+
+ new_wf_runner.execute()
+ finally:
+ new_thread_executor.close()
+
+ # Wait for it to finish and assert changes.
+ assert node.attributes['invocations'].value == 2
+ assert task.status == task.SUCCESS
+ assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+
+ def test_resume_failed_task(self, workflow_context, thread_executor):
+ node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+ node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+ self._create_interface(workflow_context, node, mock_failed_before_resuming)
+
+ wf_runner = self._create_initial_workflow_runner(
+ workflow_context, mock_single_task_workflow, thread_executor)
+ wf_thread = Thread(target=wf_runner.execute)
+ wf_thread.setDaemon(True)
+ wf_thread.start()
+
+ self._wait_for_active_and_cancel(wf_runner)
+
+ task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
+ assert node.attributes['invocations'].value == 2
+ assert task.status == task.STARTED
+ assert wf_runner.execution.status in (wf_runner.execution.CANCELLED,
+ wf_runner.execution.CANCELLING)
+
+ custom_events['is_resumed'].set()
+ assert node.attributes['invocations'].value == 2
+
+ # Create a new workflow runner, with an existing execution id. This would cause
+ # the old execution to restart.
+ new_thread_executor = thread.ThreadExecutor()
+ try:
+ new_wf_runner = WorkflowRunner(
+ service_id=wf_runner.service.id,
+ inputs={},
+ model_storage=workflow_context.model,
+ resource_storage=workflow_context.resource,
+ plugin_manager=None,
+ execution_id=wf_runner.execution.id,
+ executor=new_thread_executor)
+
+ new_wf_runner.execute()
+ finally:
+ new_thread_executor.close()
+
+ # Wait for it to finish and assert changes.
+ assert node.attributes['invocations'].value == task.max_attempts - 1
+ assert task.status == task.SUCCESS
+ assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+
@staticmethod
@pytest.fixture
- def executor():
+ def thread_executor():
result = thread.ThreadExecutor()
try:
yield result
@@ -417,16 +521,23 @@ class TestResumableWorkflows(object):
@pytest.fixture(autouse=True)
def register_to_events(self):
+ def execution_cancelled(*args, **kwargs):
+ custom_events['execution_cancelled'].set()
+
def execution_ended(*args, **kwargs):
- events['execution_ended'].set()
+ custom_events['execution_ended'].set()
- on_cancelled_workflow_signal.connect(execution_ended)
+ events.on_cancelled_workflow_signal.connect(execution_cancelled)
+ events.on_failure_workflow_signal.connect(execution_ended)
yield
- on_cancelled_workflow_signal.disconnect(execution_ended)
+ events.on_cancelled_workflow_signal.disconnect(execution_cancelled)
+ events.on_failure_workflow_signal.disconnect(execution_ended)
+ for event in custom_events.values():
+ event.clear()
@workflow
-def mock_workflow(ctx, graph):
+def mock_parallel_workflow(ctx, graph):
node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
graph.add_tasks(
api.task.OperationTask(
@@ -441,8 +552,51 @@ def mock_resuming_task(ctx):
ctx.node.attributes['invocations'] += 1
if ctx.node.attributes['invocations'] != 1:
- events['is_active'].set()
- if not events['is_resumed'].isSet():
+ custom_events['is_active'].set()
+ if not custom_events['is_resumed'].isSet():
# if resume was called, increase by one. o/w fail the execution - second task should
# fail as long it was not a part of resuming the workflow
- raise BaseException("wasn't resumed yet")
+ raise FailingTask("wasn't resumed yet")
+
+
+@workflow
+def mock_single_task_workflow(ctx, graph):
+ node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+ graph.add_tasks(
+ api.task.OperationTask(node,
+ interface_name='aria.interfaces.lifecycle',
+ operation_name='create',
+ retry_interval=1,
+ max_attempts=10),
+ )
+
+
+@operation
+def mock_failed_before_resuming(ctx):
+ """
+ The task should run at most ctx.task.max_attempts - 1 times, and only then pass.
+ Overall, the number of invocations should be ctx.task.max_attempts - 1.
+ """
+ ctx.node.attributes['invocations'] += 1
+
+ if ctx.node.attributes['invocations'] == 2:
+ custom_events['is_active'].set()
+ # unfreeze the thread only when all of the invocations are done
+ while ctx.node.attributes['invocations'] < ctx.task.max_attempts - 1:
+ time.sleep(5)
+
+ elif ctx.node.attributes['invocations'] == ctx.task.max_attempts - 1:
+ # pass only just before the end.
+ return
+ else:
+ # fail otherwise.
+ raise FailingTask("stop this task")
+
+
+@operation
+def mock_stuck_task(ctx):
+ ctx.node.attributes['invocations'] += 1
+ while not custom_events['is_resumed'].isSet():
+ if not custom_events['is_active'].isSet():
+ custom_events['is_active'].set()
+ time.sleep(5)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c63059a2/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index f8fc567..e050d18 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -43,36 +43,25 @@ from . import MockContext
class TestProcessExecutor(object):
- def test_plugin_execution(self, executor, mock_plugin, model):
+ def test_plugin_execution(self, executor, mock_plugin, model, queue):
ctx = MockContext(
model,
task_kwargs=dict(function='mock_plugin1.operation', plugin_fk=mock_plugin.id)
)
- queue = Queue.Queue()
-
- def handler(_, exception=None, **kwargs):
- queue.put(exception)
-
- events.on_success_task_signal.connect(handler)
- events.on_failure_task_signal.connect(handler)
- try:
- executor.execute(ctx)
- error = queue.get(timeout=60)
- # tests/resources/plugins/mock-plugin1 is the plugin installed
- # during this tests setup. The module mock_plugin1 contains a single
- # operation named "operation" which calls an entry point defined in the plugin's
- # setup.py. This entry points simply prints 'mock-plugin-output' to stdout.
- # The "operation" operation that called this subprocess, then raises a RuntimeError
- # with that subprocess output as the error message.
- # This is what we assert here. This tests checks that both the PYTHONPATH (operation)
- # and PATH (entry point) are properly updated in the subprocess in which the task is
- # running.
- assert isinstance(error, RuntimeError)
- assert error.message == 'mock-plugin-output'
- finally:
- events.on_success_task_signal.disconnect(handler)
- events.on_failure_task_signal.disconnect(handler)
+ executor.execute(ctx)
+ error = queue.get(timeout=60)
+ # tests/resources/plugins/mock-plugin1 is the plugin installed
+ # during this test's setup. The module mock_plugin1 contains a single
+ # operation named "operation" which calls an entry point defined in the plugin's
+ # setup.py. This entry point simply prints 'mock-plugin-output' to stdout.
+ # The "operation" operation that called this subprocess then raises a RuntimeError
+ # with that subprocess output as the error message.
+ # This is what we assert here. This test checks that both the PYTHONPATH (operation)
+ # and PATH (entry point) are properly updated in the subprocess in which the task is
+ # running.
+ assert isinstance(error, RuntimeError)
+ assert error.message == 'mock-plugin-output'
def test_closed(self, executor, model):
executor.close()
@@ -127,6 +116,23 @@ while True:
# making the test more readable
assert pid not in psutil.pids()
+
+@pytest.fixture
+def queue():
+ _queue = Queue.Queue()
+
+ def handler(_, exception=None, **kwargs):
+ _queue.put(exception)
+
+ events.on_success_task_signal.connect(handler)
+ events.on_failure_task_signal.connect(handler)
+ try:
+ yield _queue
+ finally:
+ events.on_success_task_signal.disconnect(handler)
+ events.on_failure_task_signal.disconnect(handler)
+
+
@pytest.fixture
def fs_test_holder(tmpdir):
dataholder_path = str(tmpdir.join('dataholder'))
@@ -136,11 +142,11 @@ def fs_test_holder(tmpdir):
@pytest.fixture
def executor(plugin_manager):
- result = process.ProcessExecutor(
- plugin_manager=plugin_manager,
- python_path=[tests.ROOT_DIR])
- yield result
- result.close()
+ result = process.ProcessExecutor(plugin_manager=plugin_manager, python_path=[tests.ROOT_DIR])
+ try:
+ yield result
+ finally:
+ result.close()
@pytest.fixture
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c63059a2/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
index 6163c09..86a2edf 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
@@ -120,8 +120,10 @@ def _test(context, executor, lock_files, func, dataholder, expected_failure):
@pytest.fixture
def executor():
result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
- yield result
- result.close()
+ try:
+ yield result
+ finally:
+ result.close()
@pytest.fixture
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c63059a2/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index 6ed3e2b..b26fa43 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -86,8 +86,10 @@ def _mock_operation(ctx, **operation_arguments):
@pytest.fixture
def executor():
result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
- yield result
- result.close()
+ try:
+ yield result
+ finally:
+ result.close()
@pytest.fixture
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c63059a2/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index a74a473..47ee2f7 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -155,8 +155,10 @@ def _operation_mapping(func):
@pytest.fixture
def executor():
result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
- yield result
- result.close()
+ try:
+ yield result
+ finally:
+ result.close()
@pytest.fixture