Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/19 15:52:10 UTC

incubator-ariatosca git commit: cleaning up 1

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-236-Resumable-workflow-executions 9431465dd -> 2fa30b4c1


cleaning up 1


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/2fa30b4c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/2fa30b4c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/2fa30b4c

Branch: refs/heads/ARIA-236-Resumable-workflow-executions
Commit: 2fa30b4c1fb8f02c4a6edf68fa5590e46a012a37
Parents: 9431465
Author: max-orlov <ma...@gigaspaces.com>
Authored: Mon Jun 19 18:52:05 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Mon Jun 19 18:52:05 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/events.py                |  2 +-
 aria/orchestrator/exceptions.py            |  7 +++
 aria/orchestrator/workflow_runner.py       | 16 +++---
 tests/mock/__init__.py                     |  2 +-
 tests/orchestrator/test_workflow_runner.py | 67 +++++++++++--------------
 5 files changed, 44 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2fa30b4c/aria/orchestrator/events.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/events.py b/aria/orchestrator/events.py
index a1d7006..aa1b5bc 100644
--- a/aria/orchestrator/events.py
+++ b/aria/orchestrator/events.py
@@ -34,4 +34,4 @@ on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal')
 on_cancelled_workflow_signal = signal('on_cancelled_workflow_signal')
 on_success_workflow_signal = signal('on_success_workflow_signal')
 on_failure_workflow_signal = signal('on_failure_workflow_signal')
-on_resume_workflow_signal = signal('on_resume_workflow_signal')
\ No newline at end of file
+on_resume_workflow_signal = signal('on_resume_workflow_signal')
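
For reference, the signals in aria/orchestrator/events.py are created with the blinker-style signal() factory, so a handler can subscribe to the resume signal with connect(). A minimal sketch under that assumption; the handler below is illustrative and not part of this commit:

    from aria.orchestrator import events

    def _log_resume(sender, *args, **kwargs):
        # Called whenever a workflow execution is resumed; 'sender' is whatever
        # the orchestrator passes to send(), treated as opaque here.
        print('workflow execution resumed: {0!r}'.format(sender))

    # Subscribe the illustrative handler to the resume signal.
    events.on_resume_workflow_signal.connect(_log_resume)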

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2fa30b4c/aria/orchestrator/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/exceptions.py b/aria/orchestrator/exceptions.py
index 8d3dcc6..71b6401 100644
--- a/aria/orchestrator/exceptions.py
+++ b/aria/orchestrator/exceptions.py
@@ -74,3 +74,10 @@ class WorkflowImplementationNotFoundError(AriaError):
     Raised when attempting to import a workflow's code but the implementation is not found
     """
     pass
+
+
+class InvalidWorkflowRunnerParams(AriaError):
+    """
+    Raised when an invalid combination of arguments is passed to the workflow runner
+    """
+    pass
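
The new exception replaces the asserts that previously guarded the WorkflowRunner constructor (see the workflow_runner.py hunk below): it is raised when neither a workflow name nor an existing execution id is supplied. A rough test-style sketch of that case, reusing the fixture and parameter names that appear in the test calls in this commit; the service id is a placeholder and arguments such as the executor are omitted for brevity:

    import pytest

    from aria.orchestrator import exceptions
    from aria.orchestrator.workflow_runner import WorkflowRunner

    def test_neither_workflow_name_nor_execution_id(model, resource, plugin_manager):
        # With both workflow_name and execution_id left out, the runner has
        # nothing to run and nothing to resume, so it should raise the new
        # InvalidWorkflowRunnerParams instead of tripping an assert.
        with pytest.raises(exceptions.InvalidWorkflowRunnerParams):
            WorkflowRunner(
                service_id=1,                 # placeholder service id
                inputs={},
                model_storage=model,
                resource_storage=resource,
                plugin_manager=plugin_manager)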

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2fa30b4c/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 995d325..1f93292 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -62,18 +62,15 @@ class WorkflowRunner(object):
         # by several threads without raising errors on model objects shared between threads
         self._service_id = service_id
 
-        if workflow_name is not None:
-            assert execution_id is None
+        if workflow_name is not None and execution_id is None:
             self._workflow_name = workflow_name
-            execution = self._create_execution_model(inputs)
-            self._execution_id = execution.id
             self._validate_workflow_exists_for_service()
-            workflow_fn = self._get_workflow_fn()
-        else:
-            assert execution_id is not None
+            self._execution_id = self._create_execution_model(inputs).id
+        elif execution_id is not None:
             self._execution_id = execution_id
-            self._workflow_name = \
-                self._model_storage.execution.get(self._execution_id).workflow_name
+            self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name
+        else:
+            raise exceptions.InvalidWorkflowRunnerParams("")
 
         self._workflow_context = WorkflowContext(
             name=self.__class__.__name__,
@@ -93,6 +90,7 @@ class WorkflowRunner(object):
 
         if execution_id is None:
             # Not an existing execution
+            workflow_fn = self._get_workflow_fn()
             tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
             engine.construct_execution_tasks(self.execution, tasks_graph, executor.__class__)
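
The net effect of this hunk: the constructor now has two explicit modes instead of assert-guarded branches, and the workflow function is resolved only on the new-execution path. A sketch of the two call forms, using the parameter names from the test calls in this commit; the storage, plugin manager and executor objects are placeholders passed in from outside:

    from aria.orchestrator.workflow_runner import WorkflowRunner

    def start_and_resume(service_id, model, resource, plugin_manager, executor):
        # Mode 1: new execution. A workflow name is given, an execution model
        # is created, and the workflow function is looked up and expanded into
        # a tasks graph.
        fresh_runner = WorkflowRunner(
            workflow_name='install',
            service_id=service_id,
            inputs={},
            model_storage=model,
            resource_storage=resource,
            plugin_manager=plugin_manager,
            executor=executor)

        # Mode 2: resume. Only an existing execution id is given; the workflow
        # name is read back from the stored execution model and no new tasks
        # graph is built.
        resumed_runner = WorkflowRunner(
            execution_id=fresh_runner.execution.id,
            service_id=service_id,
            inputs={},
            model_storage=model,
            resource_storage=resource,
            plugin_manager=plugin_manager,
            executor=executor)
        return fresh_runner, resumed_runner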
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2fa30b4c/tests/mock/__init__.py
----------------------------------------------------------------------
diff --git a/tests/mock/__init__.py b/tests/mock/__init__.py
index 9004b4c..9183b77 100644
--- a/tests/mock/__init__.py
+++ b/tests/mock/__init__.py
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from . import models, context, topology, operations
+from . import models, context, topology, operations, workflow

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2fa30b4c/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index df4638a..7a08ba8 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -33,13 +33,11 @@ from aria.orchestrator import (
     operation,
 )
 
-from ..mock import (
-    topology,
-    workflow as workflow_mocks
+from tests import (
+    mock as tests_mock,
+    storage
 )
 
-from tests import mock, storage
-
 from ..fixtures import (  # pylint: disable=unused-import
     plugins_dir,
     plugin_manager,
@@ -72,8 +70,8 @@ def test_builtin_workflow_instantiation(request):
     # validates the workflow runner instantiates properly when provided with a builtin workflow
     # (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
     workflow_runner = _create_workflow_runner(request, 'install')
-    tasks = list(workflow_runner._tasks_graph.tasks)
-    assert len(tasks) == 2  # expecting two WorkflowTasks
+    tasks = list(workflow_runner.execution.tasks)
+    assert len(tasks) == 18  # expecting 18 tasks for 2 node topology
 
 
 def test_custom_workflow_instantiation(request):
@@ -81,8 +79,8 @@ def test_custom_workflow_instantiation(request):
     # (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
     mock_workflow = _setup_mock_workflow_in_service(request)
     workflow_runner = _create_workflow_runner(request, mock_workflow)
-    tasks = list(workflow_runner._tasks_graph.tasks)
-    assert len(tasks) == 0  # mock workflow creates no tasks
+    tasks = list(workflow_runner.execution.tasks)
+    assert len(tasks) == 2  # the mock workflow creates only the start and end workflow tasks
 
 
 def test_existing_active_executions(request, service, model):
@@ -253,7 +251,7 @@ def test_workflow_function_parameters(request, tmpdir):
 @pytest.fixture
 def service(model):
     # sets up a service in the storage
-    service_id = topology.create_simple_topology_two_nodes(model)
+    service_id = tests_mock.topology.create_simple_topology_two_nodes(model)
     service = model.service.get(service_id)
     return service
 
@@ -264,7 +262,7 @@ def _setup_mock_workflow_in_service(request, inputs=None):
     service = request.getfuncargvalue('service')
     resource = request.getfuncargvalue('resource')
 
-    source = workflow_mocks.__file__
+    source = tests_mock.workflow.__file__
     resource.service_template.upload(str(service.service_template.id), source)
     mock_workflow_name = 'test_workflow'
     arguments = {}
@@ -311,9 +309,9 @@ def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
 class TestResumableWorkflows(object):
 
     def test_single_task_successful_execution(self, workflow_context, executor):
-        node, _, operation_name = self._create_interface(workflow_context, mock_success_task)
+        self._create_interface(workflow_context, mock_success_task)
 
-        workflow_context.service.workflows['custom_workflow'] = mock.models.create_operation(
+        workflow_context.service.workflows['custom_workflow'] = tests_mock.models.create_operation(
             'custom_workflow',
             operation_kwargs={'function': '{0}.{1}'.format(__name__, mock_workflow.__name__)}
         )
@@ -344,6 +342,8 @@ class TestResumableWorkflows(object):
             assert task.status in (task.FAILED, task.RETRYING)
             assert global_test_holder['state'] == 'idle'
 
+            # Create a new workflow runner with an existing execution id. This causes
+            # the old execution to restart.
             new_wf_runner = WorkflowRunner(
                 service_id=wf_runner.service.id,
                 inputs={},
@@ -353,16 +353,20 @@ class TestResumableWorkflows(object):
                 execution_id=wf_runner.execution.id,
                 executor=executor)
 
+            # Set 'resumed' to True so the execution will succeed.
             global_test_holder['resumed'] = True
             new_wf_runner.execute()
 
+            # Wait for it to finish and assert changes.
             while global_test_holder.get('state') != 'ended':
                 pass
             assert task.status == task.SUCCESS
             assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
-        except BaseException:
+
+        except:
             global_test_holder['state'] = 'terminated'
-            wf_thread.join(5)
+            wf_thread.join(0.5)
+            raise
 
     @staticmethod
     @pytest.fixture
@@ -376,13 +380,13 @@ class TestResumableWorkflows(object):
     @staticmethod
     @pytest.fixture
     def workflow_context(tmpdir):
-        workflow_context = mock.context.simple(str(tmpdir))
+        workflow_context = tests_mock.context.simple(str(tmpdir))
         yield workflow_context
         storage.release_sqlite_storage(workflow_context.model)
 
     @staticmethod
     def _create_interface(ctx, func, arguments=None):
-        node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+        node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
         interface_name = 'aria.interfaces.lifecycle'
         operation_kwargs = dict(function='{name}.{func.__name__}'.format(
             name=__name__, func=func))
@@ -390,8 +394,8 @@ class TestResumableWorkflows(object):
             # the operation has to declare the arguments before those may be passed
             operation_kwargs['arguments'] = arguments
         operation_name = 'create'
-        interface = mock.models.create_interface(node.service, interface_name, operation_name,
-                                                 operation_kwargs=operation_kwargs)
+        interface = tests_mock.models.create_interface(node.service, interface_name, operation_name,
+                                                       operation_kwargs=operation_kwargs)
         node.interfaces[interface.name] = interface
         ctx.model.node.update(node)
 
@@ -409,8 +413,11 @@ class TestResumableWorkflows(object):
 
 @workflow
 def mock_workflow(ctx, graph):
-    node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
-    graph.add_tasks(_op(node, 'create'))
+    node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+    graph.add_tasks(
+        api.task.OperationTask(
+            node, interface_name='aria.interfaces.lifecycle', operation_name='create')
+    )
 
 
 @operation
@@ -422,21 +429,3 @@ def mock_success_task(**_):
             return
     global_test_holder['state'] = 'idle'
     raise Exception("The operation was terminated")
-
-
-def _op(node,
-        operation_name,
-        arguments=None,
-        max_attempts=None,
-        retry_interval=None,
-        ignore_failure=None):
-
-    return api.task.OperationTask(
-        node,
-        interface_name='aria.interfaces.lifecycle',
-        operation_name=operation_name,
-        arguments=arguments,
-        max_attempts=max_attempts,
-        retry_interval=retry_interval,
-        ignore_failure=ignore_failure,
-    )
\ No newline at end of file