You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ariatosca.apache.org by mxmrlv <gi...@git.apache.org> on 2017/06/21 07:55:47 UTC

[GitHub] incubator-ariatosca pull request #158: wip

GitHub user mxmrlv opened a pull request:

    https://github.com/apache/incubator-ariatosca/pull/158

    wip

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apache/incubator-ariatosca ARIA-236-Resumable-workflow-executions

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-ariatosca/pull/158.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #158
    
----
commit 8fdea04ff2ba38855d415642d245a020ff4305de
Author: max-orlov <ma...@gigaspaces.com>
Date:   2017-06-19T14:44:45Z

    wip

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-ariatosca pull request #158: ARIA-236 Resumable workflow execution...

Posted by mxmrlv <gi...@git.apache.org>.
Github user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/158#discussion_r123254181
  
    --- Diff: tests/orchestrator/test_workflow_runner.py ---
    @@ -293,3 +304,129 @@ def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
             resource_storage=resource,
             plugin_manager=plugin_manager,
             **task_configuration_kwargs)
    +
    +
    +class TestResumableWorkflows(object):
    +
    +    def test_resume_workflow(self, workflow_context, executor):
    +        self._create_interface(workflow_context, mock_success_task)
    +
    +        service = workflow_context.service
    +        service.workflows['custom_workflow'] = tests_mock.models.create_operation(
    +            'custom_workflow',
    +            operation_kwargs={'function': '{0}.{1}'.format(__name__, mock_workflow.__name__)}
    +        )
    +        workflow_context.model.service.update(service)
    +
    +        wf_runner = WorkflowRunner(
    +            service_id=workflow_context.service.id,
    +            inputs={},
    +            model_storage=workflow_context.model,
    +            resource_storage=workflow_context.resource,
    +            plugin_manager=None,
    +            workflow_name='custom_workflow',
    +            executor=executor)
    +        wf_thread = Thread(target=wf_runner.execute)
    +        try:
    +            wf_thread.start()
    +
    +            # Wait for the execution to start
    +            while global_test_holder.get('state') != 'active':
    +                pass
    --- End diff --
    
    sleep(0.1)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-ariatosca pull request #158: ARIA-236 Resumable workflow execution...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-ariatosca/pull/158


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-ariatosca pull request #158: ARIA-236 Resumable workflow execution...

