You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ariatosca.apache.org by mx...@apache.org on 2017/07/09 14:04:15 UTC

incubator-ariatosca git commit: added test [Forced Update!]

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions 69fbd8ce9 -> 34b64dcce (forced update)


added test


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/34b64dcc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/34b64dcc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/34b64dcc

Branch: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions
Commit: 34b64dcce354d663bebaf09913e34d1847cd9db7
Parents: d4db727
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun Jul 9 16:19:30 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Sun Jul 9 17:04:11 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflows/core/engine.py |   9 +-
 tests/orchestrator/test_workflow_runner.py | 125 +++++++++++++++++++++---
 2 files changed, 119 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/34b64dcc/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 0ec3cd8..69505fc 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -69,8 +69,15 @@ class Engine(logger.LoggerMixin):
             if cancel:
                 self._terminate_tasks(tasks_tracker.executing_tasks)
                 events.on_cancelled_workflow_signal.send(ctx)
-            else:
+            elif all(task.status == task.SUCCESS or task.ignore_failure
+                     for task in ctx.execution.tasks):
                 events.on_success_workflow_signal.send(ctx)
+            else:
+                exception = "Tasks {tasks} remain failed".format(
+                    tasks=
+                    [t for t in ctx.execution.tasks if t.status == t.SUCCESS or t.ignore_failure]
+                )
+                events.on_failure_workflow_signal.send(ctx, exception=exception)
         except BaseException as e:
             # Cleanup any remaining tasks
             self._terminate_tasks(tasks_tracker.executing_tasks)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/34b64dcc/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index 7ed0182..1bfc0ce 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -50,7 +50,8 @@ custom_events = {
     'is_resumed': Event(),
     'is_active': Event(),
     'execution_cancelled': Event(),
-    'execution_ended': Event()
+    'execution_ended': Event(),
+    'execution_failed': Event()
 }
 
 
@@ -349,7 +350,7 @@ class TestResumableWorkflows(object):
         self._create_interface(workflow_context, node, mock_resuming_task)
 
         wf_runner = self._create_initial_workflow_runner(
-            workflow_context, mock_parallel_workflow, thread_executor)
+            workflow_context, mock_two_parallel_tasks_workflow, thread_executor)
 
         wf_thread = Thread(target=wf_runner.execute)
         wf_thread.daemon = True
@@ -387,14 +388,14 @@ class TestResumableWorkflows(object):
         assert node.attributes['invocations'].value == 3
         assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
 
-    def test_resume_failed_task(self, workflow_context, thread_executor):
+    def test_resume_single_failed_task(self, workflow_context, thread_executor):
 
         node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
         node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
-        self._create_interface(workflow_context, node, mock_failed_first_task)
+        self._create_interface(workflow_context, node, mock_conditional_failing_task)
 
         wf_runner = self._create_initial_workflow_runner(
-            workflow_context, mock_sequential_workflow, thread_executor)
+            workflow_context, mock_single_task_workflow, thread_executor)
         wf_thread = Thread(target=wf_runner.execute)
         wf_thread.setDaemon(True)
         wf_thread.start()
@@ -436,6 +437,76 @@ class TestResumableWorkflows(object):
         assert task.status == task.SUCCESS
         assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
 
