You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ariatosca.apache.org by mx...@apache.org on 2017/07/10 09:36:29 UTC

incubator-ariatosca git commit: review 1 fixes

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-299-Resuming-canceled-execution-with-frozen-task-fails 62875b56c -> 05d2b73ae


review 1 fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/05d2b73a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/05d2b73a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/05d2b73a

Branch: refs/heads/ARIA-299-Resuming-canceled-execution-with-frozen-task-fails
Commit: 05d2b73ae20d144de581d9a225fdf9ab19dcd0bd
Parents: 62875b5
Author: max-orlov <ma...@gigaspaces.com>
Authored: Mon Jul 10 12:36:22 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Mon Jul 10 12:36:22 2017 +0300

----------------------------------------------------------------------
 .../workflows/core/events_handler.py            |  8 +-
 .../orchestrator/execution_plugin/test_local.py |  6 +-
 tests/orchestrator/execution_plugin/test_ssh.py |  6 +-
 tests/orchestrator/test_workflow_runner.py      | 98 +++++++++++++++-----
 4 files changed, 87 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/05d2b73a/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index eb6f271..37801de 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -114,10 +114,6 @@ def _workflow_cancelled(workflow_context, *args, **kwargs):
         elif execution.status in (execution.SUCCEEDED, execution.FAILED):
             _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
         else:
-            # Any non ended task would be put back to pending state
-            for task in execution.tasks:
-                if not task.has_ended():
-                    task.status = task.PENDING
             execution.status = execution.CANCELLED
             execution.ended_at = datetime.utcnow()
 
@@ -127,6 +123,10 @@ def _workflow_resume(workflow_context, *args, **kwargs):
     with workflow_context.persist_changes:
         execution = workflow_context.execution
         execution.status = execution.PENDING
+        # Any non ended task would be put back to pending state
+        for task in execution.tasks:
+            if not task.has_ended():
+                task.status = task.PENDING
 
 
 @events.on_cancelling_workflow_signal.connect

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/05d2b73a/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 5b94917..e64e998 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -509,8 +509,10 @@ if __name__ == '__main__':
     @pytest.fixture
     def executor(self):
         result = process.ProcessExecutor()
-        yield result
-        result.close()
+        try:
+            yield result
+        finally:
+            result.close()
 
     @pytest.fixture
     def workflow_context(self, tmpdir):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/05d2b73a/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index 4fa8184..a96c91d 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -277,8 +277,10 @@ class TestWithActualSSHServer(object):
     @pytest.fixture
     def executor(self):
         result = process.ProcessExecutor()
-        yield result
-        result.close()
+        try:
+            yield result
+        finally:
+            result.close()
 
     @pytest.fixture
     def workflow_context(self, tmpdir):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/05d2b73a/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index 3527f34..a77d727 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import json
+import time
 from threading import Thread, Event
 from datetime import datetime
 
@@ -58,6 +59,10 @@ class TimeoutError(BaseException):
     pass
 
 
+class FailingTask(BaseException):
+    pass
+
+
 def test_undeclared_workflow(request):
     # validating a proper error is raised when the workflow is not declared in the service
     with pytest.raises(exceptions.UndeclaredWorkflowError):
@@ -342,6 +347,14 @@ class TestResumableWorkflows(object):
             executor=executor)
         return wf_runner
 
+    @staticmethod
+    def _wait_for_active_and_cancel(workflow_runner):
+        if custom_events['is_active'].wait(60) is False:
+            raise TimeoutError("is_active wasn't set to True")
+        workflow_runner.cancel()
+        if custom_events['execution_cancelled'].wait(60) is False:
+            raise TimeoutError("Execution did not end")
+
     def test_resume_workflow(self, workflow_context, thread_executor):
         node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
         node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
@@ -355,18 +368,13 @@ class TestResumableWorkflows(object):
         wf_thread.start()
 
         # Wait for the execution to start
-        if custom_events['is_active'].wait(5) is False:
-            raise TimeoutError("is_active wasn't set to True")
-        wf_runner.cancel()
-
-        if custom_events['execution_cancelled'].wait(60) is False:
-            raise TimeoutError("Execution did not end")
+        self._wait_for_active_and_cancel(wf_runner)
 
         tasks = workflow_context.model.task.list(filters={'_stub_type': None})
         assert any(task.status == task.SUCCESS for task in tasks)
-        assert any(task.status == task.PENDING for task in tasks)
+        assert any(task.status == task.RETRYING for task in tasks)
         custom_events['is_resumed'].set()
