You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/20 13:25:11 UTC

incubator-ariatosca git commit: phase 2 fixes [Forced Update!]

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-278-Remove-core-tasks 958e28354 -> 2459a3e05 (forced update)


phase 2 fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/2459a3e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/2459a3e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/2459a3e0

Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: 2459a3e055f25a5397abb4d73fb095440fd09060
Parents: 7762a49
Author: max-orlov <ma...@gigaspaces.com>
Authored: Tue Jun 20 16:17:11 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Tue Jun 20 16:25:06 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflow_runner.py            |  5 +----
 aria/orchestrator/workflows/core/engine.py      | 12 ++++++----
 aria/orchestrator/workflows/core/task.py        | 23 +++++++++++++-------
 tests/orchestrator/context/__init__.py          |  3 +--
 tests/orchestrator/context/test_serialize.py    |  3 +--
 .../orchestrator/execution_plugin/test_local.py |  3 +--
 tests/orchestrator/execution_plugin/test_ssh.py |  4 +---
 .../orchestrator/workflows/core/test_engine.py  |  3 +--
 .../orchestrator/workflows/core/test_events.py  |  5 ++---
 .../test_task_graph_into_execution_graph.py     |  8 ++-----
 .../executor/test_process_executor_extension.py |  3 +--
 .../test_process_executor_tracked_changes.py    |  3 +--
 12 files changed, 35 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 66eec4a..a57a34e 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -87,10 +87,7 @@ class WorkflowRunner(object):
         execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values())
 
         self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
-        task.construct_execution_tasks(self.execution, self._tasks_graph, executor.__class__)
-
-        # Update the state
-        self._model_storage.execution.update(execution)
+        task.create_execution_tasks(self._workflow_context, self._tasks_graph, executor.__class__)
 
         self._engine = engine.Engine(executors={executor.__class__: executor})
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 7d542d0..9f0ddd7 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -25,7 +25,8 @@ from aria.modeling import models
 from aria.orchestrator import events
 from aria.orchestrator.context import operation
 
-from .. import exceptions, executor, api
+from .. import exceptions
+from ..executor.base import StubTaskExecutor
 # Import required so all signals are registered
 from . import events_handler  # pylint: disable=unused-import
 
@@ -37,7 +38,8 @@ class Engine(logger.LoggerMixin):
 
     def __init__(self, executors, **kwargs):
         super(Engine, self).__init__(**kwargs)
-        self._executors = executors.copy
+        self._executors = executors.copy()
+        self._executors.setdefault(StubTaskExecutor, StubTaskExecutor())
 
     def execute(self, ctx):
         """
@@ -88,7 +90,8 @@ class Engine(logger.LoggerMixin):
             not self._task_has_dependencies(ctx, task)
         )
 
-    def _ended_tasks(self, ctx, executing_tasks):
+    @staticmethod
+    def _ended_tasks(ctx, executing_tasks):
         for task in executing_tasks:
             if task.has_ended() and task in ctx._graph:
                 yield task
@@ -128,7 +131,8 @@ class Engine(logger.LoggerMixin):
             events.sent_task_signal.send(op_ctx)
         task_executor.execute(op_ctx)
 
-    def _handle_ended_tasks(self, ctx, task, executing_tasks):
+    @staticmethod
+    def _handle_ended_tasks(ctx, task, executing_tasks):
         executing_tasks.remove(task)
         if task.status == models.Task.FAILED and not task.ignore_failure:
             raise exceptions.ExecutorException('Workflow failed')

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index cf1b7bc..84ce819 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -21,13 +21,20 @@ from ....modeling import models
 from .. import executor, api
 
 
-def construct_execution_tasks(execution,
-                              task_graph,
-                              default_executor,
-                              stub_executor=executor.base.StubTaskExecutor,
-                              start_stub_type=models.Task.START_WORKFLOW,
-                              end_stub_type=models.Task.END_WORKFLOW,
-                              depends_on=()):
+def create_execution_tasks(ctx, task_graph, default_executor):
+    execution = ctx.execution
+    _construct_execution_tasks(execution, task_graph, default_executor)
+    ctx.model.execution.update(execution)
+    return execution.tasks
+
+
+def _construct_execution_tasks(execution,
+                               task_graph,
+                               default_executor,
+                               stub_executor=executor.base.StubTaskExecutor,
+                               start_stub_type=models.Task.START_WORKFLOW,
+                               end_stub_type=models.Task.END_WORKFLOW,
+                               depends_on=()):
     """
     Translates the user graph to the execution graph
     :param task_graph: The user's graph
