You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/15 10:58:48 UTC

incubator-ariatosca git commit: wip2 [Forced Update!]

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-278-Remove-core-tasks ecf5fc9f0 -> 4d9112085 (forced update)


wip2


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/4d911208
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/4d911208
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/4d911208

Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: 4d911208504f41e581376d0c5f770fa59571119e
Parents: 7f5c620
Author: max-orlov <ma...@gigaspaces.com>
Authored: Thu Jun 15 13:49:19 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Thu Jun 15 13:58:43 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflow_runner.py            |  4 +-
 aria/orchestrator/workflows/core/engine.py      | 78 ++++++++++----------
 tests/orchestrator/test_workflow_runner.py      | 31 ++++----
 .../orchestrator/workflows/core/test_engine.py  |  6 +-
 .../test_task_graph_into_execution_graph.py     | 17 ++---
 5 files changed, 67 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4d911208/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 919da58..c4d8666 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -109,10 +109,10 @@ class WorkflowRunner(object):
         return self._model_storage.service.get(self._service_id)
 
     def execute(self):
-        self._engine.execute(self._workflow_context)
+        self._engine.execute(ctx=self._workflow_context)
 
     def cancel(self):
-        self._engine.cancel_execution()
+        self._engine.cancel_execution(ctx=self._workflow_context)
 
     def _create_execution_model(self, inputs):
         execution = models.Execution(

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4d911208/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index b9c3439..ade3661 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -38,75 +38,78 @@ class Engine(logger.LoggerMixin):
     def __init__(self, executor, **kwargs):
         super(Engine, self).__init__(**kwargs)
         self._executors = {executor.__class__: executor}
-        self._workflow_context = None
 
     def execute(self, ctx):
         """
         execute the workflow
         """
-        self._workflow_context = ctx
-
         try:
-            events.start_workflow_signal.send(self._workflow_context)
+            events.start_workflow_signal.send(ctx)
             while True:
-                cancel = self._is_cancel()
+                cancel = self._is_cancel(ctx)
                 if cancel:
                     break
-                for task in self._ended_tasks():
-                    self._handle_ended_tasks(task)
-                for task in self._executable_tasks():
-                    self._handle_executable_task(task)
-                if self._all_tasks_consumed():
+                for task in self._ended_tasks(ctx):
+                    self._handle_ended_tasks(ctx, task)
+                for task in self._executable_tasks(ctx):
+                    self._handle_executable_task(ctx, task)
+                if self._all_tasks_consumed(ctx):
                     break
                 else:
                     time.sleep(0.1)
             if cancel:
-                events.on_cancelled_workflow_signal.send(self._workflow_context)
+                events.on_cancelled_workflow_signal.send(ctx)
             else:
-                events.on_success_workflow_signal.send(self._workflow_context)
+                events.on_success_workflow_signal.send(ctx)
         except BaseException as e:
-            events.on_failure_workflow_signal.send(self._workflow_context, exception=e)
+            events.on_failure_workflow_signal.send(ctx, exception=e)
             raise
 
-    def cancel_execution(self):
+    @staticmethod
+    def cancel_execution(ctx):
         """
         Send a cancel request to the engine. If execution already started, execution status
         will be modified to 'cancelling' status. If execution is in pending mode, execution status
         will be modified to 'cancelled' directly.
         """
-        events.on_cancelling_workflow_signal.send(self._workflow_context)
-        self._workflow_context.execution = self._workflow_context.execution
+        events.on_cancelling_workflow_signal.send(ctx)
+        ctx.execution = ctx.execution
 
-    def _is_cancel(self):
-        execution = self._workflow_context.model.execution.update(self._workflow_context.execution)
+    @staticmethod
+    def _is_cancel(ctx):
+        execution = ctx.model.execution.update(ctx.execution)
         return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED)
 
-    def _executable_tasks(self):
+    def _executable_tasks(self, ctx):
         now = datetime.utcnow()
         return (
-            task for task in self._tasks_iter()
-            if task.is_waiting() and task.due_at <= now and not self._task_has_dependencies(task)
+            task for task in self._tasks_iter(ctx)
+            if task.is_waiting() and task.due_at <= now and \
+            not self._task_has_dependencies(ctx, task)
         )
 
-    def _ended_tasks(self):
-        for task in self._tasks_iter():
-            if task.has_ended() and task in self._workflow_context.graph:
+    def _ended_tasks(self, ctx):
+        for task in self._tasks_iter(ctx):
+            if task.has_ended() and task in ctx.graph:
                 yield task
 
