You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/21 14:56:31 UTC

incubator-ariatosca git commit: review 1 fix

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-236-Resumable-workflow-executions 2fc9896cf -> 3de374534


review 1 fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/3de37453
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/3de37453
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/3de37453

Branch: refs/heads/ARIA-236-Resumable-workflow-executions
Commit: 3de37453468ef39f4cf3ea8af8e540f2c31ee58b
Parents: 2fc9896
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed Jun 21 17:56:27 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Wed Jun 21 17:56:27 2017 +0300

----------------------------------------------------------------------
 aria/cli/commands/executions.py            |   7 +-
 aria/orchestrator/workflow_runner.py       |  24 ++---
 aria/orchestrator/workflows/core/engine.py |   8 +-
 tests/orchestrator/test_workflow_runner.py | 122 +++++++++++++-----------
 4 files changed, 90 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3de37453/aria/cli/commands/executions.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py
index a6f06c3..b337e84 100644
--- a/aria/cli/commands/executions.py
+++ b/aria/cli/commands/executions.py
@@ -135,8 +135,8 @@ def start(workflow_name,
 
     workflow_runner = \
         WorkflowRunner(
-            inputs, model_storage, resource_storage, plugin_manager,
-            service_id=service.id, workflow_name=workflow_name, executor=executor,
+            model_storage, resource_storage, plugin_manager,
+            service_id=service.id, workflow_name=workflow_name, inputs=inputs, executor=executor,
             task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval
         )
     logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
@@ -158,7 +158,6 @@ def start(workflow_name,
 @aria.pass_plugin_manager
 @aria.pass_logger
 def resume(execution_id,
-           inputs,
            dry,
            task_max_attempts,
            task_retry_interval,
@@ -171,7 +170,7 @@ def resume(execution_id,
 
     workflow_runner = \
         WorkflowRunner(
-            inputs, model_storage, resource_storage, plugin_manager,
+            model_storage, resource_storage, plugin_manager,
             execution_id=execution_id, executor=executor,
             task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval
         )

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3de37453/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 99b6edf..829b8cd 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -37,8 +37,8 @@ DEFAULT_TASK_RETRY_INTERVAL = 30
 
 class WorkflowRunner(object):
 
-    def __init__(self, inputs, model_storage, resource_storage, plugin_manager,
-                 execution_id=None, service_id=None, workflow_name=None, executor=None,
+    def __init__(self, model_storage, resource_storage, plugin_manager,
+                 execution_id=None, service_id=None, workflow_name=None, inputs=None, executor=None,
                  task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
                  task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
         """
@@ -55,23 +55,29 @@ class WorkflowRunner(object):
         :param task_retry_interval: Retry interval in between retry attempts of a failing task
         """
 
+        self._is_resume = execution_id is not None
+        self._is_start = \
+            workflow_name is not None and service_id is not None and execution_id is None
+
         self._model_storage = model_storage
         self._resource_storage = resource_storage
 
         # the IDs are stored rather than the models themselves, so this module could be used
         # by several threads without raising errors on model objects shared between threads
 
-        if workflow_name is not None and service_id is not None and execution_id is None:
+        if self._is_start:
             self._service_id = service_id
             self._workflow_name = workflow_name
             self._validate_workflow_exists_for_service()
             self._execution_id = self._create_execution_model(inputs).id
-        elif execution_id is not None:
+        elif self._is_resume:
             self._execution_id = execution_id
             self._service_id = self.execution.service.id
             self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name
         else:
-            raise exceptions.InvalidWorkflowRunnerParams("")
+            raise exceptions.InvalidWorkflowRunnerParams(
+                "Either provide execution id in order to resume a workflow or workflow name "
+                "and service id with inputs")
 
         self._workflow_context = WorkflowContext(
             name=self.__class__.__name__,
@@ -89,17 +95,13 @@ class WorkflowRunner(object):
         # transforming the execution inputs to dict, to pass them to the workflow function
         execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values())
 
-        if execution_id is None:
-            # Not an existing execution
+        if self._is_start:
             workflow_fn = self._get_workflow_fn()
             tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
             compile.create_execution_tasks(self._workflow_context, tasks_graph, executor.__class__)
 
         self._engine = engine.Engine(executors={executor.__class__: executor})
 
-        if workflow_name is None:
-            self._engine.resume_execution(self._workflow_context)
-
     @property
     def execution_id(self):
         return self._execution_id
@@ -113,7 +115,7 @@ class WorkflowRunner(object):
         return self._model_storage.service.get(self._service_id)
 
     def execute(self):
-        self._engine.execute(ctx=self._workflow_context)
+        self._engine.execute(ctx=self._workflow_context, resuming=self._is_resume)
 
     def cancel(self):
         self._engine.cancel_execution(ctx=self._workflow_context)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3de37453/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 7f14500..a7e5148 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -41,11 +41,15 @@ class Engine(logger.LoggerMixin):
         self._executors = executors.copy()
         self._executors.setdefault(StubTaskExecutor, StubTaskExecutor())
 
-    def execute(self, ctx):
+    def execute(self, ctx, resuming=False):
         """
         execute the workflow
         """
         executing_tasks = []
+
+        if resuming:
+            self._resuming_execution(ctx)
+
         try:
             events.start_workflow_signal.send(ctx)
             while True:
@@ -69,7 +73,7 @@ class Engine(logger.LoggerMixin):
             raise
 
     @staticmethod
-    def resume_execution(ctx):
+    def _resuming_execution(ctx):
         events.on_resume_workflow_signal.send(ctx)
 
     @staticmethod

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3de37453/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index 36ebfe1..ec4b40b 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 import json
-from threading import Thread
+from threading import Thread, Event
 from datetime import datetime
 
 import mock
@@ -23,6 +23,7 @@ import pytest
 from aria.modeling import exceptions as modeling_exceptions
 from aria.modeling import models
 from aria.orchestrator import exceptions
+from aria.orchestrator.events import on_cancelled_workflow_signal
 from aria.orchestrator.workflow_runner import WorkflowRunner
 from aria.orchestrator.workflows.executor.process import ProcessExecutor
 from aria.orchestrator.workflows import api
@@ -45,7 +46,11 @@ from ..fixtures import (  # pylint: disable=unused-import
     resource_storage as resource
 )
 
-global_test_holder = {}
+global_test_holder = {
+    'is_resumed': Event(),
+    'is_active': Event(),
+    'execution_ended': Event()
+}
 
 
 def test_undeclared_workflow(request):
@@ -309,7 +314,9 @@ def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
 class TestResumableWorkflows(object):
 
     def test_resume_workflow(self, workflow_context, executor):
-        self._create_interface(workflow_context, mock_success_task)
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+
+        self._create_interface(workflow_context, node, mock_resuming_task)
 
         service = workflow_context.service
         service.workflows['custom_workflow'] = tests_mock.models.create_operation(
@@ -327,47 +334,39 @@ class TestResumableWorkflows(object):
             workflow_name='custom_workflow',
             executor=executor)
         wf_thread = Thread(target=wf_runner.execute)
-        try:
-            wf_thread.start()
-
-            # Wait for the execution to start
-            while global_test_holder.get('state') != 'active':
-                pass
-            global_test_holder['state'] = 'terminated'
-            wf_runner.cancel()
-
-            # Make sure the execution was canceled and the task has not ended
-            while wf_runner.execution.status != workflow_context.execution.CANCELLED:
-                pass
-            task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
-            assert task.status in (task.FAILED, task.RETRYING)
-            assert global_test_holder['state'] == 'idle'
-
-            # Create a new workflow runner, with an existing execution id. This would cause
-            # the old execution to restart.
-            new_wf_runner = WorkflowRunner(
-                service_id=wf_runner.service.id,
-                inputs={},
-                model_storage=workflow_context.model,
-                resource_storage=workflow_context.resource,
-                plugin_manager=None,
-                execution_id=wf_runner.execution.id,
-                executor=executor)
-
-            # Set the resumed to True, for the execution to succeed.
-            global_test_holder['resumed'] = True
-            new_wf_runner.execute()
-
-            # Wait for it to finish and assert changes.
-            while global_test_holder.get('state') != 'ended':
-                pass
-            assert task.status == task.SUCCESS
-            assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
-
-        except:
-            global_test_holder['state'] = 'terminated'
-            wf_thread.join(0.5)
-            raise
+        wf_thread.daemon = True
+        wf_thread.start()
+
+        # Wait for the execution to start
+        global_test_holder['is_active'].wait(5)
+        wf_runner.cancel()
+
+        # Make sure the execution was canceled and the task has not ended
+
+        global_test_holder['execution_ended'].wait(10)
+        first_task, second_task = workflow_context.model.task.list(filters={'_stub_type': None})
+        assert first_task.status == first_task.SUCCESS
+        assert second_task.status in (second_task.FAILED, second_task.RETRYING)
+        global_test_holder['is_resumed'].set()
+        assert second_task.status in (second_task.FAILED, second_task.RETRYING)
+
+        # Create a new workflow runner, with an existing execution id. This would cause
+        # the old execution to restart.
+        new_wf_runner = WorkflowRunner(
+            service_id=wf_runner.service.id,
+            inputs={},
+            model_storage=workflow_context.model,
+            resource_storage=workflow_context.resource,
+            plugin_manager=None,
+            execution_id=wf_runner.execution.id,
+            executor=executor)
+
+        new_wf_runner.execute()
+
+        # Wait for it to finish and assert changes.
+        assert second_task.status == second_task.SUCCESS
+        assert node.attributes['invocations'].value == 2
+        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
 
     @staticmethod
     @pytest.fixture
@@ -386,8 +385,7 @@ class TestResumableWorkflows(object):
         storage.release_sqlite_storage(workflow_context.model)
 
     @staticmethod
-    def _create_interface(ctx, func, arguments=None):
-        node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+    def _create_interface(ctx, node, func, arguments=None):
         interface_name = 'aria.interfaces.lifecycle'
         operation_kwargs = dict(function='{name}.{func.__name__}'.format(
             name=__name__, func=func))
@@ -411,22 +409,38 @@ class TestResumableWorkflows(object):
 
         return engine.Engine(executors={executor.__class__: executor})
 
+    @pytest.fixture(autouse=True)
+    def register_to_events(self):
+        def execution_ended(**_):
+            global_test_holder['execution_ended'].set()
+
+        on_cancelled_workflow_signal.connect(execution_ended)
+        yield
+        on_cancelled_workflow_signal.disconnect(execution_ended)
 
 @workflow
 def mock_workflow(ctx, graph):
     node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
     graph.add_tasks(
         api.task.OperationTask(
+            node, interface_name='aria.interfaces.lifecycle', operation_name='create'),
+        api.task.OperationTask(
             node, interface_name='aria.interfaces.lifecycle', operation_name='create')
     )
 
 
 @operation
-def mock_success_task(**_):
-    global_test_holder['state'] = 'active'
-    while global_test_holder.get('state') != 'terminated':
-        if global_test_holder.get('resumed') is True:
-            global_test_holder['state'] = 'ended'
-            return
-    global_test_holder['state'] = 'idle'
-    raise Exception("The operation was terminated")
+def mock_resuming_task(ctx):
+
+    if 'invocations' not in ctx.node.attributes:
+        # This is the first node invocation
+        ctx.node.attributes['invocations'] = 1
+    else:
+        global_test_holder['is_active'].set()
+        # This is the second node invocation
+        if global_test_holder['is_resumed'].isSet():
+            # if resume was called, increase by one. Otherwise fail the execution - the second
+            # task should fail as long as it was not a part of resuming the workflow
+            ctx.node.attributes['invocations'] += 1
+        else:
+            raise BaseException("wasn't resumed yet")