@@ -55,7 +62,7 @@ def construct_execution_tasks(execution,
 
         elif isinstance(task, api.task.WorkflowTask):
             # Build the graph recursively while adding start and end markers
-            construct_execution_tasks(
+            _construct_execution_tasks(
                 execution=execution,
                 task_graph=task,
                 default_executor=default_executor,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/tests/orchestrator/context/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/__init__.py b/tests/orchestrator/context/__init__.py
index d6c5d26..b55755b 100644
--- a/tests/orchestrator/context/__init__.py
+++ b/tests/orchestrator/context/__init__.py
@@ -26,8 +26,7 @@ def op_path(func, module_path=None):
 def execute(workflow_func, workflow_context, executor):
     graph = workflow_func(ctx=workflow_context)
 
-    task.construct_execution_tasks(workflow_context.execution, graph, executor.__class__)
-    workflow_context.execution = workflow_context.execution
+    task.create_execution_tasks(workflow_context, graph, executor.__class__)
     eng = engine.Engine(executor)
 
     eng.execute(workflow_context)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index ee02b41..934b914 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -48,8 +48,7 @@ def test_serialize_operation_context(context, executor, tmpdir):
     context.model.node.update(node)
 
     graph = _mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
-    task.construct_execution_tasks(context.execution, graph, executor.__class__)
-    context.execution = context.execution
+    task.create_execution_tasks(context, graph, executor.__class__)
     eng = engine.Engine(executor)
     eng.execute(context)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 8f48e4a..3936931 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -500,8 +500,7 @@ if __name__ == '__main__':
                 arguments=arguments))
             return graph
         tasks_graph = mock_workflow(ctx=workflow_context)  # pylint: disable=no-value-for-parameter
-        task.construct_execution_tasks(workflow_context.execution, tasks_graph, executor.__class__)
-        workflow_context.execution = workflow_context.execution
+        task.create_execution_tasks(workflow_context, tasks_graph, executor.__class__)
         eng = engine.Engine(executor)
         eng.execute(workflow_context)
         return workflow_context.model.node.get_by_name(

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index bfc0b98..8936a98 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -254,9 +254,7 @@ class TestWithActualSSHServer(object):
             graph.sequence(*ops)
             return graph
         tasks_graph = mock_workflow(ctx=self._workflow_context)  # pylint: disable=no-value-for-parameter
-        task.construct_execution_tasks(
-            self._workflow_context.execution, tasks_graph, self._executor.__class__)
-        self._workflow_context.execution = self._workflow_context.execution
+        task.create_execution_tasks(self._workflow_context, tasks_graph, self._executor.__class__)
         eng = engine.Engine(self._executor)
         eng.execute(self._workflow_context)
         return self._workflow_context.model.node.get_by_name(

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index f6f638f..36aa63a 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -51,8 +51,7 @@ class BaseTest(object):
     def _engine(workflow_func, workflow_context, executor):
         graph = workflow_func(ctx=workflow_context)
         execution = workflow_context.execution
-        task.construct_execution_tasks(execution, graph, executor.__class__)
-        workflow_context.execution = execution
+        task.create_execution_tasks(execution, graph, executor.__class__)
 
         return engine.Engine(executors=executor)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/tests/orchestrator/workflows/core/test_events.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py
index c30b373..befca1a 100644
--- a/tests/orchestrator/workflows/core/test_events.py
+++ b/tests/orchestrator/workflows/core/test_events.py
@@ -113,11 +113,10 @@ def run_operation_on_node(ctx, op_name, interface_name):
         operation_name=op_name,
         operation_kwargs=dict(function='{name}.{func.__name__}'.format(name=__name__, func=func)))
     node.interfaces[interface.name] = interface
-    task.construct_execution_tasks(
-        ctx.execution,
+    task.create_execution_tasks(
+        ctx,
         single_operation_workflow(ctx, node=node, interface_name=interface_name, op_name=op_name),
         ThreadExecutor)
-    ctx.execution = ctx.execution
 
     eng = engine.Engine(executors=ThreadExecutor())
     eng.execute(ctx)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index faac35f..1c25967 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -17,10 +17,7 @@ from networkx import topological_sort
 
 from aria.modeling import models
 from aria.orchestrator import context
-from aria.orchestrator.workflows import (
-    api,
-    core
-)
+from aria.orchestrator.workflows import api
 from aria.orchestrator.workflows.core import task
 from aria.orchestrator.workflows.executor import base
 from tests import mock
@@ -71,8 +68,7 @@ def test_task_graph_into_execution_graph(tmpdir):
     # Direct check
     execution = workflow_context.model.execution.list()[0]
 
-    task.construct_execution_tasks(execution, test_task_graph, base.StubTaskExecutor)
-    workflow_context.execution = execution
+    task.create_execution_tasks(workflow_context, test_task_graph, base.StubTaskExecutor)
 
     execution_tasks = topological_sort(workflow_context._graph)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index 4566ec3..9e4becd 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -57,8 +57,7 @@ def test_decorate_extension(context, executor):
         graph.add_tasks(task)
         return graph
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
-    task.construct_execution_tasks(context.execution, graph, executor.__class__)
-    context.execution = context.execution
+    task.create_execution_tasks(context, graph, executor.__class__)
     eng = engine.Engine(executor)
     eng.execute(context)
     out = get_node(context).attributes.get('out').value

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index 0c15c8a..bfbb359 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -107,8 +107,7 @@ def _run_workflow(context, executor, op_func, arguments=None):
         graph.add_tasks(task)
         return graph
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
-    task.construct_execution_tasks(context.execution, graph, executor.__class__)
-    context.execution = context.execution
+    task.create_execution_tasks(context, graph, executor.__class__)
     eng = engine.Engine(executor)
     eng.execute(context)
     out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out')