You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/20 13:25:11 UTC
incubator-ariatosca git commit: phase 2 fixes [Forced Update!]
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-278-Remove-core-tasks 958e28354 -> 2459a3e05 (forced update)
phase 2 fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/2459a3e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/2459a3e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/2459a3e0
Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: 2459a3e055f25a5397abb4d73fb095440fd09060
Parents: 7762a49
Author: max-orlov <ma...@gigaspaces.com>
Authored: Tue Jun 20 16:17:11 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Tue Jun 20 16:25:06 2017 +0300
----------------------------------------------------------------------
aria/orchestrator/workflow_runner.py | 5 +----
aria/orchestrator/workflows/core/engine.py | 12 ++++++----
aria/orchestrator/workflows/core/task.py | 23 +++++++++++++-------
tests/orchestrator/context/__init__.py | 3 +--
tests/orchestrator/context/test_serialize.py | 3 +--
.../orchestrator/execution_plugin/test_local.py | 3 +--
tests/orchestrator/execution_plugin/test_ssh.py | 4 +---
.../orchestrator/workflows/core/test_engine.py | 3 +--
.../orchestrator/workflows/core/test_events.py | 5 ++---
.../test_task_graph_into_execution_graph.py | 8 ++-----
.../executor/test_process_executor_extension.py | 3 +--
.../test_process_executor_tracked_changes.py | 3 +--
12 files changed, 35 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 66eec4a..a57a34e 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -87,10 +87,7 @@ class WorkflowRunner(object):
execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values())
self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
- task.construct_execution_tasks(self.execution, self._tasks_graph, executor.__class__)
-
- # Update the state
- self._model_storage.execution.update(execution)
+ task.create_execution_tasks(self._workflow_context, self._tasks_graph, executor.__class__)
self._engine = engine.Engine(executors={executor.__class__: executor})
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 7d542d0..9f0ddd7 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -25,7 +25,8 @@ from aria.modeling import models
from aria.orchestrator import events
from aria.orchestrator.context import operation
-from .. import exceptions, executor, api
+from .. import exceptions
+from ..executor.base import StubTaskExecutor
# Import required so all signals are registered
from . import events_handler # pylint: disable=unused-import
@@ -37,7 +38,8 @@ class Engine(logger.LoggerMixin):
def __init__(self, executors, **kwargs):
super(Engine, self).__init__(**kwargs)
- self._executors = executors.copy
+ self._executors = executors.copy()
+ self._executors.setdefault(StubTaskExecutor, StubTaskExecutor())
def execute(self, ctx):
"""
@@ -88,7 +90,8 @@ class Engine(logger.LoggerMixin):
not self._task_has_dependencies(ctx, task)
)
- def _ended_tasks(self, ctx, executing_tasks):
+ @staticmethod
+ def _ended_tasks(ctx, executing_tasks):
for task in executing_tasks:
if task.has_ended() and task in ctx._graph:
yield task
@@ -128,7 +131,8 @@ class Engine(logger.LoggerMixin):
events.sent_task_signal.send(op_ctx)
task_executor.execute(op_ctx)
- def _handle_ended_tasks(self, ctx, task, executing_tasks):
+ @staticmethod
+ def _handle_ended_tasks(ctx, task, executing_tasks):
executing_tasks.remove(task)
if task.status == models.Task.FAILED and not task.ignore_failure:
raise exceptions.ExecutorException('Workflow failed')
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index cf1b7bc..84ce819 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -21,13 +21,20 @@ from ....modeling import models
from .. import executor, api
-def construct_execution_tasks(execution,
- task_graph,
- default_executor,
- stub_executor=executor.base.StubTaskExecutor,
- start_stub_type=models.Task.START_WORKFLOW,
- end_stub_type=models.Task.END_WORKFLOW,
- depends_on=()):
+def create_execution_tasks(ctx, task_graph, default_executor):
+ execution = ctx.execution
+ _construct_execution_tasks(execution, task_graph, default_executor)
+ ctx.model.execution.update(execution)
+ return execution.tasks
+
+
+def _construct_execution_tasks(execution,
+ task_graph,
+ default_executor,
+ stub_executor=executor.base.StubTaskExecutor,
+ start_stub_type=models.Task.START_WORKFLOW,
+ end_stub_type=models.Task.END_WORKFLOW,
+ depends_on=()):
"""
Translates the user graph to the execution graph
:param task_graph: The user's graph
@@ -55,7 +62,7 @@ def construct_execution_tasks(execution,
elif isinstance(task, api.task.WorkflowTask):
# Build the graph recursively while adding start and end markers
- construct_execution_tasks(
+ _construct_execution_tasks(
execution=execution,
task_graph=task,
default_executor=default_executor,
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/tests/orchestrator/context/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/__init__.py b/tests/orchestrator/context/__init__.py
index d6c5d26..b55755b 100644
--- a/tests/orchestrator/context/__init__.py
+++ b/tests/orchestrator/context/__init__.py
@@ -26,8 +26,7 @@ def op_path(func, module_path=None):
def execute(workflow_func, workflow_context, executor):
graph = workflow_func(ctx=workflow_context)
- task.construct_execution_tasks(workflow_context.execution, graph, executor.__class__)
- workflow_context.execution = workflow_context.execution
+ task.create_execution_tasks(workflow_context, graph, executor.__class__)
eng = engine.Engine(executor)
eng.execute(workflow_context)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index ee02b41..934b914 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -48,8 +48,7 @@ def test_serialize_operation_context(context, executor, tmpdir):
context.model.node.update(node)
graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
- task.construct_execution_tasks(context.execution, graph, executor.__class__)
- context.execution = context.execution
+ task.create_execution_tasks(context, graph, executor.__class__)
eng = engine.Engine(executor)
eng.execute(context)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 8f48e4a..3936931 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -500,8 +500,7 @@ if __name__ == '__main__':
arguments=arguments))
return graph
tasks_graph = mock_workflow(ctx=workflow_context) # pylint: disable=no-value-for-parameter
- task.construct_execution_tasks(workflow_context.execution, tasks_graph, executor.__class__)
- workflow_context.execution = workflow_context.execution
+ task.create_execution_tasks(workflow_context, tasks_graph, executor.__class__)
eng = engine.Engine(executor)
eng.execute(workflow_context)
return workflow_context.model.node.get_by_name(
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index bfc0b98..8936a98 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -254,9 +254,7 @@ class TestWithActualSSHServer(object):
graph.sequence(*ops)
return graph
tasks_graph = mock_workflow(ctx=self._workflow_context) # pylint: disable=no-value-for-parameter
- task.construct_execution_tasks(
- self._workflow_context.execution, tasks_graph, self._executor.__class__)
- self._workflow_context.execution = self._workflow_context.execution
+ task.create_execution_tasks(self._workflow_context, tasks_graph, self._executor.__class__)
eng = engine.Engine(self._executor)
eng.execute(self._workflow_context)
return self._workflow_context.model.node.get_by_name(
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index f6f638f..36aa63a 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -51,8 +51,7 @@ class BaseTest(object):
def _engine(workflow_func, workflow_context, executor):
graph = workflow_func(ctx=workflow_context)
execution = workflow_context.execution
- task.construct_execution_tasks(execution, graph, executor.__class__)
- workflow_context.execution = execution
+ task.create_execution_tasks(execution, graph, executor.__class__)
return engine.Engine(executors=executor)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/tests/orchestrator/workflows/core/test_events.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py
index c30b373..befca1a 100644
--- a/tests/orchestrator/workflows/core/test_events.py
+++ b/tests/orchestrator/workflows/core/test_events.py
@@ -113,11 +113,10 @@ def run_operation_on_node(ctx, op_name, interface_name):
operation_name=op_name,
operation_kwargs=dict(function='{name}.{func.__name__}'.format(name=__name__, func=func)))
node.interfaces[interface.name] = interface
- task.construct_execution_tasks(
- ctx.execution,
+ task.create_execution_tasks(
+ ctx,
single_operation_workflow(ctx, node=node, interface_name=interface_name, op_name=op_name),
ThreadExecutor)
- ctx.execution = ctx.execution
eng = engine.Engine(executors=ThreadExecutor())
eng.execute(ctx)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index faac35f..1c25967 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -17,10 +17,7 @@ from networkx import topological_sort
from aria.modeling import models
from aria.orchestrator import context
-from aria.orchestrator.workflows import (
- api,
- core
-)
+from aria.orchestrator.workflows import api
from aria.orchestrator.workflows.core import task
from aria.orchestrator.workflows.executor import base
from tests import mock
@@ -71,8 +68,7 @@ def test_task_graph_into_execution_graph(tmpdir):
# Direct check
execution = workflow_context.model.execution.list()[0]
- task.construct_execution_tasks(execution, test_task_graph, base.StubTaskExecutor)
- workflow_context.execution = execution
+ task.create_execution_tasks(workflow_context, test_task_graph, base.StubTaskExecutor)
execution_tasks = topological_sort(workflow_context._graph)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index 4566ec3..9e4becd 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -57,8 +57,7 @@ def test_decorate_extension(context, executor):
graph.add_tasks(task)
return graph
graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
- task.construct_execution_tasks(context.execution, graph, executor.__class__)
- context.execution = context.execution
+ task.create_execution_tasks(context, graph, executor.__class__)
eng = engine.Engine(executor)
eng.execute(context)
out = get_node(context).attributes.get('out').value
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2459a3e0/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index 0c15c8a..bfbb359 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -107,8 +107,7 @@ def _run_workflow(context, executor, op_func, arguments=None):
graph.add_tasks(task)
return graph
graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
- task.construct_execution_tasks(context.execution, graph, executor.__class__)
- context.execution = context.execution
+ task.create_execution_tasks(context, graph, executor.__class__)
eng = engine.Engine(executor)
eng.execute(context)
out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out')