-    def _task_has_dependencies(self, task):
+    @staticmethod
+    def _task_has_dependencies(ctx, task):
         return task.dependencies and \
-               all(d in self._workflow_context.graph for d in task.dependencies)
+               all(d in ctx.graph for d in task.dependencies)
 
-    def _all_tasks_consumed(self):
-        return len(self._workflow_context.graph.node) == 0
+    @staticmethod
+    def _all_tasks_consumed(ctx):
+        return len(ctx.graph.node) == 0
 
-    def _tasks_iter(self):
-        for task in self._workflow_context.execution.tasks:
+    @staticmethod
+    def _tasks_iter(ctx):
+        for task in ctx.execution.tasks:
             if not task.has_ended():
-                task = self._workflow_context.model.task.refresh(task)
+                task = ctx.model.task.refresh(task)
             yield task
 
-    def _handle_executable_task(self, task):
+    def _handle_executable_task(self, ctx, task):
         if not task.stub_type:
             events.sent_task_signal.send(task)
 
@@ -116,9 +119,9 @@ class Engine(logger.LoggerMixin):
 
         context_cls = task._context_cls or operation.BaseOperationContext
         op_ctx = context_cls(
-            model_storage=self._workflow_context.model,
-            resource_storage=self._workflow_context.resource,
-            workdir=self._workflow_context._workdir,
+            model_storage=ctx.model,
+            resource_storage=ctx.resource,
+            workdir=ctx._workdir,
             task_id=task.id,
             actor_id=task.actor.id if task.actor else None,
             service_id=task.execution.service.id,
@@ -128,8 +131,9 @@ class Engine(logger.LoggerMixin):
 
         executor.execute(op_ctx)
 
-    def _handle_ended_tasks(self, task):
+    @staticmethod
+    def _handle_ended_tasks(ctx, task):
         if task.status == models.Task.FAILED and not task.ignore_failure:
             raise exceptions.ExecutorException('Workflow failed')
         else:
-            self._workflow_context.graph.remove_node(task)
+            ctx.graph.remove_node(task)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4d911208/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index cd50580..bc4bab0 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -117,30 +117,28 @@ def test_task_configuration_parameters(request):
 
     task_max_attempts = 5
     task_retry_interval = 7
-    with mock.patch('aria.orchestrator.workflow_runner.Engine') as mock_engine_cls:
+    with mock.patch('aria.orchestrator.workflow_runner.Engine.execute') as mock_engine_execute:
         _create_workflow_runner(request, mock_workflow, task_max_attempts=task_max_attempts,
-                                task_retry_interval=task_retry_interval)
-        _, engine_kwargs = mock_engine_cls.call_args
-        # TODO: fix
-        # assert engine_kwargs['workflow_context']._task_max_attempts == task_max_attempts
-        # assert engine_kwargs['workflow_context']._task_retry_interval == task_retry_interval
+                                task_retry_interval=task_retry_interval).execute()
+        _, engine_kwargs = mock_engine_execute.call_args
+        assert engine_kwargs['ctx']._task_max_attempts == task_max_attempts
+        assert engine_kwargs['ctx']._task_retry_interval == task_retry_interval
 
 
 def test_execute(request, service):
     mock_workflow = _setup_mock_workflow_in_service(request)
 
     mock_engine = mock.MagicMock()
-    with mock.patch('aria.orchestrator.workflow_runner.Engine', return_value=mock_engine) \
-            as mock_engine_cls:
+    with mock.patch('aria.orchestrator.workflow_runner.Engine.execute', return_value=mock_engine) \
+            as mock_engine_execute:
         workflow_runner = _create_workflow_runner(request, mock_workflow)
+        workflow_runner.execute()
 
-        _, engine_kwargs = mock_engine_cls.call_args
-        # TODO: fix
-        # assert engine_kwargs['workflow_context'].service.id == service.id
-        # assert engine_kwargs['workflow_context'].execution.workflow_name == 'test_workflow'
+        _, engine_kwargs = mock_engine_execute.call_args
+        assert engine_kwargs['ctx'].service.id == service.id
+        assert engine_kwargs['ctx'].execution.workflow_name == 'test_workflow'
 
