You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/11 09:43:32 UTC
incubator-ariatosca git commit: more tests fixes [Forced Update!]
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-276-Support-model-instrumentation-for-workflows a94a4dd12 -> 24df3f5ef (forced update)
more tests fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/24df3f5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/24df3f5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/24df3f5e
Branch: refs/heads/ARIA-276-Support-model-instrumentation-for-workflows
Commit: 24df3f5ef3e7b8a01454ee8cf313147b900119b1
Parents: ff1596e
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun Jun 11 12:16:33 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Sun Jun 11 12:43:27 2017 +0300
----------------------------------------------------------------------
aria/storage/collection_instrumentation.py | 1 -
tests/orchestrator/context/test_serialize.py | 20 +++--
tests/orchestrator/execution_plugin/test_ssh.py | 50 +++++------
.../orchestrator/workflows/core/test_engine.py | 88 +++++++++++++++-----
.../executor/test_process_executor_extension.py | 24 +++---
.../test_process_executor_tracked_changes.py | 26 +++---
6 files changed, 130 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/24df3f5e/aria/storage/collection_instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/collection_instrumentation.py b/aria/storage/collection_instrumentation.py
index b8f656c..454f97a 100644
--- a/aria/storage/collection_instrumentation.py
+++ b/aria/storage/collection_instrumentation.py
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from functools import partial
from . import exceptions
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/24df3f5e/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index 4db7bf4..0919e81 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -33,16 +33,10 @@ def test_serialize_operation_context(context, executor, tmpdir):
test_file.write(TEST_FILE_CONTENT)
resource = context.resource
resource.service_template.upload(TEST_FILE_ENTRY_ID, str(test_file))
- graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
- eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
- eng.execute()
-
-@workflow
-def _mock_workflow(ctx, graph):
- node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
plugin = mock.models.create_plugin()
- ctx.model.plugin.put(plugin)
+ context.model.plugin.put(plugin)
interface = mock.models.create_interface(
node.service,
'test',
@@ -51,6 +45,16 @@ def _mock_workflow(ctx, graph):
plugin=plugin)
)
node.interfaces[interface.name] = interface
+ context.model.node.update(node)
+
+ graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
+ eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
+ eng.execute()
+
+
+@workflow
+def _mock_workflow(ctx, graph):
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
task = api.task.OperationTask(node, interface_name='test', operation_name='op')
graph.add_tasks(task)
return graph
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/24df3f5e/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index 8b326e7..8c4dd2d 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -214,33 +214,33 @@ class TestWithActualSSHServer(object):
else:
operation = operations.run_script_with_ssh
+ node = self._workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ arguments = {
+ 'script_path': script_path,
+ 'fabric_env': _FABRIC_ENV,
+ 'process': process,
+ 'use_sudo': use_sudo,
+ 'custom_env_var': custom_input,
+ 'test_operation': '',
+ }
+ if hide_output:
+ arguments['hide_output'] = hide_output
+ if commands:
+ arguments['commands'] = commands
+ interface = mock.models.create_interface(
+ node.service,
+ 'test',
+ 'op',
+ operation_kwargs=dict(
+ function='{0}.{1}'.format(
+ operations.__name__,
+ operation.__name__),
+ arguments=arguments)
+ )
+ node.interfaces[interface.name] = interface
+
@workflow
def mock_workflow(ctx, graph):
- node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
- arguments = {
- 'script_path': script_path,
- 'fabric_env': _FABRIC_ENV,
- 'process': process,
- 'use_sudo': use_sudo,
- 'custom_env_var': custom_input,
- 'test_operation': '',
- }
- if hide_output:
- arguments['hide_output'] = hide_output
- if commands:
- arguments['commands'] = commands
- interface = mock.models.create_interface(
- node.service,
- 'test',
- 'op',
- operation_kwargs=dict(
- function='{0}.{1}'.format(
- operations.__name__,
- operation.__name__),
- arguments=arguments)
- )
- node.interfaces[interface.name] = interface
-
ops = []
for test_operation in test_operations:
op_arguments = arguments.copy()
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/24df3f5e/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index 6d2836c..0438544 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -55,12 +55,7 @@ class BaseTest(object):
tasks_graph=graph)
@staticmethod
- def _op(ctx,
- func,
- arguments=None,
- max_attempts=None,
- retry_interval=None,
- ignore_failure=None):
+ def _create_interface(ctx, func, arguments=None):
node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
interface_name = 'aria.interfaces.lifecycle'
operation_kwargs = dict(function='{name}.{func.__name__}'.format(
@@ -72,6 +67,17 @@ class BaseTest(object):
interface = mock.models.create_interface(node.service, interface_name, operation_name,
operation_kwargs=operation_kwargs)
node.interfaces[interface.name] = interface
+ ctx.model.node.update(node)
+
+ return node, interface_name, operation_name
+
+ @staticmethod
+ def _op(node,
+ operation_name,
+ arguments=None,
+ max_attempts=None,
+ retry_interval=None,
+ ignore_failure=None):
return api.task.OperationTask(
node,
@@ -158,9 +164,11 @@ class TestEngine(BaseTest):
assert execution.status == models.Execution.SUCCEEDED
def test_single_task_successful_execution(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(workflow_context, mock_success_task)
+
@workflow
def mock_workflow(ctx, graph):
- graph.add_tasks(self._op(ctx, func=mock_success_task))
+ graph.add_tasks(self._op(node, operation_name))
self._execute(
workflow_func=mock_workflow,
workflow_context=workflow_context,
@@ -170,9 +178,11 @@ class TestEngine(BaseTest):
assert global_test_holder.get('sent_task_signal_calls') == 1
def test_single_task_failed_execution(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(workflow_context, mock_failed_task)
+
@workflow
def mock_workflow(ctx, graph):
- graph.add_tasks(self._op(ctx, func=mock_failed_task))
+ graph.add_tasks(self._op(node, operation_name))
with pytest.raises(exceptions.ExecutorException):
self._execute(
workflow_func=mock_workflow,
@@ -187,10 +197,13 @@ class TestEngine(BaseTest):
assert execution.status == models.Execution.FAILED
def test_two_tasks_execution_order(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_ordered_task, {'counter': 1})
+
@workflow
def mock_workflow(ctx, graph):
- op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1})
- op2 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2})
+ op1 = self._op(node, operation_name, arguments={'counter': 1})
+ op2 = self._op(node, operation_name, arguments={'counter': 2})
graph.sequence(op1, op2)
self._execute(
workflow_func=mock_workflow,
@@ -202,11 +215,14 @@ class TestEngine(BaseTest):
assert global_test_holder.get('sent_task_signal_calls') == 2
def test_stub_and_subworkflow_execution(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_ordered_task, {'counter': 1})
+
@workflow
def sub_workflow(ctx, graph):
- op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1})
+ op1 = self._op(node, operation_name, arguments={'counter': 1})
op2 = api.task.StubTask()
- op3 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2})
+ op3 = self._op(node, operation_name, arguments={'counter': 2})
graph.sequence(op1, op2, op3)
@workflow
@@ -225,11 +241,13 @@ class TestCancel(BaseTest):
def test_cancel_started_execution(self, workflow_context, executor):
number_of_tasks = 100
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_sleep_task, {'seconds': 0.1})
@workflow
def mock_workflow(ctx, graph):
operations = (
- self._op(ctx, func=mock_sleep_task, arguments=dict(seconds=0.1))
+ self._op(node, operation_name, arguments=dict(seconds=0.1))
for _ in range(number_of_tasks)
)
return graph.sequence(*operations)
@@ -267,9 +285,12 @@ class TestCancel(BaseTest):
class TestRetries(BaseTest):
def test_two_max_attempts_and_success_on_retry(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
+
@workflow
def mock_workflow(ctx, graph):
- op = self._op(ctx, func=mock_conditional_failure_task,
+ op = self._op(node, operation_name,
arguments={'failure_count': 1},
max_attempts=2)
graph.add_tasks(op)
@@ -283,9 +304,12 @@ class TestRetries(BaseTest):
assert global_test_holder.get('sent_task_signal_calls') == 2
def test_two_max_attempts_and_failure_on_retry(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
+
@workflow
def mock_workflow(ctx, graph):
- op = self._op(ctx, func=mock_conditional_failure_task,
+ op = self._op(node, operation_name,
arguments={'failure_count': 2},
max_attempts=2)
graph.add_tasks(op)
@@ -300,9 +324,11 @@ class TestRetries(BaseTest):
assert global_test_holder.get('sent_task_signal_calls') == 2
def test_three_max_attempts_and_success_on_first_retry(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
@workflow
def mock_workflow(ctx, graph):
- op = self._op(ctx, func=mock_conditional_failure_task,
+ op = self._op(node, operation_name,
arguments={'failure_count': 1},
max_attempts=3)
graph.add_tasks(op)
@@ -316,9 +342,12 @@ class TestRetries(BaseTest):
assert global_test_holder.get('sent_task_signal_calls') == 2
def test_three_max_attempts_and_success_on_second_retry(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
+
@workflow
def mock_workflow(ctx, graph):
- op = self._op(ctx, func=mock_conditional_failure_task,
+ op = self._op(node, operation_name,
arguments={'failure_count': 2},
max_attempts=3)
graph.add_tasks(op)
@@ -332,9 +361,11 @@ class TestRetries(BaseTest):
assert global_test_holder.get('sent_task_signal_calls') == 3
def test_infinite_retries(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
@workflow
def mock_workflow(ctx, graph):
- op = self._op(ctx, func=mock_conditional_failure_task,
+ op = self._op(node, operation_name,
arguments={'failure_count': 1},
max_attempts=-1)
graph.add_tasks(op)
@@ -358,9 +389,11 @@ class TestRetries(BaseTest):
executor=executor)
def _test_retry_interval(self, retry_interval, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
@workflow
def mock_workflow(ctx, graph):
- op = self._op(ctx, func=mock_conditional_failure_task,
+ op = self._op(node, operation_name,
arguments={'failure_count': 1},
max_attempts=2,
retry_interval=retry_interval)
@@ -378,9 +411,11 @@ class TestRetries(BaseTest):
assert global_test_holder.get('sent_task_signal_calls') == 2
def test_ignore_failure(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
@workflow
def mock_workflow(ctx, graph):
- op = self._op(ctx, func=mock_conditional_failure_task,
+ op = self._op(node, operation_name,
ignore_failure=True,
arguments={'failure_count': 100},
max_attempts=100)
@@ -401,10 +436,12 @@ class TestTaskRetryAndAbort(BaseTest):
def test_task_retry_default_interval(self, workflow_context, executor):
default_retry_interval = 0.1
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_task_retry, {'message': self.message})
@workflow
def mock_workflow(ctx, graph):
- op = self._op(ctx, func=mock_task_retry,
+ op = self._op(node, operation_name,
arguments={'message': self.message},
retry_interval=default_retry_interval,
max_attempts=2)
@@ -425,10 +462,13 @@ class TestTaskRetryAndAbort(BaseTest):
def test_task_retry_custom_interval(self, workflow_context, executor):
default_retry_interval = 100
custom_retry_interval = 0.1
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_task_retry, {'message': self.message,
+ 'retry_interval': custom_retry_interval})
@workflow
def mock_workflow(ctx, graph):
- op = self._op(ctx, func=mock_task_retry,
+ op = self._op(node, operation_name,
arguments={'message': self.message,
'retry_interval': custom_retry_interval},
retry_interval=default_retry_interval,
@@ -449,9 +489,11 @@ class TestTaskRetryAndAbort(BaseTest):
assert global_test_holder.get('sent_task_signal_calls') == 2
def test_task_abort(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_task_abort, {'message': self.message})
@workflow
def mock_workflow(ctx, graph):
- op = self._op(ctx, func=mock_task_abort,
+ op = self._op(node, operation_name,
arguments={'message': self.message},
retry_interval=100,
max_attempts=100)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/24df3f5e/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index 7969457..5f0b75f 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -32,19 +32,23 @@ def test_decorate_extension(context, executor):
def get_node(ctx):
return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ node = get_node(context)
+ interface_name = 'test_interface'
+ operation_name = 'operation'
+ interface = mock.models.create_interface(
+ context.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(function='{0}.{1}'.format(__name__, _mock_operation.__name__),
+ arguments=arguments)
+ )
+ node.interfaces[interface.name] = interface
+ context.model.node.update(node)
+
+
@workflow
def mock_workflow(ctx, graph):
node = get_node(ctx)
- interface_name = 'test_interface'
- operation_name = 'operation'
- interface = mock.models.create_interface(
- ctx.service,
- interface_name,
- operation_name,
- operation_kwargs=dict(function='{0}.{1}'.format(__name__, _mock_operation.__name__),
- arguments=arguments)
- )
- node.interfaces[interface.name] = interface
task = api.task.OperationTask(
node,
interface_name=interface_name,
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/24df3f5e/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index 2d80a3b..7dbcc5a 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -83,20 +83,22 @@ def test_apply_tracked_changes_during_an_operation(context, executor):
def _run_workflow(context, executor, op_func, arguments=None):
+ node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ interface_name = 'test_interface'
+ operation_name = 'operation'
+ wf_arguments = arguments or {}
+ interface = mock.models.create_interface(
+ context.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(function=_operation_mapping(op_func),
+ arguments=wf_arguments)
+ )
+ node.interfaces[interface.name] = interface
+ context.model.node.update(node)
+
@workflow
def mock_workflow(ctx, graph):
- node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
- interface_name = 'test_interface'
- operation_name = 'operation'
- wf_arguments = arguments or {}
- interface = mock.models.create_interface(
- ctx.service,
- interface_name,
- operation_name,
- operation_kwargs=dict(function=_operation_mapping(op_func),
- arguments=wf_arguments)
- )
- node.interfaces[interface.name] = interface
task = api.task.OperationTask(
node,
interface_name=interface_name,