-        assert any(task.status == task.PENDING for task in tasks)
+        assert any(task.status == task.RETRYING for task in tasks)
 
         # Create a new workflow runner, with an existing execution id. This would cause
         # the old execution to restart.
@@ -386,27 +394,62 @@ class TestResumableWorkflows(object):
         assert node.attributes['invocations'].value == 3
         assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
 
-    def test_resume_failed_task(self, workflow_context, thread_executor):
+    def test_resume_started_task(self, workflow_context, thread_executor):
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_stuck_task)
+
+        wf_runner = self._create_initial_workflow_runner(
+            workflow_context, mock_single_task_workflow, thread_executor)
+
+        wf_thread = Thread(target=wf_runner.execute)
+        wf_thread.daemon = True
+        wf_thread.start()
+
+        self._wait_for_active_and_cancel(wf_runner)
+        task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
+        assert node.attributes['invocations'].value == 1
+        assert task.status == task.STARTED
+        assert wf_runner.execution.status in (wf_runner.execution.CANCELLED,
+                                              wf_runner.execution.CANCELLING)
+        custom_events['is_resumed'].set()
+
+        new_thread_executor = thread.ThreadExecutor()
+        try:
+            new_wf_runner = WorkflowRunner(
+                service_id=wf_runner.service.id,
+                inputs={},
+                model_storage=workflow_context.model,
+                resource_storage=workflow_context.resource,
+                plugin_manager=None,
+                execution_id=wf_runner.execution.id,
+                executor=new_thread_executor)
 
+            new_wf_runner.execute()
+        finally:
+            new_thread_executor.close()
+
+        # Wait for it to finish and assert changes.
+        assert node.attributes['invocations'].value == 2
+        assert task.status == task.SUCCESS
+        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+
+    def test_resume_failed_task(self, workflow_context, thread_executor):
         node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
         node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
-        self._create_interface(workflow_context, node, mock_failed_first_task)
+        self._create_interface(workflow_context, node, mock_failed_before_resuming)
 
         wf_runner = self._create_initial_workflow_runner(
-            workflow_context, mock_sequential_workflow, thread_executor)
+            workflow_context, mock_single_task_workflow, thread_executor)
         wf_thread = Thread(target=wf_runner.execute)
         wf_thread.setDaemon(True)
         wf_thread.start()
 
-        if custom_events['is_active'].wait(60) is False:
-            raise TimeoutError("Execution did not end")
-        wf_runner.cancel()
-        if custom_events['execution_cancelled'].wait(60) is False:
-            raise TimeoutError("Execution did not end")
+        self._wait_for_active_and_cancel(wf_runner)
 
         task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
         assert node.attributes['invocations'].value == 2
-        assert task.status == task.PENDING
+        assert task.status == task.STARTED
         assert wf_runner.execution.status in (wf_runner.execution.CANCELLED,
                                               wf_runner.execution.CANCELLING)
 
@@ -513,13 +556,13 @@ def mock_resuming_task(ctx):
         if not custom_events['is_resumed'].isSet():
             # if resume was called, increase by one. o/w fail the execution - second task should
             # fail as long it was not a part of resuming the workflow
-            raise BaseException("wasn't resumed yet")
+            raise FailingTask("wasn't resumed yet")
 
 
 @workflow
-def mock_sequential_workflow(ctx, graph):
+def mock_single_task_workflow(ctx, graph):
     node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-    graph.sequence(
+    graph.add_tasks(
         api.task.OperationTask(node,
                                interface_name='aria.interfaces.lifecycle',
                                operation_name='create',
@@ -529,7 +572,7 @@ def mock_sequential_workflow(ctx, graph):
 
 
 @operation
-def mock_failed_first_task(ctx):
+def mock_failed_before_resuming(ctx):
     """
     The task should run atmost ctx.task.max_attempts - 1 times, and only then pass.
     overall, the number of invocations should be ctx.task.max_attempts - 1
@@ -540,11 +583,20 @@ def mock_failed_first_task(ctx):
         custom_events['is_active'].set()
         # unfreeze the thread only when all of the invocations are done
         while ctx.node.attributes['invocations'] < ctx.task.max_attempts - 1:
-            pass
+            time.sleep(5)
 
     elif ctx.node.attributes['invocations'] == ctx.task.max_attempts - 1:
         # pass only just before the end.
         return
     else:
         # fail o.w.
-        raise BaseException("stop this task")
+        raise FailingTask("stop this task")
+
+
+@operation
+def mock_stuck_task(ctx):
+    ctx.node.attributes['invocations'] += 1
+    while not custom_events['is_resumed'].isSet():
+        if not custom_events['is_active'].isSet():
+            custom_events['is_active'].set()
+        time.sleep(5)