Posted by mxmrlv <gi...@git.apache.org>.
Github user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/158#discussion_r123253556
  
    --- Diff: tests/orchestrator/test_workflow_runner.py ---
    @@ -293,3 +304,129 @@ def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
             resource_storage=resource,
             plugin_manager=plugin_manager,
             **task_configuration_kwargs)
    +
    +
    +class TestResumableWorkflows(object):
    +
    +    def test_resume_workflow(self, workflow_context, executor):
    +        self._create_interface(workflow_context, mock_success_task)
    +
    +        service = workflow_context.service
    +        service.workflows['custom_workflow'] = tests_mock.models.create_operation(
    +            'custom_workflow',
    +            operation_kwargs={'function': '{0}.{1}'.format(__name__, mock_workflow.__name__)}
    +        )
    +        workflow_context.model.service.update(service)
    +
    +        wf_runner = WorkflowRunner(
    +            service_id=workflow_context.service.id,
    +            inputs={},
    +            model_storage=workflow_context.model,
    +            resource_storage=workflow_context.resource,
    +            plugin_manager=None,
    +            workflow_name='custom_workflow',
    +            executor=executor)
    +        wf_thread = Thread(target=wf_runner.execute)
    +        try:
    +            wf_thread.start()
    +
    +            # Wait for the execution to start
    +            while global_test_holder.get('state') != 'active':
    +                pass
    +            global_test_holder['state'] = 'terminated'
    +            wf_runner.cancel()
    +
    +            # Make sure the execution was canceled and the task has not ended
    +            while wf_runner.execution.status != workflow_context.execution.CANCELLED:
    +                pass
    +            task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
    +            assert task.status in (task.FAILED, task.RETRYING)
    +            assert global_test_holder['state'] == 'idle'
    +
    +            # Create a new workflow runner, with an existing execution id. This would cause
    +            # the old execution to restart.
    +            new_wf_runner = WorkflowRunner(
    +                service_id=wf_runner.service.id,
    +                inputs={},
    +                model_storage=workflow_context.model,
    +                resource_storage=workflow_context.resource,
    +                plugin_manager=None,
    +                execution_id=wf_runner.execution.id,
    +                executor=executor)
    +
    +            # Set the resumed to True, for the execution to succeed.
    +            global_test_holder['resumed'] = True
    +            new_wf_runner.execute()
    +
    +            # Wait for it to finish and assert changes.
    +            while global_test_holder.get('state') != 'ended':
    +                pass
    +            assert task.status == task.SUCCESS
    +            assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
    +
    +        except:
    +            global_test_holder['state'] = 'terminated'
    +            wf_thread.join(0.5)
    +            raise
    +
    +    @staticmethod
    +    @pytest.fixture
    +    def executor():
    +        result = thread.ThreadExecutor()
    +        try:
    +            yield result
    +        finally:
    +            result.close()
    +
    +    @staticmethod
    +    @pytest.fixture
    +    def workflow_context(tmpdir):
    +        workflow_context = tests_mock.context.simple(str(tmpdir))
    +        yield workflow_context
    +        storage.release_sqlite_storage(workflow_context.model)
    +
    +    @staticmethod
    +    def _create_interface(ctx, func, arguments=None):
    +        node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
    +        interface_name = 'aria.interfaces.lifecycle'
    +        operation_kwargs = dict(function='{name}.{func.__name__}'.format(
    +            name=__name__, func=func))
    +        if arguments:
    +            # the operation has to declare the arguments before those may be passed
    +            operation_kwargs['arguments'] = arguments
    +        operation_name = 'create'
    +        interface = tests_mock.models.create_interface(node.service, interface_name, operation_name,
    +                                                       operation_kwargs=operation_kwargs)
    +        node.interfaces[interface.name] = interface
    +        ctx.model.node.update(node)
    +
    +        return node, interface_name, operation_name
    +
    +    @staticmethod
    +    def _engine(workflow_func, workflow_context, executor):
    +        graph = workflow_func(ctx=workflow_context)
    +        execution = workflow_context.execution
    +        compile.create_execution_tasks(execution, graph, executor.__class__)
    +        workflow_context.execution = execution
    +
    +        return engine.Engine(executors={executor.__class__: executor})
    +
    +
    +@workflow
    +def mock_workflow(ctx, graph):
    +    node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
    +    graph.add_tasks(
    +        api.task.OperationTask(
    +            node, interface_name='aria.interfaces.lifecycle', operation_name='create')
    +    )
    +
    +
    +@operation
    +def mock_success_task(**_):
    --- End diff --
    
    rename


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-ariatosca pull request #158: ARIA-236 Resumable workflow execution...

Posted by mxmrlv <gi...@git.apache.org>.
Github user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/158#discussion_r123454711
  
    --- Diff: aria/orchestrator/workflows/core/engine.py ---
    @@ -69,7 +73,7 @@ def execute(self, ctx):
                 raise
     
         @staticmethod
    -    def resume_execution(ctx):
    +    def _resuming_execution(ctx):
    --- End diff --
    
    flatten


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-ariatosca pull request #158: ARIA-236 Resumable workflow execution...

Posted by mxmrlv <gi...@git.apache.org>.
Github user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/158#discussion_r123252174
  
    --- Diff: aria/orchestrator/workflow_runner.py ---
    @@ -86,12 +89,17 @@ def __init__(self, workflow_name, service_id, inputs,
             # transforming the execution inputs to dict, to pass them to the workflow function
             execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values())
     
    -        self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
    -        compile.create_execution_tasks(
    -            self._workflow_context, self._tasks_graph, executor.__class__)
    +        if execution_id is None:
    +            # Not an existing execution
    +            workflow_fn = self._get_workflow_fn()
    +            tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
    +            compile.create_execution_tasks(self._workflow_context, tasks_graph, executor.__class__)
     
             self._engine = engine.Engine(executors={executor.__class__: executor})
     
    +        if workflow_name is None:
    +            self._engine.resume_execution(self._workflow_context)
    --- End diff --
    
    move to function


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-ariatosca issue #158: wip

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/incubator-ariatosca/pull/158
  
    Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-ariatosca pull request #158: ARIA-236 Resumable workflow execution...