+    def test_resume_failed_task_and_successful_task(self, workflow_context, thread_executor):
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_three_different_tasks)
+
+        wf_runner = self._create_initial_workflow_runner(
+            workflow_context, mock_three_parallel_tasks_workflow, thread_executor)
+        wf_thread = Thread(target=wf_runner.execute)
+        wf_thread.setDaemon(True)
+        wf_thread.start()
+
+        if custom_events['execution_ended'].wait(60) is False:
+            raise TimeoutError("Execution did not end")
+
+        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
+        assert node.attributes['invocations'].value == 3
+
+        # First task passes
+        assert any(task.status == task.FAILED for task in tasks)
+        # Second task fails
+        assert any(task.status == task.SUCCESS for task in tasks)
+        # third task remains stuck
+
+        assert wf_runner.execution.status in wf_runner.execution.FAILED
+
+        custom_events['is_resumed'].set()
+        # third task ends
+        assert node.attributes['invocations'].value == 3
+
+        # Create a new workflow runner, with an existing execution id. This would cause
+        # the old execution to restart.
+        new_thread_executor = thread.ThreadExecutor()
+        try:
+            new_wf_runner = WorkflowRunner(
+                service_id=wf_runner.service.id,
+                inputs={},
+                model_storage=workflow_context.model,
+                resource_storage=workflow_context.resource,
+                plugin_manager=None,
+                execution_id=wf_runner.execution.id,
+                executor=new_thread_executor)
+
+            new_wf_runner.execute()
+        finally:
+            new_thread_executor.close()
+        assert node.attributes['invocations'].value == 3
+
+        # Resuming the workflow with retry failed should retry only the second task (
+        # as it was the only one failed)
+        new_thread_executor = thread.ThreadExecutor()
+        try:
+            new_wf_runner = WorkflowRunner(
+                service_id=wf_runner.service.id,
+                retry_failed=True,
+                inputs={},
+                model_storage=workflow_context.model,
+                resource_storage=workflow_context.resource,
+                plugin_manager=None,
+                execution_id=wf_runner.execution.id,
+                executor=new_thread_executor)
+
+            new_wf_runner.execute()
+        finally:
+            new_thread_executor.close()
+
+        # Wait for it to finish and assert changes.
+        assert node.attributes['invocations'].value == 4
+        assert all(task.status == task.SUCCESS for task in tasks)
+        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+
     @staticmethod
     @pytest.fixture
     def thread_executor():
@@ -495,7 +566,7 @@ class TestResumableWorkflows(object):
 
 
 @workflow
-def mock_parallel_workflow(ctx, graph):
+def mock_two_parallel_tasks_workflow(ctx, graph):
     node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
     graph.add_tasks(
         api.task.OperationTask(
@@ -518,19 +589,16 @@ def mock_resuming_task(ctx):
 
 
 @workflow
-def mock_sequential_workflow(ctx, graph):
+def mock_single_task_workflow(ctx, graph):
     node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-    graph.sequence(
-        api.task.OperationTask(node,
-                               interface_name='aria.interfaces.lifecycle',
-                               operation_name='create',
-                               retry_interval=1,
-                               max_attempts=10),
+    graph.add_tasks(
+        api.task.OperationTask(
+            node, 'aria.interfaces.lifecycle', 'create', retry_interval=1, max_attempts=10)
     )
 
 
 @operation
-def mock_failed_first_task(ctx):
+def mock_conditional_failing_task(ctx):
     """
     The task should run atmost ctx.task.max_attempts - 1 times, and only then pass.
     overall, the number of invocations should be ctx.task.max_attempts - 1
@@ -549,3 +617,32 @@ def mock_failed_first_task(ctx):
     else:
         # fail o.w.
         raise BaseException("stop this task")
+
+
+@workflow
+def mock_three_parallel_tasks_workflow(ctx, graph):
+    node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+    graph.add_tasks(
+        api.task.OperationTask(
+            node, 'aria.interfaces.lifecycle', 'create', retry_interval=1, max_attempts=1),
+        api.task.OperationTask(
+            node, 'aria.interfaces.lifecycle', 'create', retry_interval=1, max_attempts=1),
+        api.task.OperationTask(
+            node, 'aria.interfaces.lifecycle', 'create', retry_interval=1, max_attempts=1),
+    )
+
+
+@operation
+def mock_three_different_tasks(ctx):
+    ctx.node.attributes['invocations'] += 1
+
+    # The first task should end gracefully
+
+    if ctx.node.attributes['invocations'] == 2:
+        # The second task should fail
+        raise BaseException("First execution should fail")
+
+    elif ctx.node.attributes['invocations'] == 3:
+        # The third task should remain stuck until the execution is resumed
+        while custom_events['is_resumed'].isSet():
+            pass