Posted to commits@ariatosca.apache.org by mx...@apache.org on 2017/07/10 15:01:50 UTC

[1/3] incubator-ariatosca git commit: ARIA-103 Remove dependency on Clint [Forced Update!]

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions ef1419cd0 -> 123a55ce7 (forced update)


ARIA-103 Remove dependency on Clint

We no longer require this third-party library; instead, the utils/console
module uses the existing cli/color module.

This commit also fixes the cli/color module to properly support Unicode
and to properly deinitialize Colorama on Windows.
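
For reference, a minimal usage sketch of the reworked utils/console API, assuming the module layout shown in the diff below (the messages themselves are illustrative):

    from aria.utils.console import puts, indent, Colored

    # puts() writes to sys.stdout by default and honors the current indent level
    puts(Colored.green('installation started', bold=True))
    with indent(4):
        puts(Colored.yellow('configuring node'))
        puts(Colored.red('node failed to start', bold=True))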


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/f903006b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/f903006b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/f903006b

Branch: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions
Commit: f903006b013fdc9c77b7be42a915dfb72fb16b96
Parents: b30a7ed
Author: Tal Liron <ta...@gmail.com>
Authored: Mon Jul 10 12:28:23 2017 +0300
Committer: Tal Liron <ta...@gmail.com>
Committed: Mon Jul 10 15:56:01 2017 +0300

----------------------------------------------------------------------
 aria/cli/color.py     | 21 ++++++++++++------
 aria/utils/console.py | 53 +++++++++++++++++++++++++++++++---------------
 requirements.in       |  1 -
 requirements.txt      |  4 +---
 4 files changed, 52 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f903006b/aria/cli/color.py
----------------------------------------------------------------------
diff --git a/aria/cli/color.py b/aria/cli/color.py
index 03381ba..d6a4cd6 100644
--- a/aria/cli/color.py
+++ b/aria/cli/color.py
@@ -18,11 +18,20 @@ Terminal colorization utilities.
 """
 
 from StringIO import StringIO
+import atexit
 import re
 
 import colorama
 
+from ..utils.formatting import safe_str
+
+
+def _restore_terminal():
+    colorama.deinit()
+
+
 colorama.init()
+atexit.register(_restore_terminal)
 
 
 class StringStylizer(object):
@@ -33,20 +42,20 @@ class StringStylizer(object):
     def __repr__(self):
         if self._color_spec:
             return '{schema}{str}{reset}'.format(
-                schema=self._color_spec, str=str(self._str), reset=Colors.Style.RESET_ALL)
+                schema=self._color_spec, str=safe_str(self._str), reset=Colors.Style.RESET_ALL)
         return self._str
 
     def __add__(self, other):
-        return str(self) + other
+        return safe_str(self) + other
 
     def __radd__(self, other):
-        return other + str(self)
+        return other + safe_str(self)
 
     def color(self, color_spec):
         self._color_spec = color_spec
 
     def replace(self, old, new, **kwargs):
-        self._str = self._str.replace(str(old), str(new), **kwargs)
+        self._str = self._str.replace(safe_str(old), safe_str(new), **kwargs)
 
     def format(self, *args, **kwargs):
         self._str = self._str.format(*args, **kwargs)
@@ -79,8 +88,8 @@ class Colors(object):
 class ColorSpec(object):
     def __init__(self, fore=None, back=None, style=None):
         """
-        It is possible to provide fore, back and style arguments. each could be either
-        the color is lower case letter, or the actual color from Colorama.
+        It is possible to provide fore, back and style arguments. Each could be either the color as
+        lowercase letters, or the full color name for Colorama.
         """
         self._kwargs = dict(fore=fore, back=back, style=style)
         self._str = StringIO()
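
A short sketch of how these two classes combine, based only on the interfaces visible in this diff (the sample text is illustrative):

    from aria.cli import color

    # Style a Unicode string in bright cyan; safe_str() keeps the non-ASCII text intact
    spec = color.ColorSpec(fore=color.Colors.Fore.CYAN, style=color.Colors.Style.BRIGHT)
    styled = color.StringStylizer(u'\u00e9tat: d\u00e9marr\u00e9', spec)
    print(styled)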

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f903006b/aria/utils/console.py
----------------------------------------------------------------------
diff --git a/aria/utils/console.py b/aria/utils/console.py
index 642cbb1..2f6f622 100644
--- a/aria/utils/console.py
+++ b/aria/utils/console.py
@@ -17,52 +17,71 @@
 Abstraction API above terminal color libraries.
 """
 
-from clint.textui.core import STDOUT
-from clint.textui import puts as _puts
-from clint.textui.colored import ColoredString as _ColoredString
-from clint.textui import indent  # pylint: disable=unused-import
+import os
+import sys
+
+from contextlib import contextmanager
 
 from .formatting import safe_str
+from ..cli import color
+
+
+_indent_string = ''
 
 
-class ColoredString(_ColoredString):
-    def __init__(self, color, str_, always_color=False, bold=False):
-        super(ColoredString, self).__init__(color, safe_str(str_), always_color, bold)
+def puts(string='', newline=True, stream=sys.stdout):
+    stream.write(_indent_string)
+    stream.write(safe_str(string))
+    if newline:
+        stream.write(os.linesep)
 
 
-def puts(string='', newline=True, stream=STDOUT):
-    _puts(safe_str(string), newline, stream)
+@contextmanager
+def indent(size=4):
+    global _indent_string
+    original_indent_string = _indent_string
+    try:
+        _indent_string += ' ' * size
+        yield
+    finally:
+        _indent_string = original_indent_string
 
 
 class Colored(object):
     @staticmethod
     def black(string, always=False, bold=False):
-        return ColoredString('BLACK', string, always_color=always, bold=bold)
+        return Colored._color(string, color.Colors.Fore.BLACK, bold)
 
     @staticmethod
     def red(string, always=False, bold=False):
-        return ColoredString('RED', string, always_color=always, bold=bold)
+        return Colored._color(string, color.Colors.Fore.RED, bold)
 
     @staticmethod
     def green(string, always=False, bold=False):
-        return ColoredString('GREEN', string, always_color=always, bold=bold)
+        return Colored._color(string, color.Colors.Fore.GREEN, bold)
 
     @staticmethod
     def yellow(string, always=False, bold=False):
-        return ColoredString('YELLOW', string, always_color=always, bold=bold)
+        return Colored._color(string, color.Colors.Fore.YELLOW, bold)
 
     @staticmethod
     def blue(string, always=False, bold=False):
-        return ColoredString('BLUE', string, always_color=always, bold=bold)
+        return Colored._color(string, color.Colors.Fore.BLUE, bold)
 
     @staticmethod
     def magenta(string, always=False, bold=False):
-        return ColoredString('MAGENTA', string, always_color=always, bold=bold)
+        return Colored._color(string, color.Colors.Fore.MAGENTA, bold)
 
     @staticmethod
     def cyan(string, always=False, bold=False):
-        return ColoredString('CYAN', string, always_color=always, bold=bold)
+        return Colored._color(string, color.Colors.Fore.CYAN, bold)
 
     @staticmethod
     def white(string, always=False, bold=False):
-        return ColoredString('WHITE', string, always_color=always, bold=bold)
+        return Colored._color(string, color.Colors.Fore.WHITE, bold)
+
+    @staticmethod
+    def _color(string, fore, bold):
+        return color.StringStylizer(string, color.ColorSpec(
+            fore=fore,
+            style=color.Colors.Style.BRIGHT if bold else color.Colors.Style.NORMAL))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f903006b/requirements.in
----------------------------------------------------------------------
diff --git a/requirements.in b/requirements.in
index 723ed51..a864335 100644
--- a/requirements.in
+++ b/requirements.in
@@ -22,7 +22,6 @@ ruamel.yaml>=0.11.12, <0.12.0  # version 0.12.0 dropped support of python 2.6
 Jinja2>=2.8, <2.9
 shortuuid>=0.5, <0.6
 CacheControl[filecache]>=0.11.0, <0.13
-clint>=0.5.0, <0.6
 SQLAlchemy>=1.1.0, <1.2  # version 1.2 dropped support of python 2.6
 wagon==0.6.0
 bottle>=0.12.0, <0.13

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f903006b/requirements.txt
----------------------------------------------------------------------
diff --git a/requirements.txt b/requirements.txt
index 7ee1008..ea97922 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,17 +2,15 @@
 # This file is autogenerated by pip-compile
 # To update, run:
 #
-#    pip-compile --output-file requirements.txt requirements.in
+#    pip-compile --output-file ./requirements.txt ./requirements.in
 #
 appdirs==1.4.3            # via setuptools
-args==0.1.0               # via clint
 backports.shutil_get_terminal_size==1.0.0
 blinker==1.4
 bottle==0.12.13
 cachecontrol[filecache]==0.12.1
 click==6.7
 click_didyoumean==0.0.3
-clint==0.5.1
 colorama==0.3.9
 decorator==4.0.11         # via networkx
 importlib==1.0.4 ; python_version < "2.7"


[3/3] incubator-ariatosca git commit: ARIA-237 Support for resuming failed workflow executions

Posted by mx...@apache.org.
ARIA-237 Support for resuming failed workflow executions
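
A hedged sketch of resuming a failed execution with the new flag (the storages, executor and ids are environment-specific placeholders; the parameter names come from the diff below):

    from aria.orchestrator.workflow_runner import WorkflowRunner

    runner = WorkflowRunner(
        model_storage=model_storage,        # placeholder: your model storage
        resource_storage=resource_storage,  # placeholder: your resource storage
        plugin_manager=None,
        service_id=service_id,              # placeholder: id of the deployed service
        execution_id=execution_id,          # placeholder: id of the failed execution to resume
        retry_failed=True,                  # new flag: failed tasks are reset to PENDING and re-run
        inputs={},
        executor=executor)                  # placeholder: e.g. a thread-based executor
    runner.execute()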


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/123a55ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/123a55ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/123a55ce

Branch: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions
Commit: 123a55ce7f111f4bd113b1c4e8af8fd1e85b295e
Parents: 6c2f35e
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun Jul 2 21:43:43 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Mon Jul 10 18:01:44 2017 +0300

----------------------------------------------------------------------
 aria/modeling/orchestration.py                  |   4 +-
 aria/orchestrator/workflow_runner.py            |   7 +-
 aria/orchestrator/workflows/core/engine.py      |  18 ++-
 .../workflows/core/events_handler.py            |   9 +-
 tests/modeling/test_models.py                   |   3 +-
 tests/orchestrator/test_workflow_runner.py      | 135 +++++++++++++------
 6 files changed, 125 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/123a55ce/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index df2643e..4d4f0fe 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -65,7 +65,9 @@ class ExecutionBase(mixins.ModelMixin):
         PENDING: (STARTED, CANCELLED),
         STARTED: END_STATES + (CANCELLING,),
         CANCELLING: END_STATES,
-        CANCELLED: PENDING
+        # Retrying
+        CANCELLED: PENDING,
+        FAILED: PENDING
     }
 
     # region one_to_many relationships

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/123a55ce/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 47270c0..2bd3043 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -38,7 +38,8 @@ DEFAULT_TASK_RETRY_INTERVAL = 30
 class WorkflowRunner(object):
 
     def __init__(self, model_storage, resource_storage, plugin_manager,
-                 execution_id=None, service_id=None, workflow_name=None, inputs=None, executor=None,
+                 execution_id=None, retry_failed=False,
+                 service_id=None, workflow_name=None, inputs=None, executor=None,
                  task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
                  task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
         """
@@ -62,6 +63,7 @@ class WorkflowRunner(object):
                 "and service id with inputs")
 
         self._is_resume = execution_id is not None
+        self._retry_failed = retry_failed
 
         self._model_storage = model_storage
         self._resource_storage = resource_storage
@@ -116,7 +118,8 @@ class WorkflowRunner(object):
         return self._model_storage.service.get(self._service_id)
 
     def execute(self):
-        self._engine.execute(ctx=self._workflow_context, resuming=self._is_resume)
+        self._engine.execute(
+            ctx=self._workflow_context, resuming=self._is_resume, retry_failed=self._retry_failed)
 
     def cancel(self):
         self._engine.cancel_execution(ctx=self._workflow_context)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/123a55ce/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index d9c77e9..69505fc 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -41,14 +41,15 @@ class Engine(logger.LoggerMixin):
         self._executors = executors.copy()
         self._executors.setdefault(StubTaskExecutor, StubTaskExecutor())
 
-    def execute(self, ctx, resuming=False):
+    def execute(self, ctx, resuming=False, retry_failed=False):
         """
         Executes the workflow.
         """
         if resuming:
-            events.on_resume_workflow_signal.send(ctx)
+            events.on_resume_workflow_signal.send(ctx, retry_failed=retry_failed)
 
         tasks_tracker = _TasksTracker(ctx)
+
         try:
             events.start_workflow_signal.send(ctx)
             while True:
@@ -68,8 +69,15 @@ class Engine(logger.LoggerMixin):
             if cancel:
                 self._terminate_tasks(tasks_tracker.executing_tasks)
                 events.on_cancelled_workflow_signal.send(ctx)
-            else:
+            elif all(task.status == task.SUCCESS or task.ignore_failure
+                     for task in ctx.execution.tasks):
                 events.on_success_workflow_signal.send(ctx)
+            else:
+                exception = "Tasks {tasks} remain failed".format(
+                    tasks=
+                    [t for t in ctx.execution.tasks if t.status == t.SUCCESS or t.ignore_failure]
+                )
+                events.on_failure_workflow_signal.send(ctx, exception=exception)
         except BaseException as e:
             # Cleanup any remaining tasks
             self._terminate_tasks(tasks_tracker.executing_tasks)
@@ -124,8 +132,10 @@ class Engine(logger.LoggerMixin):
 
 
 class _TasksTracker(object):
+
     def __init__(self, ctx):
         self._ctx = ctx
+
         self._tasks = ctx.execution.tasks
         self._executed_tasks = [task for task in self._tasks if task.has_ended()]
         self._executable_tasks = list(set(self._tasks) - set(self._executed_tasks))
@@ -155,7 +165,7 @@ class _TasksTracker(object):
     def executable_tasks(self):
         now = datetime.utcnow()
         # we need both lists since retrying task are in the executing task list.
-        for task in self._update_tasks(self._executing_tasks + self._executable_tasks):
+        for task in self._update_tasks(set(self._executing_tasks + self._executable_tasks)):
             if all([task.is_waiting(),
                     task.due_at <= now,
                     all(dependency in self._executed_tasks for dependency in task.dependencies)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/123a55ce/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 37801de..5ac1ce8 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -119,7 +119,7 @@ def _workflow_cancelled(workflow_context, *args, **kwargs):
 
 
 @events.on_resume_workflow_signal.connect
-def _workflow_resume(workflow_context, *args, **kwargs):
+def _workflow_resume(workflow_context, retry_failed=False, *args, **kwargs):
     with workflow_context.persist_changes:
         execution = workflow_context.execution
         execution.status = execution.PENDING
@@ -128,6 +128,13 @@ def _workflow_resume(workflow_context, *args, **kwargs):
             if not task.has_ended():
                 task.status = task.PENDING
 
+        if retry_failed:
+            for task in execution.tasks:
+                if task.status == task.FAILED and not task.ignore_failure:
+                    task.attempts_count = 0
+                    task.status = task.PENDING
+
+
 
 @events.on_cancelling_workflow_signal.connect
 def _workflow_cancelling(workflow_context, *args, **kwargs):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/123a55ce/tests/modeling/test_models.py
----------------------------------------------------------------------
diff --git a/tests/modeling/test_models.py b/tests/modeling/test_models.py
index e1167fc..25b4080 100644
--- a/tests/modeling/test_models.py
+++ b/tests/modeling/test_models.py
@@ -324,8 +324,7 @@ class TestExecution(object):
             Execution.STARTED: [Execution.PENDING],
             Execution.CANCELLING: [Execution.PENDING,
                                    Execution.STARTED],
-            Execution.FAILED: [Execution.PENDING,
-                               Execution.STARTED,
+            Execution.FAILED: [Execution.STARTED,
                                Execution.SUCCEEDED,
                                Execution.CANCELLED,
                                Execution.CANCELLING],

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/123a55ce/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index a77d727..adb19e6 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -51,7 +51,7 @@ custom_events = {
     'is_resumed': Event(),
     'is_active': Event(),
     'execution_cancelled': Event(),
-    'execution_ended': Event()
+    'execution_failed': Event(),
 }
 
 
@@ -166,7 +166,8 @@ def test_execute(request, service):
         assert engine_kwargs['ctx'].execution.workflow_name == 'test_workflow'
 
         mock_engine_execute.assert_called_once_with(ctx=workflow_runner._workflow_context,
-                                                    resuming=False)
+                                                    resuming=False,
+                                                    retry_failed=False)
 
 
 def test_cancel_execution(request):
@@ -358,10 +359,11 @@ class TestResumableWorkflows(object):
     def test_resume_workflow(self, workflow_context, thread_executor):
         node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
         node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
-        self._create_interface(workflow_context, node, mock_resuming_task)
+        self._create_interface(workflow_context, node, mock_failed_task)
 
         wf_runner = self._create_initial_workflow_runner(
-            workflow_context, mock_parallel_workflow, thread_executor)
+            workflow_context, mock_parallel_tasks_workflow, thread_executor,
+            inputs={'number_of_tasks': 2})
 
         wf_thread = Thread(target=wf_runner.execute)
         wf_thread.daemon = True
@@ -369,6 +371,7 @@ class TestResumableWorkflows(object):
 
         # Wait for the execution to start
         self._wait_for_active_and_cancel(wf_runner)
+        node = workflow_context.model.node.refresh(node)
 
         tasks = workflow_context.model.task.list(filters={'_stub_type': None})
         assert any(task.status == task.SUCCESS for task in tasks)
@@ -390,6 +393,7 @@ class TestResumableWorkflows(object):
         new_wf_runner.execute()
 
         # Wait for it to finish and assert changes.
+        node = workflow_context.model.node.refresh(node)
         assert all(task.status == task.SUCCESS for task in tasks)
         assert node.attributes['invocations'].value == 3
         assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
@@ -400,13 +404,15 @@ class TestResumableWorkflows(object):
         self._create_interface(workflow_context, node, mock_stuck_task)
 
         wf_runner = self._create_initial_workflow_runner(
-            workflow_context, mock_single_task_workflow, thread_executor)
+            workflow_context, mock_parallel_tasks_workflow, thread_executor,
+            inputs={'number_of_tasks': 1})
 
         wf_thread = Thread(target=wf_runner.execute)
         wf_thread.daemon = True
         wf_thread.start()
 
         self._wait_for_active_and_cancel(wf_runner)
+        node = workflow_context.model.node.refresh(node)
         task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
         assert node.attributes['invocations'].value == 1
         assert task.status == task.STARTED
@@ -430,6 +436,7 @@ class TestResumableWorkflows(object):
             new_thread_executor.close()
 
         # Wait for it to finish and assert changes.
+        node = workflow_context.model.node.refresh(node)
         assert node.attributes['invocations'].value == 2
         assert task.status == task.SUCCESS
         assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
@@ -439,13 +446,15 @@ class TestResumableWorkflows(object):
         node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
         self._create_interface(workflow_context, node, mock_failed_before_resuming)
 
-        wf_runner = self._create_initial_workflow_runner(
-            workflow_context, mock_single_task_workflow, thread_executor)
+        wf_runner = self._create_initial_workflow_runner(workflow_context,
+                                                         mock_parallel_tasks_workflow,
+                                                         thread_executor)
         wf_thread = Thread(target=wf_runner.execute)
         wf_thread.setDaemon(True)
         wf_thread.start()
 
         self._wait_for_active_and_cancel(wf_runner)
+        node = workflow_context.model.node.refresh(node)
 
         task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
         assert node.attributes['invocations'].value == 2
@@ -474,10 +483,62 @@ class TestResumableWorkflows(object):
             new_thread_executor.close()
 
         # Wait for it to finish and assert changes.
+        node = workflow_context.model.node.refresh(node)
         assert node.attributes['invocations'].value == task.max_attempts - 1
         assert task.status == task.SUCCESS
         assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
 
+    def test_resume_failed_task_and_successful_task(self, workflow_context, thread_executor):
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_failed_task)
+
+        wf_runner = self._create_initial_workflow_runner(
+            workflow_context,
+            mock_parallel_tasks_workflow,
+            thread_executor,
+            inputs={'retry_interval': 1, 'max_attempts': 1, 'number_of_tasks': 2}
+        )
+        wf_thread = Thread(target=wf_runner.execute)
+        wf_thread.setDaemon(True)
+        wf_thread.start()
+
+        if custom_events['execution_failed'].wait(60) is False:
+            raise TimeoutError("Execution did not end")
+
+        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
+        node = workflow_context.model.node.refresh(node)
+        assert node.attributes['invocations'].value == 2
+
+        # First task passes
+        assert any(task.status == task.FAILED for task in tasks)
+        # Second task fails
+        assert any(task.status == task.SUCCESS for task in tasks)
+        assert wf_runner.execution.status in wf_runner.execution.FAILED
+
+        custom_events['is_resumed'].set()
+        new_thread_executor = thread.ThreadExecutor()
+        try:
+            new_wf_runner = WorkflowRunner(
+                service_id=wf_runner.service.id,
+                retry_failed=True,
+                inputs={},
+                model_storage=workflow_context.model,
+                resource_storage=workflow_context.resource,
+                plugin_manager=None,
+                execution_id=wf_runner.execution.id,
+                executor=new_thread_executor)
+
+            new_wf_runner.execute()
+        finally:
+            new_thread_executor.close()
+
+        # Wait for it to finish and assert changes.
+        node = workflow_context.model.node.refresh(node)
+        assert node.attributes['invocations'].value == 3
+        assert all(task.status == task.SUCCESS for task in tasks)
+        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+
     @staticmethod
     @pytest.fixture
     def thread_executor():
@@ -524,51 +585,31 @@ class TestResumableWorkflows(object):
         def execution_cancelled(*args, **kwargs):
             custom_events['execution_cancelled'].set()
 
-        def execution_ended(*args, **kwargs):
-            custom_events['execution_ended'].set()
+        def execution_failed(*args, **kwargs):
+            custom_events['execution_failed'].set()
 
         events.on_cancelled_workflow_signal.connect(execution_cancelled)
-        events.on_failure_workflow_signal.connect(execution_ended)
+        events.on_failure_workflow_signal.connect(execution_failed)
         yield
         events.on_cancelled_workflow_signal.disconnect(execution_cancelled)
-        events.on_failure_workflow_signal.disconnect(execution_ended)
+        events.on_failure_workflow_signal.disconnect(execution_failed)
         for event in custom_events.values():
             event.clear()
 
 
 @workflow
-def mock_parallel_workflow(ctx, graph):
-    node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-    graph.add_tasks(
-        api.task.OperationTask(
-            node, interface_name='aria.interfaces.lifecycle', operation_name='create'),
-        api.task.OperationTask(
-            node, interface_name='aria.interfaces.lifecycle', operation_name='create')
-    )
-
-
-@operation
-def mock_resuming_task(ctx):
-    ctx.node.attributes['invocations'] += 1
-
-    if ctx.node.attributes['invocations'] != 1:
-        custom_events['is_active'].set()
-        if not custom_events['is_resumed'].isSet():
-            # if resume was called, increase by one. o/w fail the execution - second task should
-            # fail as long it was not a part of resuming the workflow
-            raise FailingTask("wasn't resumed yet")
-
-
-@workflow
-def mock_single_task_workflow(ctx, graph):
+def mock_parallel_tasks_workflow(ctx, graph,
+                                 retry_interval=1, max_attempts=10, number_of_tasks=1):
     node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-    graph.add_tasks(
+    tasks = [
         api.task.OperationTask(node,
-                               interface_name='aria.interfaces.lifecycle',
-                               operation_name='create',
-                               retry_interval=1,
-                               max_attempts=10),
-    )
+                               'aria.interfaces.lifecycle',
+                               'create',
+                               retry_interval=retry_interval,
+                               max_attempts=max_attempts)
+        for _ in xrange(number_of_tasks)
+    ]
+    graph.add_tasks(*tasks)
 
 
 @operation
@@ -600,3 +641,15 @@ def mock_stuck_task(ctx):
         if not custom_events['is_active'].isSet():
             custom_events['is_active'].set()
         time.sleep(5)
+
+
+@operation
+def mock_failed_task(ctx):
+    ctx.node.attributes['invocations'] += 1
+
+    if ctx.node.attributes['invocations'] != 1:
+        custom_events['is_active'].set()
+        if not custom_events['is_resumed'].isSet():
+            # if resume was called, increase by one. o/w fail the execution - second task should
+            # fail as long it was not a part of resuming the workflow
+            raise FailingTask("wasn't resumed yet")


[2/3] incubator-ariatosca git commit: ARIA-312 Validation of workflow and operation kwargs raise False alarms

Posted by mx...@apache.org.
ARIA-312 Validation of workflow and operation kwargs raise False alarms

Workflow and Operation function kwargs validation failed in some scenarios.
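
The root cause was an incorrect slice when separating required arguments from those that have defaults. A short Python 2 illustration (mock_function mirrors the new test below):

    def mock_function(arg1, arg2=1, arg3=1):
        pass

    code = mock_function.func_code
    args = code.co_varnames[:code.co_argcount]   # ('arg1', 'arg2', 'arg3')
    defaults = mock_function.func_defaults       # (1, 1)

    # old slice: wrongly marks arg2 as required, so passing only arg1 and arg3 raised a false alarm
    print(args[:len(defaults)])                  # ('arg1', 'arg2')
    # fixed slice: only arguments without defaults are required
    print(args[:len(args) - len(defaults)])      # ('arg1',)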


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/6c2f35ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/6c2f35ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/6c2f35ec

Branch: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions
Commit: 6c2f35ecd4898ac605787f729aff48fa8dd46097
Parents: f903006
Author: max-orlov <ma...@gigaspaces.com>
Authored: Mon Jul 10 17:12:00 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Mon Jul 10 17:22:29 2017 +0300

----------------------------------------------------------------------
 aria/utils/validation.py       |  2 +-
 tests/utils/test_validation.py | 35 +++++++++++++++++++++++++++++++++++
 2 files changed, 36 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6c2f35ec/aria/utils/validation.py
----------------------------------------------------------------------
diff --git a/aria/utils/validation.py b/aria/utils/validation.py
index 3452dcc..06989a7 100644
--- a/aria/utils/validation.py
+++ b/aria/utils/validation.py
@@ -78,7 +78,7 @@ def validate_function_arguments(func, func_kwargs):
 
     # all args without the ones with default values
     args = func.func_code.co_varnames[:args_count]
-    non_default_args = args[:len(func.func_defaults)] if func.func_defaults else args
+    non_default_args = args[:len(args) - len(func.func_defaults)] if func.func_defaults else args
 
     # Check if any args without default values is missing in the func_kwargs
     for arg in non_default_args:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6c2f35ec/tests/utils/test_validation.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_validation.py b/tests/utils/test_validation.py
new file mode 100644
index 0000000..8e35f22
--- /dev/null
+++ b/tests/utils/test_validation.py
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.utils import validation
+
+
+def test_function_kwargs_validation():
+
+    def mock_function(arg1, arg2=1, arg3=1):
+        pass
+
+    with pytest.raises(ValueError):
+        validation.validate_function_arguments(mock_function, dict(arg2=1))
+    with pytest.raises(ValueError):
+        validation.validate_function_arguments(mock_function, dict(arg3=3))
+    with pytest.raises(ValueError):
+        validation.validate_function_arguments(mock_function, dict(arg2=2, arg3=3))
+
+    validation.validate_function_arguments(mock_function, dict(arg1=1, arg3=3))
+    validation.validate_function_arguments(mock_function, dict(arg1=1, arg2=2))
+    validation.validate_function_arguments(mock_function, dict(arg1=1, arg2=2, arg3=3))