You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ariatosca.apache.org by mx...@apache.org on 2017/07/10 09:41:25 UTC
[6/6] incubator-ariatosca git commit: review 1 fixes
review 1 fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/d31c011d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/d31c011d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/d31c011d
Branch: refs/heads/ARIA-299-Resuming-canceled-execution-with-frozen-task-fails
Commit: d31c011dfc483050c87a3a025465b0ffc753944b
Parents: bd619bd
Author: max-orlov <ma...@gigaspaces.com>
Authored: Mon Jul 10 12:36:22 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Mon Jul 10 12:41:08 2017 +0300
----------------------------------------------------------------------
.../workflows/core/events_handler.py | 8 +-
.../orchestrator/execution_plugin/test_local.py | 6 +-
tests/orchestrator/execution_plugin/test_ssh.py | 6 +-
tests/orchestrator/test_workflow_runner.py | 98 +++++++++++++++-----
4 files changed, 87 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d31c011d/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index eb6f271..37801de 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -114,10 +114,6 @@ def _workflow_cancelled(workflow_context, *args, **kwargs):
elif execution.status in (execution.SUCCEEDED, execution.FAILED):
_log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
else:
- # Any non ended task would be put back to pending state
- for task in execution.tasks:
- if not task.has_ended():
- task.status = task.PENDING
execution.status = execution.CANCELLED
execution.ended_at = datetime.utcnow()
@@ -127,6 +123,10 @@ def _workflow_resume(workflow_context, *args, **kwargs):
with workflow_context.persist_changes:
execution = workflow_context.execution
execution.status = execution.PENDING
+ # Any non ended task would be put back to pending state
+ for task in execution.tasks:
+ if not task.has_ended():
+ task.status = task.PENDING
@events.on_cancelling_workflow_signal.connect
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d31c011d/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 5b94917..e64e998 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -509,8 +509,10 @@ if __name__ == '__main__':
@pytest.fixture
def executor(self):
result = process.ProcessExecutor()
- yield result
- result.close()
+ try:
+ yield result
+ finally:
+ result.close()
@pytest.fixture
def workflow_context(self, tmpdir):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d31c011d/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index 4fa8184..a96c91d 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -277,8 +277,10 @@ class TestWithActualSSHServer(object):
@pytest.fixture
def executor(self):
result = process.ProcessExecutor()
- yield result
- result.close()
+ try:
+ yield result
+ finally:
+ result.close()
@pytest.fixture
def workflow_context(self, tmpdir):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d31c011d/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index 3527f34..a77d727 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -14,6 +14,7 @@
# limitations under the License.
import json
+import time
from threading import Thread, Event
from datetime import datetime
@@ -58,6 +59,10 @@ class TimeoutError(BaseException):
pass
+class FailingTask(BaseException):
+ pass
+
+
def test_undeclared_workflow(request):
# validating a proper error is raised when the workflow is not declared in the service
with pytest.raises(exceptions.UndeclaredWorkflowError):
@@ -342,6 +347,14 @@ class TestResumableWorkflows(object):
executor=executor)
return wf_runner
+ @staticmethod
+ def _wait_for_active_and_cancel(workflow_runner):
+ if custom_events['is_active'].wait(60) is False:
+ raise TimeoutError("is_active wasn't set to True")
+ workflow_runner.cancel()
+ if custom_events['execution_cancelled'].wait(60) is False:
+ raise TimeoutError("Execution did not end")
+
def test_resume_workflow(self, workflow_context, thread_executor):
node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
@@ -355,18 +368,13 @@ class TestResumableWorkflows(object):
wf_thread.start()
# Wait for the execution to start
- if custom_events['is_active'].wait(5) is False:
- raise TimeoutError("is_active wasn't set to True")
- wf_runner.cancel()
-
- if custom_events['execution_cancelled'].wait(60) is False:
- raise TimeoutError("Execution did not end")
+ self._wait_for_active_and_cancel(wf_runner)
tasks = workflow_context.model.task.list(filters={'_stub_type': None})
assert any(task.status == task.SUCCESS for task in tasks)
- assert any(task.status == task.PENDING for task in tasks)
+ assert any(task.status == task.RETRYING for task in tasks)
custom_events['is_resumed'].set()
- assert any(task.status == task.PENDING for task in tasks)
+ assert any(task.status == task.RETRYING for task in tasks)
# Create a new workflow runner, with an existing execution id. This would cause
# the old execution to restart.
@@ -386,27 +394,62 @@ class TestResumableWorkflows(object):
assert node.attributes['invocations'].value == 3
assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
- def test_resume_failed_task(self, workflow_context, thread_executor):
+ def test_resume_started_task(self, workflow_context, thread_executor):
+ node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+ node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+ self._create_interface(workflow_context, node, mock_stuck_task)
+
+ wf_runner = self._create_initial_workflow_runner(
+ workflow_context, mock_single_task_workflow, thread_executor)
+
+ wf_thread = Thread(target=wf_runner.execute)
+ wf_thread.daemon = True
+ wf_thread.start()
+
+ self._wait_for_active_and_cancel(wf_runner)
+ task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
+ assert node.attributes['invocations'].value == 1
+ assert task.status == task.STARTED
+ assert wf_runner.execution.status in (wf_runner.execution.CANCELLED,
+ wf_runner.execution.CANCELLING)
+ custom_events['is_resumed'].set()
+
+ new_thread_executor = thread.ThreadExecutor()
+ try:
+ new_wf_runner = WorkflowRunner(
+ service_id=wf_runner.service.id,
+ inputs={},
+ model_storage=workflow_context.model,
+ resource_storage=workflow_context.resource,
+ plugin_manager=None,
+ execution_id=wf_runner.execution.id,
+ executor=new_thread_executor)
+ new_wf_runner.execute()
+ finally:
+ new_thread_executor.close()
+
+ # Wait for it to finish and assert changes.
+ assert node.attributes['invocations'].value == 2
+ assert task.status == task.SUCCESS
+ assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+
+ def test_resume_failed_task(self, workflow_context, thread_executor):
node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
- self._create_interface(workflow_context, node, mock_failed_first_task)
+ self._create_interface(workflow_context, node, mock_failed_before_resuming)
wf_runner = self._create_initial_workflow_runner(
- workflow_context, mock_sequential_workflow, thread_executor)
+ workflow_context, mock_single_task_workflow, thread_executor)
wf_thread = Thread(target=wf_runner.execute)
wf_thread.setDaemon(True)
wf_thread.start()
- if custom_events['is_active'].wait(60) is False:
- raise TimeoutError("Execution did not end")
- wf_runner.cancel()
- if custom_events['execution_cancelled'].wait(60) is False:
- raise TimeoutError("Execution did not end")
+ self._wait_for_active_and_cancel(wf_runner)
task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
assert node.attributes['invocations'].value == 2
- assert task.status == task.PENDING
+ assert task.status == task.STARTED
assert wf_runner.execution.status in (wf_runner.execution.CANCELLED,
wf_runner.execution.CANCELLING)
@@ -513,13 +556,13 @@ def mock_resuming_task(ctx):
if not custom_events['is_resumed'].isSet():
# if resume was called, increase by one. o/w fail the execution - second task should
# fail as long it was not a part of resuming the workflow
- raise BaseException("wasn't resumed yet")
+ raise FailingTask("wasn't resumed yet")
@workflow
-def mock_sequential_workflow(ctx, graph):
+def mock_single_task_workflow(ctx, graph):
node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
- graph.sequence(
+ graph.add_tasks(
api.task.OperationTask(node,
interface_name='aria.interfaces.lifecycle',
operation_name='create',
@@ -529,7 +572,7 @@ def mock_sequential_workflow(ctx, graph):
@operation
-def mock_failed_first_task(ctx):
+def mock_failed_before_resuming(ctx):
"""
The task should run atmost ctx.task.max_attempts - 1 times, and only then pass.
overall, the number of invocations should be ctx.task.max_attempts - 1
@@ -540,11 +583,20 @@ def mock_failed_first_task(ctx):
custom_events['is_active'].set()
# unfreeze the thread only when all of the invocations are done
while ctx.node.attributes['invocations'] < ctx.task.max_attempts - 1:
- pass
+ time.sleep(5)
elif ctx.node.attributes['invocations'] == ctx.task.max_attempts - 1:
# pass only just before the end.
return
else:
# fail o.w.
- raise BaseException("stop this task")
+ raise FailingTask("stop this task")
+
+
+@operation
+def mock_stuck_task(ctx):
+ ctx.node.attributes['invocations'] += 1
+ while not custom_events['is_resumed'].isSet():
+ if not custom_events['is_active'].isSet():
+ custom_events['is_active'].set()
+ time.sleep(5)