You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/22 09:27:45 UTC

incubator-ariatosca git commit: review 2

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-236-Resumable-workflow-executions 92ef0fd8b -> 608f5d791


review 2


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/608f5d79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/608f5d79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/608f5d79

Branch: refs/heads/ARIA-236-Resumable-workflow-executions
Commit: 608f5d7917fc0f332aaa3606732f4a46068ee7f8
Parents: 92ef0fd
Author: max-orlov <ma...@gigaspaces.com>
Authored: Thu Jun 22 12:27:41 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Thu Jun 22 12:27:41 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflow_runner.py       | 23 ++++++++++----------
 aria/orchestrator/workflows/core/engine.py |  6 +-----
 tests/orchestrator/test_workflow_runner.py | 28 ++++++++++++-------------
 3 files changed, 26 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/608f5d79/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 829b8cd..2d4b515 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -55,9 +55,12 @@ class WorkflowRunner(object):
         :param task_retry_interval: Retry interval in between retry attempts of a failing task
         """
 
+        if not (execution_id or (workflow_name and service_id and not execution_id)):
+            raise exceptions.InvalidWorkflowRunnerParams(
+                "Either provide execution id in order to resume a workflow or workflow name "
+                "and service id with inputs")
+
         self._is_resume = execution_id is not None
-        self._is_start = \
-            workflow_name is not None and service_id is not None and execution_id is None
 
         self._model_storage = model_storage
         self._resource_storage = resource_storage
@@ -65,19 +68,15 @@ class WorkflowRunner(object):
         # the IDs are stored rather than the models themselves, so this module could be used
         # by several threads without raising errors on model objects shared between threads
 
-        if self._is_start:
-            self._service_id = service_id
-            self._workflow_name = workflow_name
-            self._validate_workflow_exists_for_service()
-            self._execution_id = self._create_execution_model(inputs).id
-        elif self._is_resume:
+        if self._is_resume:
             self._execution_id = execution_id
             self._service_id = self.execution.service.id
             self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name
         else:
-            raise exceptions.InvalidWorkflowRunnerParams(
-                "Either provide execution id in order to resume a workflow or workflow name "
-                "and service id with inputs")
+            self._service_id = service_id
+            self._workflow_name = workflow_name
+            self._validate_workflow_exists_for_service()
+            self._execution_id = self._create_execution_model(inputs).id
 
         self._workflow_context = WorkflowContext(
             name=self.__class__.__name__,
@@ -95,7 +94,7 @@ class WorkflowRunner(object):
         # transforming the execution inputs to dict, to pass them to the workflow function
         execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values())
 
-        if self._is_start:
+        if not self._is_resume:
             workflow_fn = self._get_workflow_fn()
             tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
             compile.create_execution_tasks(self._workflow_context, tasks_graph, executor.__class__)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/608f5d79/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index a7e5148..d5a6e70 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -48,7 +48,7 @@ class Engine(logger.LoggerMixin):
         executing_tasks = []
 
         if resuming:
-            self._resuming_execution(ctx)
+            events.on_resume_workflow_signal.send(ctx)
 
         try:
             events.start_workflow_signal.send(ctx)
@@ -73,10 +73,6 @@ class Engine(logger.LoggerMixin):
             raise
 
     @staticmethod
-    def _resuming_execution(ctx):
-        events.on_resume_workflow_signal.send(ctx)
-
-    @staticmethod
     def cancel_execution(ctx):
         """
         Send a cancel request to the engine. If execution already started, execution status

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/608f5d79/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index c86d6a7..7a58528 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -53,6 +53,10 @@ events = {
 }
 
 
+class TimeoutError(BaseException):
+    pass
+
+
 def test_undeclared_workflow(request):
     # validating a proper error is raised when the workflow is not declared in the service
     with pytest.raises(exceptions.UndeclaredWorkflowError):
@@ -316,7 +320,7 @@ class TestResumableWorkflows(object):
 
     def test_resume_workflow(self, workflow_context, executor):
         node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
         self._create_interface(workflow_context, node, mock_resuming_task)
 
         service = workflow_context.service
@@ -339,12 +343,13 @@ class TestResumableWorkflows(object):
         wf_thread.start()
 
         # Wait for the execution to start
-        events['is_active'].wait(5)
+        if events['is_active'].wait(5) is False:
+            raise TimeoutError("is_active wasn't set to True")
         wf_runner.cancel()
 
-        # Make sure the execution was canceled and the task has not ended
+        if events['execution_ended'].wait(60) is False:
+            raise TimeoutError("Execution did not end")
 
-        events['execution_ended'].wait(10)
         first_task, second_task = workflow_context.model.task.list(filters={'_stub_type': None})
         assert first_task.status == first_task.SUCCESS
         assert second_task.status in (second_task.FAILED, second_task.RETRYING)
@@ -366,7 +371,7 @@ class TestResumableWorkflows(object):
 
         # Wait for it to finish and assert changes.
         assert second_task.status == second_task.SUCCESS
-        assert node.attributes['invocations'].value == 2
+        assert node.attributes['invocations'].value == 3
         assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
 
     @staticmethod
@@ -412,7 +417,7 @@ class TestResumableWorkflows(object):
 
     @pytest.fixture(autouse=True)
     def register_to_events(self):
-        def execution_ended(**_):
+        def execution_ended(*args, **kwargs):
             events['execution_ended'].set()
 
         on_cancelled_workflow_signal.connect(execution_ended)
@@ -433,16 +438,11 @@ def mock_workflow(ctx, graph):
 
 @operation
 def mock_resuming_task(ctx):
+    ctx.node.attributes['invocations'] += 1
 
-    if 'invocations' not in ctx.node.attributes:
-        # This is the first node invocation
-        ctx.node.attributes['invocations'] = 1
-    else:
+    if ctx.node.attributes['invocations'] != 1:
         events['is_active'].set()
-        # This is the second node invocation
-        if events['is_resumed'].isSet():
+        if not events['is_resumed'].isSet():
             # if resume was called, increase by one. o/w fail the execution - second task should
             # fail as long it was not a part of resuming the workflow
-            ctx.node.attributes['invocations'] += 1
-        else:
             raise BaseException("wasn't resumed yet")