You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/19 15:52:10 UTC
incubator-ariatosca git commit: cleaning up 1
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-236-Resumable-workflow-executions 9431465dd -> 2fa30b4c1
cleaning up 1
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/2fa30b4c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/2fa30b4c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/2fa30b4c
Branch: refs/heads/ARIA-236-Resumable-workflow-executions
Commit: 2fa30b4c1fb8f02c4a6edf68fa5590e46a012a37
Parents: 9431465
Author: max-orlov <ma...@gigaspaces.com>
Authored: Mon Jun 19 18:52:05 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Mon Jun 19 18:52:05 2017 +0300
----------------------------------------------------------------------
aria/orchestrator/events.py | 2 +-
aria/orchestrator/exceptions.py | 7 +++
aria/orchestrator/workflow_runner.py | 16 +++---
tests/mock/__init__.py | 2 +-
tests/orchestrator/test_workflow_runner.py | 67 +++++++++++--------------
5 files changed, 44 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2fa30b4c/aria/orchestrator/events.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/events.py b/aria/orchestrator/events.py
index a1d7006..aa1b5bc 100644
--- a/aria/orchestrator/events.py
+++ b/aria/orchestrator/events.py
@@ -34,4 +34,4 @@ on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal')
on_cancelled_workflow_signal = signal('on_cancelled_workflow_signal')
on_success_workflow_signal = signal('on_success_workflow_signal')
on_failure_workflow_signal = signal('on_failure_workflow_signal')
-on_resume_workflow_signal = signal('on_resume_workflow_signal')
\ No newline at end of file
+on_resume_workflow_signal = signal('on_resume_workflow_signal')
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2fa30b4c/aria/orchestrator/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/exceptions.py b/aria/orchestrator/exceptions.py
index 8d3dcc6..71b6401 100644
--- a/aria/orchestrator/exceptions.py
+++ b/aria/orchestrator/exceptions.py
@@ -74,3 +74,10 @@ class WorkflowImplementationNotFoundError(AriaError):
Raised when attempting to import a workflow's code but the implementation is not found
"""
pass
+
+
+class InvalidWorkflowRunnerParams(AriaError):
+ """
+ Raised when an invalid combination of arguments is passed to the workflow runner
+ """
+ pass
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2fa30b4c/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 995d325..1f93292 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -62,18 +62,15 @@ class WorkflowRunner(object):
# by several threads without raising errors on model objects shared between threads
self._service_id = service_id
- if workflow_name is not None:
- assert execution_id is None
+ if workflow_name is not None and execution_id is None:
self._workflow_name = workflow_name
- execution = self._create_execution_model(inputs)
- self._execution_id = execution.id
self._validate_workflow_exists_for_service()
- workflow_fn = self._get_workflow_fn()
- else:
- assert execution_id is not None
+ self._execution_id = self._create_execution_model(inputs).id
+ elif execution_id is not None:
self._execution_id = execution_id
- self._workflow_name = \
- self._model_storage.execution.get(self._execution_id).workflow_name
+ self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name
+ else:
+ raise exceptions.InvalidWorkflowRunnerParams("")
self._workflow_context = WorkflowContext(
name=self.__class__.__name__,
@@ -93,6 +90,7 @@ class WorkflowRunner(object):
if execution_id is None:
# Not an existing execution
+ workflow_fn = self._get_workflow_fn()
tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
engine.construct_execution_tasks(self.execution, tasks_graph, executor.__class__)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2fa30b4c/tests/mock/__init__.py
----------------------------------------------------------------------
diff --git a/tests/mock/__init__.py b/tests/mock/__init__.py
index 9004b4c..9183b77 100644
--- a/tests/mock/__init__.py
+++ b/tests/mock/__init__.py
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from . import models, context, topology, operations
+from . import models, context, topology, operations, workflow
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2fa30b4c/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index df4638a..7a08ba8 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -33,13 +33,11 @@ from aria.orchestrator import (
operation,
)
-from ..mock import (
- topology,
- workflow as workflow_mocks
+from tests import (
+ mock as tests_mock,
+ storage
)
-from tests import mock, storage
-
from ..fixtures import ( # pylint: disable=unused-import
plugins_dir,
plugin_manager,
@@ -72,8 +70,8 @@ def test_builtin_workflow_instantiation(request):
# validates the workflow runner instantiates properly when provided with a builtin workflow
# (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
workflow_runner = _create_workflow_runner(request, 'install')
- tasks = list(workflow_runner._tasks_graph.tasks)
- assert len(tasks) == 2 # expecting two WorkflowTasks
+ tasks = list(workflow_runner.execution.tasks)
+ assert len(tasks) == 18 # expecting 18 tasks for 2 node topology
def test_custom_workflow_instantiation(request):
@@ -81,8 +79,8 @@ def test_custom_workflow_instantiation(request):
# (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
mock_workflow = _setup_mock_workflow_in_service(request)
workflow_runner = _create_workflow_runner(request, mock_workflow)
- tasks = list(workflow_runner._tasks_graph.tasks)
- assert len(tasks) == 0 # mock workflow creates no tasks
+ tasks = list(workflow_runner.execution.tasks)
+ assert len(tasks) == 2 # mock workflow creates only start workflow and end workflow task
def test_existing_active_executions(request, service, model):
@@ -253,7 +251,7 @@ def test_workflow_function_parameters(request, tmpdir):
@pytest.fixture
def service(model):
# sets up a service in the storage
- service_id = topology.create_simple_topology_two_nodes(model)
+ service_id = tests_mock.topology.create_simple_topology_two_nodes(model)
service = model.service.get(service_id)
return service
@@ -264,7 +262,7 @@ def _setup_mock_workflow_in_service(request, inputs=None):
service = request.getfuncargvalue('service')
resource = request.getfuncargvalue('resource')
- source = workflow_mocks.__file__
+ source = tests_mock.workflow.__file__
resource.service_template.upload(str(service.service_template.id), source)
mock_workflow_name = 'test_workflow'
arguments = {}
@@ -311,9 +309,9 @@ def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
class TestResumableWorkflows(object):
def test_single_task_successful_execution(self, workflow_context, executor):
- node, _, operation_name = self._create_interface(workflow_context, mock_success_task)
+ self._create_interface(workflow_context, mock_success_task)
- workflow_context.service.workflows['custom_workflow'] = mock.models.create_operation(
+ workflow_context.service.workflows['custom_workflow'] = tests_mock.models.create_operation(
'custom_workflow',
operation_kwargs={'function': '{0}.{1}'.format(__name__, mock_workflow.__name__)}
)
@@ -344,6 +342,8 @@ class TestResumableWorkflows(object):
assert task.status in (task.FAILED, task.RETRYING)
assert global_test_holder['state'] == 'idle'
+ # Create a new workflow runner with an existing execution id. This causes
+ # the old execution to restart.
new_wf_runner = WorkflowRunner(
service_id=wf_runner.service.id,
inputs={},
@@ -353,16 +353,20 @@ class TestResumableWorkflows(object):
execution_id=wf_runner.execution.id,
executor=executor)
+ # Set the 'resumed' flag to True so that the execution can succeed.
global_test_holder['resumed'] = True
new_wf_runner.execute()
+ # Wait for it to finish and assert changes.
while global_test_holder.get('state') != 'ended':
pass
assert task.status == task.SUCCESS
assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
- except BaseException:
+
+ except:
global_test_holder['state'] = 'terminated'
- wf_thread.join(5)
+ wf_thread.join(0.5)
+ raise
@staticmethod
@pytest.fixture
@@ -376,13 +380,13 @@ class TestResumableWorkflows(object):
@staticmethod
@pytest.fixture
def workflow_context(tmpdir):
- workflow_context = mock.context.simple(str(tmpdir))
+ workflow_context = tests_mock.context.simple(str(tmpdir))
yield workflow_context
storage.release_sqlite_storage(workflow_context.model)
@staticmethod
def _create_interface(ctx, func, arguments=None):
- node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
interface_name = 'aria.interfaces.lifecycle'
operation_kwargs = dict(function='{name}.{func.__name__}'.format(
name=__name__, func=func))
@@ -390,8 +394,8 @@ class TestResumableWorkflows(object):
# the operation has to declare the arguments before those may be passed
operation_kwargs['arguments'] = arguments
operation_name = 'create'
- interface = mock.models.create_interface(node.service, interface_name, operation_name,
- operation_kwargs=operation_kwargs)
+ interface = tests_mock.models.create_interface(node.service, interface_name, operation_name,
+ operation_kwargs=operation_kwargs)
node.interfaces[interface.name] = interface
ctx.model.node.update(node)
@@ -409,8 +413,11 @@ class TestResumableWorkflows(object):
@workflow
def mock_workflow(ctx, graph):
- node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
- graph.add_tasks(_op(node, 'create'))
+ node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+ graph.add_tasks(
+ api.task.OperationTask(
+ node, interface_name='aria.interfaces.lifecycle', operation_name='create')
+ )
@operation
@@ -422,21 +429,3 @@ def mock_success_task(**_):
return
global_test_holder['state'] = 'idle'
raise Exception("The operation was terminated")
-
-
-def _op(node,
- operation_name,
- arguments=None,
- max_attempts=None,
- retry_interval=None,
- ignore_failure=None):
-
- return api.task.OperationTask(
- node,
- interface_name='aria.interfaces.lifecycle',
- operation_name=operation_name,
- arguments=arguments,
- max_attempts=max_attempts,
- retry_interval=retry_interval,
- ignore_failure=ignore_failure,
- )
\ No newline at end of file