Posted by mxmrlv <gi...@git.apache.org>.
Github user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/158#discussion_r123451368
  
    --- Diff: aria/orchestrator/workflow_runner.py ---
    @@ -55,23 +55,29 @@ def __init__(self, inputs, model_storage, resource_storage, plugin_manager,
             :param task_retry_interval: Retry interval in between retry attempts of a failing task
             """
     
    +        self._is_resume = execution_id is not None
    --- End diff --
    
    single boolean


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-ariatosca pull request #158: ARIA-236 Resumable workflow execution...

Posted by mxmrlv <gi...@git.apache.org>.
Github user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/158#discussion_r123255169
  
    --- Diff: tests/orchestrator/test_workflow_runner.py ---
    @@ -293,3 +304,129 @@ def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
             resource_storage=resource,
             plugin_manager=plugin_manager,
             **task_configuration_kwargs)
    +
    +
    +class TestResumableWorkflows(object):
    +
    +    def test_resume_workflow(self, workflow_context, executor):
    +        self._create_interface(workflow_context, mock_success_task)
    +
    +        service = workflow_context.service
    +        service.workflows['custom_workflow'] = tests_mock.models.create_operation(
    +            'custom_workflow',
    +            operation_kwargs={'function': '{0}.{1}'.format(__name__, mock_workflow.__name__)}
    +        )
    +        workflow_context.model.service.update(service)
    +
    +        wf_runner = WorkflowRunner(
    +            service_id=workflow_context.service.id,
    +            inputs={},
    +            model_storage=workflow_context.model,
    +            resource_storage=workflow_context.resource,
    +            plugin_manager=None,
    +            workflow_name='custom_workflow',
    +            executor=executor)
    +        wf_thread = Thread(target=wf_runner.execute)
    +        try:
    +            wf_thread.start()
    +
    +            # Wait for the execution to start
    +            while global_test_holder.get('state') != 'active':
    +                pass
    +            global_test_holder['state'] = 'terminated'
    +            wf_runner.cancel()
    +
    +            # Make sure the execution was canceled and the task has not ended
    +            while wf_runner.execution.status != workflow_context.execution.CANCELLED:
    +                pass
    +            task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
    +            assert task.status in (task.FAILED, task.RETRYING)
    +            assert global_test_holder['state'] == 'idle'
    +
    +            # Create a new workflow runner, with an existing execution id. This would cause
    +            # the old execution to restart.
    +            new_wf_runner = WorkflowRunner(
    +                service_id=wf_runner.service.id,
    +                inputs={},
    +                model_storage=workflow_context.model,
    +                resource_storage=workflow_context.resource,
    +                plugin_manager=None,
    +                execution_id=wf_runner.execution.id,
    +                executor=executor)
    +
    +            # Set the resumed to True, for the execution to succeed.
    +            global_test_holder['resumed'] = True
    +            new_wf_runner.execute()
    +
    +            # Wait for it to finish and assert changes.
    +            while global_test_holder.get('state') != 'ended':
    +                pass
    +            assert task.status == task.SUCCESS
    +            assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
    +
    +        except:
    +            global_test_holder['state'] = 'terminated'
    +            wf_thread.join(0.5)
    +            raise
    +
    +    @staticmethod
    +    @pytest.fixture
    +    def executor():
    +        result = thread.ThreadExecutor()
    +        try:
    +            yield result
    +        finally:
    +            result.close()
    +
    +    @staticmethod
    +    @pytest.fixture
    +    def workflow_context(tmpdir):
    +        workflow_context = tests_mock.context.simple(str(tmpdir))
    +        yield workflow_context
    +        storage.release_sqlite_storage(workflow_context.model)
    +
    +    @staticmethod
    +    def _create_interface(ctx, func, arguments=None):
    +        node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
    +        interface_name = 'aria.interfaces.lifecycle'
    +        operation_kwargs = dict(function='{name}.{func.__name__}'.format(
    +            name=__name__, func=func))
    +        if arguments:
    +            # the operation has to declare the arguments before those may be passed
    +            operation_kwargs['arguments'] = arguments
    +        operation_name = 'create'
    +        interface = tests_mock.models.create_interface(node.service, interface_name, operation_name,
    +                                                       operation_kwargs=operation_kwargs)
    +        node.interfaces[interface.name] = interface
    +        ctx.model.node.update(node)
    +
    +        return node, interface_name, operation_name
    +
    +    @staticmethod
    +    def _engine(workflow_func, workflow_context, executor):
    +        graph = workflow_func(ctx=workflow_context)
    +        execution = workflow_context.execution
    +        compile.create_execution_tasks(execution, graph, executor.__class__)
    +        workflow_context.execution = execution
    +
    +        return engine.Engine(executors={executor.__class__: executor})
    +
    +
    +@workflow
    +def mock_workflow(ctx, graph):
    +    node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
    +    graph.add_tasks(
    +        api.task.OperationTask(
    +            node, interface_name='aria.interfaces.lifecycle', operation_name='create')
    +    )
    +
    +
    +@operation
    +def mock_success_task(**_):
    +    global_test_holder['state'] = 'active'
    +    while global_test_holder.get('state') != 'terminated':
    +        if global_test_holder.get('resumed') is True:
    --- End diff --
    
    wait


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-ariatosca pull request #158: ARIA-236 Resumable workflow execution...

Posted by mxmrlv <gi...@git.apache.org>.
Github user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/158#discussion_r123254413
  
    --- Diff: tests/orchestrator/test_workflow_runner.py ---
    @@ -293,3 +304,129 @@ def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
             resource_storage=resource,
             plugin_manager=plugin_manager,
             **task_configuration_kwargs)
    +
    +
    +class TestResumableWorkflows(object):
    +
    +    def test_resume_workflow(self, workflow_context, executor):
    +        self._create_interface(workflow_context, mock_success_task)
    +
    +        service = workflow_context.service
    +        service.workflows['custom_workflow'] = tests_mock.models.create_operation(
    +            'custom_workflow',
    +            operation_kwargs={'function': '{0}.{1}'.format(__name__, mock_workflow.__name__)}
    +        )
    +        workflow_context.model.service.update(service)
    +
    +        wf_runner = WorkflowRunner(
    +            service_id=workflow_context.service.id,
    +            inputs={},
    +            model_storage=workflow_context.model,
    +            resource_storage=workflow_context.resource,
    +            plugin_manager=None,
    +            workflow_name='custom_workflow',
    +            executor=executor)
    +        wf_thread = Thread(target=wf_runner.execute)
    +        try:
    +            wf_thread.start()
    +
    +            # Wait for the execution to start
    +            while global_test_holder.get('state') != 'active':
    +                pass
    +            global_test_holder['state'] = 'terminated'
    +            wf_runner.cancel()
    +
    +            # Make sure the execution was canceled and the task has not ended
    +            while wf_runner.execution.status != workflow_context.execution.CANCELLED:
    +                pass
    --- End diff --
    
    same


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-ariatosca pull request #158: ARIA-236 Resumable workflow execution...

Posted by mxmrlv <gi...@git.apache.org>.
Github user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/158#discussion_r123252512
  
    --- Diff: aria/orchestrator/workflows/core/engine.py ---
    @@ -69,6 +69,10 @@ def execute(self, ctx):
                 raise
     
         @staticmethod
    +    def resume_execution(ctx):
    +        events.on_resume_workflow_signal.send(ctx)
    --- End diff --
    
    ???


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-ariatosca pull request #158: ARIA-236 Resumable workflow execution...

Posted by mxmrlv <gi...@git.apache.org>.
Github user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/158#discussion_r123252017
  
    --- Diff: aria/orchestrator/workflow_runner.py ---
    @@ -57,26 +57,29 @@ def __init__(self, workflow_name, service_id, inputs,
     
             self._model_storage = model_storage
             self._resource_storage = resource_storage
    -        self._workflow_name = workflow_name
     
             # the IDs are stored rather than the models themselves, so this module could be used
             # by several threads without raising errors on model objects shared between threads
    -        self._service_id = service_id
     
    -        self._validate_workflow_exists_for_service()
    -
    -        workflow_fn = self._get_workflow_fn()
    -
    -        execution = self._create_execution_model(inputs)
    -        self._execution_id = execution.id
    +        if workflow_name is not None and service_id is not None and execution_id is None:
    +            self._service_id = service_id
    +            self._workflow_name = workflow_name
    +            self._validate_workflow_exists_for_service()
    +            self._execution_id = self._create_execution_model(inputs).id
    +        elif execution_id is not None:
    +            self._execution_id = execution_id
    +            self._service_id = self.execution.service.id
    +            self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name
    +        else:
    +            raise exceptions.InvalidWorkflowRunnerParams("")
    --- End diff --
    
    Add an error message here instead of an empty string.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-ariatosca pull request #158: ARIA-236 Resumable workflow execution...

Posted by mxmrlv <gi...@git.apache.org>.
Github user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/158#discussion_r123456241
  
    --- Diff: tests/orchestrator/test_workflow_runner.py ---
    @@ -327,47 +335,39 @@ def test_resume_workflow(self, workflow_context, executor):
                 workflow_name='custom_workflow',
                 executor=executor)
             wf_thread = Thread(target=wf_runner.execute)
    -        try:
    -            wf_thread.start()
    -
    -            # Wait for the execution to start
    -            while global_test_holder.get('state') != 'active':
    -                pass
    -            global_test_holder['state'] = 'terminated'
    -            wf_runner.cancel()
    -
    -            # Make sure the execution was canceled and the task has not ended
    -            while wf_runner.execution.status != workflow_context.execution.CANCELLED:
    -                pass
    -            task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
    -            assert task.status in (task.FAILED, task.RETRYING)
    -            assert global_test_holder['state'] == 'idle'
    -
    -            # Create a new workflow runner, with an existing execution id. This would cause
    -            # the old execution to restart.
    -            new_wf_runner = WorkflowRunner(
    -                service_id=wf_runner.service.id,
    -                inputs={},
    -                model_storage=workflow_context.model,
    -                resource_storage=workflow_context.resource,
    -                plugin_manager=None,
    -                execution_id=wf_runner.execution.id,
    -                executor=executor)
    -
    -            # Set the resumed to True, for the execution to succeed.
    -            global_test_holder['resumed'] = True
    -            new_wf_runner.execute()
    -
    -            # Wait for it to finish and assert changes.
    -            while global_test_holder.get('state') != 'ended':
    -                pass
    -            assert task.status == task.SUCCESS
    -            assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
    -
    -        except:
    -            global_test_holder['state'] = 'terminated'
    -            wf_thread.join(0.5)
    -            raise
    +        wf_thread.daemon = True
    +        wf_thread.start()
    +
    +        # Wait for the execution to start
    +        events['is_active'].wait(5)
    --- End diff --
    
    raise?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-ariatosca pull request #158: ARIA-236 Resumable workflow execution...

Posted by mxmrlv <gi...@git.apache.org>.
Github user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/158#discussion_r123254882
  
    --- Diff: tests/orchestrator/test_workflow_runner.py ---
    @@ -293,3 +304,129 @@ def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
             resource_storage=resource,
             plugin_manager=plugin_manager,
             **task_configuration_kwargs)
    +
    +
    +class TestResumableWorkflows(object):
    +
    +    def test_resume_workflow(self, workflow_context, executor):
    +        self._create_interface(workflow_context, mock_success_task)
    +
    +        service = workflow_context.service
    +        service.workflows['custom_workflow'] = tests_mock.models.create_operation(
    +            'custom_workflow',
    +            operation_kwargs={'function': '{0}.{1}'.format(__name__, mock_workflow.__name__)}
    +        )
    +        workflow_context.model.service.update(service)
    +
    +        wf_runner = WorkflowRunner(
    +            service_id=workflow_context.service.id,
    +            inputs={},
    +            model_storage=workflow_context.model,
    +            resource_storage=workflow_context.resource,
    +            plugin_manager=None,
    +            workflow_name='custom_workflow',
    +            executor=executor)
    +        wf_thread = Thread(target=wf_runner.execute)
    +        try:
    +            wf_thread.start()
    +
    +            # Wait for the execution to start
    +            while global_test_holder.get('state') != 'active':
    +                pass
    +            global_test_holder['state'] = 'terminated'
    +            wf_runner.cancel()
    +
    +            # Make sure the execution was canceled and the task has not ended
    +            while wf_runner.execution.status != workflow_context.execution.CANCELLED:
    +                pass
    +            task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
    +            assert task.status in (task.FAILED, task.RETRYING)
    +            assert global_test_holder['state'] == 'idle'
    +
    +            # Create a new workflow runner, with an existing execution id. This would cause
    +            # the old execution to restart.
    +            new_wf_runner = WorkflowRunner(
    +                service_id=wf_runner.service.id,
    +                inputs={},
    +                model_storage=workflow_context.model,
    +                resource_storage=workflow_context.resource,
    +                plugin_manager=None,
    +                execution_id=wf_runner.execution.id,
    +                executor=executor)
    +
    +            # Set the resumed to True, for the execution to succeed.
    +            global_test_holder['resumed'] = True
    +            new_wf_runner.execute()
    +
    +            # Wait for it to finish and assert changes.
    +            while global_test_holder.get('state') != 'ended':
    +                pass
    +            assert task.status == task.SUCCESS
    +            assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
    +
    +        except:
    --- End diff --
    
    not needed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-ariatosca pull request #158: ARIA-236 Resumable workflow execution...

Posted by mxmrlv <gi...@git.apache.org>.
Github user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/158#discussion_r123254985
  
    --- Diff: tests/orchestrator/test_workflow_runner.py ---
    @@ -293,3 +304,129 @@ def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
             resource_storage=resource,
             plugin_manager=plugin_manager,
             **task_configuration_kwargs)
    +
    +
    +class TestResumableWorkflows(object):
    +
    +    def test_resume_workflow(self, workflow_context, executor):
    +        self._create_interface(workflow_context, mock_success_task)
    +
    +        service = workflow_context.service
    +        service.workflows['custom_workflow'] = tests_mock.models.create_operation(
    +            'custom_workflow',
    +            operation_kwargs={'function': '{0}.{1}'.format(__name__, mock_workflow.__name__)}
    +        )
    +        workflow_context.model.service.update(service)
    +
    +        wf_runner = WorkflowRunner(
    +            service_id=workflow_context.service.id,
    +            inputs={},
    +            model_storage=workflow_context.model,
    +            resource_storage=workflow_context.resource,
    +            plugin_manager=None,
    +            workflow_name='custom_workflow',
    +            executor=executor)
    +        wf_thread = Thread(target=wf_runner.execute)
    --- End diff --
    
    Make this a daemon thread.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-ariatosca pull request #158: ARIA-236 Resumable workflow execution...

Posted by mxmrlv <gi...@git.apache.org>.
Github user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/158#discussion_r123251954
  
    --- Diff: aria/orchestrator/workflow_runner.py ---
    @@ -57,26 +57,29 @@ def __init__(self, workflow_name, service_id, inputs,
     
             self._model_storage = model_storage
             self._resource_storage = resource_storage
    -        self._workflow_name = workflow_name
     
             # the IDs are stored rather than the models themselves, so this module could be used
             # by several threads without raising errors on model objects shared between threads
    -        self._service_id = service_id
     
    -        self._validate_workflow_exists_for_service()
    -
    -        workflow_fn = self._get_workflow_fn()
    -
    -        execution = self._create_execution_model(inputs)
    -        self._execution_id = execution.id
    +        if workflow_name is not None and service_id is not None and execution_id is None:
    --- End diff --
    
    Create a single boolean for this condition.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-ariatosca pull request #158: ARIA-236 Resumable workflow execution...

Posted by mxmrlv <gi...@git.apache.org>.
Github user mxmrlv commented on a diff in the pull request:

    https://github.com/apache/incubator-ariatosca/pull/158#discussion_r123249445
  
    --- Diff: aria/cli/commands/executions.py ---
    @@ -134,18 +134,64 @@ def start(workflow_name,
         executor = DryExecutor() if dry else None  # use WorkflowRunner's default executor
     
         workflow_runner = \
    -        WorkflowRunner(workflow_name, service.id, inputs,
    -                       model_storage, resource_storage, plugin_manager,
    -                       executor, task_max_attempts, task_retry_interval)
    +        WorkflowRunner(
    +            inputs, model_storage, resource_storage, plugin_manager,
    +            service_id=service.id, workflow_name=workflow_name, executor=executor,
    +            task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval
    +        )
    +    logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
    +
    +    _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern)
    +
     
    -    execution_thread_name = '{0}_{1}'.format(service_name, workflow_name)
    +@executions.command(name='resume',
    +                    short_help='Resume a workflow')
    +@aria.argument('execution-id')
    +@aria.options.inputs(help=helptexts.EXECUTION_INPUTS)
    +@aria.options.dry_execution
    +@aria.options.task_max_attempts()
    +@aria.options.task_retry_interval()
    +@aria.options.mark_pattern()
    +@aria.options.verbose()
    +@aria.pass_model_storage
    +@aria.pass_resource_storage
    +@aria.pass_plugin_manager
    +@aria.pass_logger
    +def resume(execution_id,
    +           inputs,
    --- End diff --
    
    Remove this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---