-        workflow_runner.execute()
-        mock_engine.execute.assert_called_once_with()
+        mock_engine_execute.assert_called_once_with(ctx=workflow_runner._workflow_context)
 
 
 def test_cancel_execution(request):
@@ -156,12 +154,9 @@ def test_cancel_execution(request):
 def test_execution_model_creation(request, service, model):
     mock_workflow = _setup_mock_workflow_in_service(request)
 
-    with mock.patch('aria.orchestrator.workflow_runner.Engine') as mock_engine_cls:
+    with mock.patch('aria.orchestrator.workflow_runner.Engine'):
         workflow_runner = _create_workflow_runner(request, mock_workflow)
 
-        _, engine_kwargs = mock_engine_cls.call_args
-        # TODO: fix
-        # assert engine_kwargs['workflow_context'].execution == workflow_runner.execution
         assert model.execution.get(workflow_runner.execution.id) == workflow_runner.execution
         assert workflow_runner.execution.service.id == service.id
         assert workflow_runner.execution.workflow_name == mock_workflow

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4d911208/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index 3a14a44..2fbf4a9 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -45,7 +45,7 @@ class BaseTest(object):
         eng = cls._engine(workflow_func=workflow_func,
                           workflow_context=workflow_context,
                           executor=executor)
-        eng.execute(execution_graph=workflow_runner.get_execution_graph(workflow_context.execution))
+        eng.execute(ctx=workflow_context)
         return eng
 
     @staticmethod
@@ -262,7 +262,7 @@ class TestCancel(BaseTest):
         t = threading.Thread(target=eng.execute, kwargs=dict(ctx=workflow_context))
         t.start()
         time.sleep(10)
-        eng.cancel_execution()
+        eng.cancel_execution(workflow_context)
         t.join(timeout=60) # we need to give this a *lot* of time because Travis can be *very* slow
         assert not t.is_alive() # if join is timed out it will not raise an exception
         assert workflow_context.states == ['start', 'cancel']
@@ -281,7 +281,7 @@ class TestCancel(BaseTest):
         eng = self._engine(workflow_func=mock_workflow,
                            workflow_context=workflow_context,
                            executor=executor)
-        eng.cancel_execution()
+        eng.cancel_execution(workflow_context)
         execution = workflow_context.execution
         assert execution.status == models.Execution.CANCELLED
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4d911208/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index aebae38..61b7ce7 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -29,8 +29,8 @@ from tests import storage
 def test_task_graph_into_execution_graph(tmpdir):
     interface_name = 'Standard'
     operation_name = 'create'
-    task_context = mock.context.simple(str(tmpdir))
-    node = task_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+    workflow_context = mock.context.simple(str(tmpdir))
+    node = workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
     interface = mock.models.create_interface(
         node.service,
         interface_name,
@@ -38,12 +38,12 @@ def test_task_graph_into_execution_graph(tmpdir):
         operation_kwargs=dict(function='test')
     )
     node.interfaces[interface.name] = interface
-    task_context.model.node.update(node)
+    workflow_context.model.node.update(node)
 
     def sub_workflow(name, **_):
         return api.task_graph.TaskGraph(name)
 
-    with context.workflow.current.push(task_context):
+    with context.workflow.current.push(workflow_context):
         test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph')
         simple_before_task = api.task.OperationTask(
             node,
@@ -68,13 +68,12 @@ def test_task_graph_into_execution_graph(tmpdir):
     test_task_graph.add_dependency(simple_after_task, inner_task_graph)
 
     # Direct check
-    execution = task_context.model.execution.list()[0]
+    execution = workflow_context.model.execution.list()[0]
 
     workflow_runner.construct_execution_tasks(execution, test_task_graph, base.StubTaskExecutor)
-    task_context.execution = execution
+    workflow_context.execution = execution
 
-    execution_graph = workflow_runner.get_execution_graph(execution)
-    execution_tasks = topological_sort(execution_graph)
+    execution_tasks = topological_sort(workflow_context.graph)
 
     assert len(execution_tasks) == 7
 
@@ -100,7 +99,7 @@ def test_task_graph_into_execution_graph(tmpdir):
     _assert_execution_is_api_task(next(execution_tasks), simple_after_task)
     assert next(execution_tasks).stub_type == models.Task.END_WORKFLOW
 
-    storage.release_sqlite_storage(task_context.model)
+    storage.release_sqlite_storage(workflow_context.model)
 
 
 def _assert_execution_is_api_task(execution_task, api_task):