You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/14 14:17:21 UTC
incubator-ariatosca git commit: tasks no longer execute their executor
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-278-Remove-core-tasks 092b45f0d -> 58e212c7d
tasks no longer execute their executor
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/58e212c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/58e212c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/58e212c7
Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: 58e212c7dbdea7aa9b532bba9fea4a5359a49019
Parents: 092b45f
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed Jun 14 17:17:16 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Wed Jun 14 17:17:16 2017 +0300
----------------------------------------------------------------------
aria/modeling/orchestration.py | 36 ++----
aria/orchestrator/context/operation.py | 3 +
aria/orchestrator/workflow_runner.py | 123 +++++++++++++++++--
aria/orchestrator/workflows/core/__init__.py | 2 +-
aria/orchestrator/workflows/core/engine.py | 60 +++++----
.../workflows/core/events_handler.py | 2 +-
aria/orchestrator/workflows/core/translation.py | 112 -----------------
aria/orchestrator/workflows/executor/base.py | 13 +-
aria/orchestrator/workflows/executor/thread.py | 2 -
tests/orchestrator/context/__init__.py | 11 +-
tests/orchestrator/context/test_operation.py | 20 ++-
tests/orchestrator/context/test_serialize.py | 15 ++-
tests/orchestrator/context/test_toolbelt.py | 7 +-
.../orchestrator/execution_plugin/test_local.py | 15 ++-
tests/orchestrator/test_workflow_runner.py | 2 +-
.../orchestrator/workflows/core/test_engine.py | 21 ++--
.../orchestrator/workflows/core/test_events.py | 12 +-
.../test_task_graph_into_execution_graph.py | 48 +++-----
.../workflows/executor/test_executor.py | 39 +++---
19 files changed, 271 insertions(+), 272 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 4a5771a..c0b7f04 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -24,6 +24,7 @@ classes:
from contextlib import contextmanager
from datetime import datetime
+from networkx import DiGraph
from sqlalchemy import (
Column,
Integer,
@@ -309,7 +310,6 @@ class TaskBase(mixins.ModelMixin):
api_id = Column(String)
_executor = Column(PickleType)
- _executor_kwargs = Column(modeling_types.Dict)
_context_cls = Column(PickleType)
@declared_attr
@@ -400,14 +400,14 @@ class TaskBase(mixins.ModelMixin):
def retry(message=None, retry_interval=None):
raise TaskRetryException(message, retry_interval=retry_interval)
- # @declared_attr
- # def dependency_fk(cls):
- # """For Type one-to-many to Type"""
- # return relationship.foreign_key('task', nullable=True)
+ @declared_attr
+ def dependency_fk(self):
+ return relationship.foreign_key('task', nullable=True)
@declared_attr
- def dependent_tasks(cls):
- return relationship.many_to_many(cls, 'task', 'dependent', other_property='dependencies')
+ def dependencies(cls):
+ # symmetric relationship causes funky graphs
+ return relationship.one_to_many_self(cls, 'dependency_fk')
def has_ended(self):
if self.stub_type is not None:
@@ -422,7 +422,7 @@ class TaskBase(mixins.ModelMixin):
return self.status in (self.PENDING, self.RETRYING)
@classmethod
- def from_api_task(cls, api_task, executor, executor_kwargs=None, **kwargs):
+ def from_api_task(cls, api_task, executor, **kwargs):
from aria.orchestrator import context
instantiation_kwargs = {}
@@ -454,32 +454,12 @@ class TaskBase(mixins.ModelMixin):
'api_id': api_task.id,
'_context_cls': context_cls,
'_executor': executor,
- '_executor_kwargs': executor_kwargs or {}
})
instantiation_kwargs.update(**kwargs)
return cls(**instantiation_kwargs)
- def execute(self, ctx, executor_kwargs=None):
- from aria.orchestrator.context import operation
- context_cls = self._context_cls or operation.BaseOperationContext
- op_ctx = context_cls(
- model_storage=ctx.model,
- resource_storage=ctx.resource,
- workdir=ctx._workdir,
- task_id=self.id,
- actor_id=self.actor.id if self.actor else None,
- service_id=self.execution.service.id,
- execution_id=self.execution.id,
- name=self.name
- )
- executor = self._executor(**dict(self._executor_kwargs or {}, **(executor_kwargs or {})))
- try:
- return executor.execute(op_ctx)
- except BaseException:
- executor.close()
-
class LogBase(mixins.ModelMixin):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index af7220d..496739f 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -65,6 +65,9 @@ class BaseOperationContext(common.BaseContext):
self._thread_local.task = self.model.task.get(self._task_id)
return self._thread_local.task
+ def update_task(self):
+ self.model.task.update(self.task)
+
@property
def plugin_workdir(self):
"""
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 961796a..f09cb79 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -21,11 +21,15 @@ import os
import sys
from datetime import datetime
+from networkx import DiGraph
+
from . import exceptions
from .context.workflow import WorkflowContext
from .workflows import builtin
from .workflows.core.engine import Engine
from .workflows.executor.process import ProcessExecutor
+from .workflows.executor.base import StubTaskExecutor
+from .workflows.api import task
from ..modeling import models
from ..modeling import utils as modeling_utils
from ..utils.imports import import_fullname
@@ -39,7 +43,7 @@ class WorkflowRunner(object):
def __init__(self, workflow_name, service_id, inputs,
model_storage, resource_storage, plugin_manager,
- executor=None, executor_kwargs=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
+ executor=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
"""
Manages a single workflow execution on a given service.
@@ -80,20 +84,21 @@ class WorkflowRunner(object):
task_max_attempts=task_max_attempts,
task_retry_interval=task_retry_interval)
+ # Set default executor and kwargs
+ executor = executor or ProcessExecutor(plugin_manager=plugin_manager)
+
# transforming the execution inputs to dict, to pass them to the workflow function
execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values())
+
self._tasks_graph = workflow_fn(ctx=workflow_context, **execution_inputs_dict)
+ construct_execution_tasks(self.execution, self._tasks_graph, executor.__class__)
- # Set default executor and kwargs
- if executor is None:
- executor = ProcessExecutor
- executor_kwargs = dict(plugin_manager=plugin_manager)
+ # Update the state
+ self._model_storage.execution.update(execution)
- self._engine = Engine(
- executor=executor,
- executor_kwargs=executor_kwargs,
- workflow_context=workflow_context,
- tasks_graph=self._tasks_graph)
+ self._engine = Engine(executor=executor,
+ workflow_context=workflow_context,
+ execution_graph=get_execution_graph(self.execution))
@property
def execution_id(self):
@@ -171,3 +176,101 @@ class WorkflowRunner(object):
self._workflow_name, workflow.function))
return workflow_fn
+
+
+def get_execution_graph(execution):
+ graph = DiGraph()
+ for task in execution.tasks:
+ for dependency in task.dependencies:
+ graph.add_edge(dependency, task)
+
+ return graph
+
+
+def construct_execution_tasks(execution,
+ task_graph,
+ default_executor,
+ stub_executor=StubTaskExecutor,
+ start_stub_type=models.Task.START_WORKFLOW,
+ end_stub_type=models.Task.END_WORKFLOW,
+ depends_on=()):
+ """
+ Translates the user graph to the execution graph
+ :param task_graph: The user's graph
+ :param start_stub_type: internal use
+ :param end_stub_type: internal use
+ :param depends_on: internal use
+ """
+ depends_on = list(depends_on)
+
+ # Insert start marker
+ start_task = models.Task(api_id=_start_graph_suffix(task_graph.id),
+ _executor=stub_executor,
+ execution=execution,
+ stub_type=start_stub_type,
+ dependencies=depends_on)
+
+ for api_task in task_graph.topological_order(reverse=True):
+ operation_dependencies = _get_tasks_from_dependencies(execution,
+ task_graph.get_dependencies(api_task), [start_task])
+
+ if isinstance(api_task, task.OperationTask):
+ models.Task.from_api_task(api_task=api_task,
+ executor=default_executor,
+ dependencies=operation_dependencies)
+
+ elif isinstance(api_task, task.WorkflowTask):
+ # Build the graph recursively while adding start and end markers
+ construct_execution_tasks(
+ execution=execution,
+ task_graph=api_task,
+ default_executor=default_executor,
+ stub_executor=stub_executor,
+ start_stub_type=models.Task.START_SUBWROFKLOW,
+ end_stub_type=models.Task.END_SUBWORKFLOW,
+ depends_on=operation_dependencies
+ )
+ elif isinstance(api_task, task.StubTask):
+ models.Task(api_id=api_task.id,
+ _executor=stub_executor,
+ execution=execution,
+ stub_type=models.Task.STUB,
+ dependencies=operation_dependencies)
+ else:
+ raise
+
+ # Insert end marker
+ models.Task(api_id=_end_graph_suffix(task_graph.id),
+ _executor=stub_executor,
+ execution=execution,
+ stub_type=end_stub_type,
+ dependencies=_get_non_dependent_tasks(execution) or [start_task])
+
+
+def _start_graph_suffix(api_id):
+ return '{0}-Start'.format(api_id)
+
+
+def _end_graph_suffix(api_id):
+ return '{0}-End'.format(api_id)
+
+
+def _get_non_dependent_tasks(execution):
+ dependency_tasks = set()
+ for task in execution.tasks:
+ dependency_tasks.update(task.dependencies)
+ return list(set(execution.tasks) - set(dependency_tasks))
+
+
+def _get_tasks_from_dependencies(execution, dependencies, default=()):
+ """
+ Returns task list from dependencies.
+ """
+ tasks = []
+ for dependency in dependencies:
+ if getattr(dependency, 'actor', False):
+ dependency_name = dependency.id
+ else:
+ dependency_name = _end_graph_suffix(dependency.id)
+ tasks.extend(task for task in execution.tasks if task.api_id == dependency_name)
+ return tasks or default
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/workflows/core/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/__init__.py b/aria/orchestrator/workflows/core/__init__.py
index 98938be..81db43f 100644
--- a/aria/orchestrator/workflows/core/__init__.py
+++ b/aria/orchestrator/workflows/core/__init__.py
@@ -17,4 +17,4 @@
Core for the workflow execution mechanism
"""
-from . import translation, engine
+from . import engine
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 67e8c1d..e1b6412 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -20,14 +20,12 @@ The workflow engine. Executes workflows
import time
from datetime import datetime
-import networkx
-
from aria import logger
from aria.modeling import models
from aria.orchestrator import events
+from aria.orchestrator.context import operation
from .. import exceptions
-from . import translation
# Import required so all signals are registered
from . import events_handler # pylint: disable=unused-import
@@ -37,18 +35,11 @@ class Engine(logger.LoggerMixin):
The workflow engine. Executes workflows
"""
- def __init__(self, executor, workflow_context, tasks_graph, executor_kwargs=None, **kwargs):
+ def __init__(self, executor, workflow_context, execution_graph, **kwargs):
super(Engine, self).__init__(**kwargs)
self._workflow_context = workflow_context
- self._execution_graph = networkx.DiGraph()
- self._executor_kwargs = executor_kwargs
- translation.store_tasks(task_graph=tasks_graph,
- execution_graph=self._execution_graph,
- default_executor=executor,
- execution=workflow_context.execution)
-
- # Flush changes
- workflow_context.model.execution.update(workflow_context.execution)
+ self._executors = {executor.__class__: executor}
+ self._execution_graph = execution_graph
def execute(self):
"""
@@ -83,38 +74,57 @@ class Engine(logger.LoggerMixin):
will be modified to 'cancelled' directly.
"""
events.on_cancelling_workflow_signal.send(self._workflow_context)
+ self._workflow_context.execution = self._workflow_context.execution
def _is_cancel(self):
- return self._workflow_context.execution.status in (models.Execution.CANCELLING,
- models.Execution.CANCELLED)
+ execution = self._workflow_context.model.execution.update(self._workflow_context.execution)
+ return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED)
def _executable_tasks(self):
now = datetime.utcnow()
- return (task for task in self._tasks_iter()
- if task.is_waiting() and
- task.due_at <= now and
- not self._task_has_dependencies(task))
+ return (
+ task for task in self._tasks_iter()
+ if task.is_waiting() and task.due_at <= now and not self._task_has_dependencies(task)
+ )
def _ended_tasks(self):
- return (task for task in self._tasks_iter() if task.has_ended())
+ return (task for task in self._tasks_iter()
+ if task.has_ended() and task in self._execution_graph)
def _task_has_dependencies(self, task):
- return len(self._execution_graph.pred.get(task.api_id, {})) > 0
+ return task.dependencies and all(d in self._execution_graph for d in task.dependencies)
def _all_tasks_consumed(self):
return len(self._execution_graph.node) == 0
def _tasks_iter(self):
- for _, data in self._execution_graph.nodes_iter(data=True):
- yield self._workflow_context.model.task.get(data['task'].id)
+ for task in self._workflow_context.execution.tasks:
+ yield self._workflow_context.model.task.refresh(task)
def _handle_executable_task(self, task):
if not task.stub_type:
events.sent_task_signal.send(task)
- task.execute(self._workflow_context, self._executor_kwargs)
+
+ if task._executor not in self._executors:
+ self._executors[task._executor] = task._executor()
+ executor = self._executors[task._executor]
+
+ context_cls = task._context_cls or operation.BaseOperationContext
+ op_ctx = context_cls(
+ model_storage=self._workflow_context.model,
+ resource_storage=self._workflow_context.resource,
+ workdir=self._workflow_context._workdir,
+ task_id=task.id,
+ actor_id=task.actor.id if task.actor else None,
+ service_id=task.execution.service.id,
+ execution_id=task.execution.id,
+ name=task.name
+ )
+
+ executor.execute(op_ctx)
def _handle_ended_tasks(self, task):
if task.status == models.Task.FAILED and not task.ignore_failure:
raise exceptions.ExecutorException('Workflow failed')
else:
- self._execution_graph.remove_node(task.api_id)
+ self._execution_graph.remove_node(task)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 5d979c4..8b217f5 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -47,7 +47,7 @@ def _task_failed(ctx, exception, *args, **kwargs):
not isinstance(exception, exceptions.TaskAbortException),
ctx.task.attempts_count < ctx.task.max_attempts or
ctx.task.max_attempts == ctx.task.INFINITE_RETRIES,
- # ignore_failure check here means the task will not be retries and it will be marked
+ # ignore_failure check here means the task will not be retried and it will be marked
# as failed. The engine will also look at ignore_failure so it won't fail the
# workflow.
not ctx.task.ignore_failure
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py
deleted file mode 100644
index 8da9bd7..0000000
--- a/aria/orchestrator/workflows/core/translation.py
+++ /dev/null
@@ -1,112 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Translation of user graph's API to the execution graph
-"""
-
-from ....modeling import models
-from .. import api
-from ..executor import base
-
-
-def store_tasks(ctx, task_graph, default_executor, execution,
- start_stub_type=models.Task.START_WORKFLOW,
- end_stub_type=models.Task.END_WORKFLOW,
- depends_on=()):
- """
- Translates the user graph to the execution graph
- :param task_graph: The user's graph
- :param workflow_context: The workflow
- :param execution_graph: The execution graph that is being built
- :param start_stub_type: internal use
- :param end_stub_type: internal use
- :param depends_on: internal use
- """
- depends_on = list(depends_on)
-
- # Insert start marker
- start_task = models.Task(api_id=_start_graph_suffix(task_graph.id),
- _executor=base.StubTaskExecutor,
- execution=execution,
- stub_type=start_stub_type,
- dependencies=depends_on)
-
- for api_task in task_graph.topological_order(reverse=True):
- dependencies = task_graph.get_dependencies(api_task)
- operation_dependencies = _get_tasks_from_dependencies(ctx, dependencies, [start_task])
-
- if isinstance(api_task, api.task.OperationTask):
- models.Task.from_api_task(
- api_task=api_task, executor=default_executor, dependencies=operation_dependencies)
-
- elif isinstance(api_task, api.task.WorkflowTask):
- # Build the graph recursively while adding start and end markers
- store_tasks(
- ctx=ctx,
- task_graph=api_task,
- default_executor=default_executor,
- execution=execution,
- start_stub_type=models.Task.START_SUBWROFKLOW,
- end_stub_type=models.Task.END_SUBWORKFLOW,
- depends_on=operation_dependencies
- )
- elif isinstance(api_task, api.task.StubTask):
- models.Task(api_id=api_task.id,
- _executor=base.StubTaskExecutor,
- execution=execution,
- stub_type=models.Task.STUB,
- dependencies=operation_dependencies)
- else:
- raise RuntimeError('Undefined state')
-
- # Insert end marker
- workflow_dependencies = [task for task in ctx.model.task.list() if not task.dependent_tasks]
- models.Task(api_id=_end_graph_suffix(task_graph.id),
- _executor=base.StubTaskExecutor,
- execution=execution,
- stub_type=end_stub_type,
- dependencies=workflow_dependencies)
-
- ctx.model.execution.update(execution)
-
-def _start_graph_suffix(api_id):
- return '{0}-Start'.format(api_id)
-
-
-def _end_graph_suffix(api_id):
- return '{0}-End'.format(api_id)
-
-
-def construct_graph(graph, execution):
- for task in execution.tasks:
- for dependency in task.dependencies:
- graph.add_edge(dependency, task)
-
- return graph
-
-
-def _get_tasks_from_dependencies(ctx, dependencies, default=()):
- """
- Returns task list from dependencies.
- """
- tasks = []
- for dependency in dependencies:
- if isinstance(dependency, (api.task.OperationTask, api.task.StubTask)):
- dependency_name = dependency.id
- else:
- dependency_name = _end_graph_suffix(dependency.id)
- tasks.extend(list(ctx.model.task.list(filters={'name': dependency_name})))
- return tasks or default
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index 54a9438..fc4b800 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -33,9 +33,9 @@ class BaseExecutor(logger.LoggerMixin):
Execute a task
:param task: task to execute
"""
+ ctx.update_task()
if ctx.task.function:
self._execute(ctx)
- ctx.model.task.update(ctx.task)
else:
# In this case the task is missing a function. This task still gets to an
# executor, but since there is nothing to run, we by default simply skip the execution
@@ -52,22 +52,17 @@ class BaseExecutor(logger.LoggerMixin):
@staticmethod
def _task_started(ctx):
events.start_task_signal.send(ctx)
- ctx.model.task.update(ctx.task)
+ ctx.update_task()
def _task_failed(self, ctx, exception, traceback=None):
events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback)
- ctx.model.task.update(ctx.task)
+ ctx.update_task()
def _task_succeeded(self, ctx):
events.on_success_task_signal.send(ctx)
- ctx.model.task.update(ctx.task)
+ ctx.update_task()
class StubTaskExecutor(BaseExecutor): # pylint: disable=abstract-method
-
- def __init__(self, *args, **kwargs):
- # TODO: the executor kwargs delivery system is bad, so we need to aps the kwargs each time (and they are not persisted - this is bad!)
- super(StubTaskExecutor, self).__init__()
-
def execute(self, ctx, *args, **kwargs):
ctx.task.status = ctx.task.SUCCESS
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 074b54b..8c447b6 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -68,8 +68,6 @@ class ThreadExecutor(BaseExecutor):
self._task_failed(ctx,
exception=e,
traceback=exceptions.get_exception_as_string(*sys.exc_info()))
- finally:
- break
# Daemon threads
except BaseException as e:
pass
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/context/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/__init__.py b/tests/orchestrator/context/__init__.py
index 89a7b2c..cb282a3 100644
--- a/tests/orchestrator/context/__init__.py
+++ b/tests/orchestrator/context/__init__.py
@@ -16,6 +16,7 @@
import sys
from aria.orchestrator.workflows.core import engine
+from aria.orchestrator import workflow_runner
def op_path(func, module_path=None):
@@ -23,8 +24,12 @@ def op_path(func, module_path=None):
return '{0}.{1}'.format(module_path, func.__name__)
-def execute(workflow_func, workflow_context, executor, executor_kwargs=None):
+def execute(workflow_func, workflow_context, executor):
graph = workflow_func(ctx=workflow_context)
- eng = engine.Engine(executor=executor, executor_kwargs=executor_kwargs,
- workflow_context=workflow_context, tasks_graph=graph)
+
+ workflow_runner.construct_execution_tasks(workflow_context.execution, graph, executor.__class__)
+ workflow_context.execution = workflow_context.execution
+ execution_graph = workflow_runner.get_execution_graph(workflow_context.execution)
+ eng = engine.Engine(executor, workflow_context, execution_graph)
+
eng.execute()
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index e8f1a63..f654fe5 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -51,7 +51,11 @@ def ctx(tmpdir):
@pytest.fixture
def thread_executor():
- return thread.ThreadExecutor
+ result = thread.ThreadExecutor()
+ try:
+ yield result
+ finally:
+ result.close()
@pytest.fixture
@@ -253,7 +257,12 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir):
(process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}),
])
def executor(request):
- return request.param
+ ex_cls, kwargs = request.param
+ ex = ex_cls(**kwargs)
+ try:
+ yield ex
+ finally:
+ ex.close()
def test_node_operation_logging(ctx, executor):
@@ -286,8 +295,7 @@ def test_node_operation_logging(ctx, executor):
arguments=arguments
)
)
- execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor[0],
- executor_kwargs=executor[1])
+ execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
_assert_loggins(ctx, arguments)
@@ -320,7 +328,7 @@ def test_relationship_operation_logging(ctx, executor):
)
)
- execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor[0], executor_kwargs=executor[1])
+ execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
_assert_loggins(ctx, arguments)
@@ -377,7 +385,7 @@ def test_attribute_consumption(ctx, executor, dataholder):
)
)
- execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor[0], executor_kwargs=executor[1])
+ execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
target_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
assert len(source_node.attributes) == len(target_node.attributes) == 2
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index f7e441e..4975d20 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -18,7 +18,7 @@ import pytest
from aria.orchestrator.workflows import api
from aria.orchestrator.workflows.core import engine
from aria.orchestrator.workflows.executor import process
-from aria.orchestrator import workflow, operation
+from aria.orchestrator import workflow, operation, workflow_runner
import tests
from tests import mock
from tests import storage
@@ -28,14 +28,16 @@ TEST_FILE_ENTRY_ID = 'entry'
TEST_FILE_NAME = 'test_file'
-def test_serialize_operation_context(context, executor, executor_kwargs, tmpdir):
+def test_serialize_operation_context(context, executor, tmpdir):
test_file = tmpdir.join(TEST_FILE_NAME)
test_file.write(TEST_FILE_CONTENT)
resource = context.resource
resource.service_template.upload(TEST_FILE_ENTRY_ID, str(test_file))
graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
- eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph,
- executor_kwargs=executor_kwargs)
+ workflow_runner.construct_execution_tasks(context.execution, graph, executor.__class__)
+ context.execution = context.execution
+ execution_graph = workflow_runner.get_execution_graph(context.execution)
+ eng = engine.Engine(executor, context, execution_graph)
eng.execute()
@@ -83,8 +85,9 @@ def _operation_mapping():
@pytest.fixture
def executor():
- return process.ProcessExecutor
-
+ result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
+ yield result
+ result.close()
@pytest.fixture
def executor_kwargs():
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/context/test_toolbelt.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_toolbelt.py b/tests/orchestrator/context/test_toolbelt.py
index df54d9d..4de9e55 100644
--- a/tests/orchestrator/context/test_toolbelt.py
+++ b/tests/orchestrator/context/test_toolbelt.py
@@ -41,7 +41,12 @@ def workflow_context(tmpdir):
@pytest.fixture
def executor():
- return thread.ThreadExecutor
+ result = thread.ThreadExecutor()
+ try:
+ yield result
+ finally:
+ result.close()
+
@pytest.fixture
def dataholder(tmpdir):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 8e8c6e0..8414240 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -19,7 +19,7 @@ import os
import pytest
from aria import workflow
-from aria.orchestrator import events
+from aria.orchestrator import events, workflow_runner
from aria.orchestrator.workflows import api
from aria.orchestrator.workflows.exceptions import ExecutorException
from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException
@@ -498,17 +498,20 @@ if __name__ == '__main__':
arguments=arguments))
return graph
tasks_graph = mock_workflow(ctx=workflow_context) # pylint: disable=no-value-for-parameter
- eng = engine.Engine(
- executor=executor,
- workflow_context=workflow_context,
- tasks_graph=tasks_graph)
+ workflow_runner.construct_execution_tasks(
+ workflow_context.execution, tasks_graph, executor.__class__)
+ workflow_context.execution = workflow_context.execution
+ execution_graph = workflow_runner.get_execution_graph(workflow_context.execution)
+ eng = engine.Engine(executor, workflow_context, execution_graph)
eng.execute()
return workflow_context.model.node.get_by_name(
mock.models.DEPENDENCY_NODE_NAME).attributes
@pytest.fixture
def executor(self):
- return process.ProcessExecutor
+ result = process.ProcessExecutor()
+ yield result
+ result.close()
@pytest.fixture
def workflow_context(self, tmpdir):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index baf45d7..c5a62ae 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -99,7 +99,7 @@ def test_default_executor(request):
with mock.patch('aria.orchestrator.workflow_runner.Engine') as mock_engine_cls:
_create_workflow_runner(request, mock_workflow)
_, engine_kwargs = mock_engine_cls.call_args
- assert engine_kwargs.get('executor') == ProcessExecutor
+ assert isinstance(engine_kwargs.get('executor'), ProcessExecutor)
def test_custom_executor(request):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index 7ffb92a..108360f 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -24,6 +24,7 @@ from aria.orchestrator import (
operation,
)
from aria.modeling import models
+from aria.orchestrator import workflow_runner
from aria.orchestrator.workflows import (
api,
exceptions,
@@ -40,21 +41,23 @@ global_test_holder = {}
class BaseTest(object):
@classmethod
- def _execute(cls, workflow_func, workflow_context, executor, executor_kwargs=None):
+ def _execute(cls, workflow_func, workflow_context, executor):
eng = cls._engine(workflow_func=workflow_func,
workflow_context=workflow_context,
- executor=executor,
- executor_kwargs=executor_kwargs)
+ executor=executor)
eng.execute()
return eng
@staticmethod
- def _engine(workflow_func, workflow_context, executor, executor_kwargs=None):
+ def _engine(workflow_func, workflow_context, executor):
graph = workflow_func(ctx=workflow_context)
+ execution = workflow_context.execution
+ workflow_runner.construct_execution_tasks(execution, graph, executor.__class__)
+ workflow_context.execution = execution
+
return engine.Engine(executor=executor,
- executor_kwargs=executor_kwargs,
workflow_context=workflow_context,
- tasks_graph=graph)
+ execution_graph=workflow_runner.get_execution_graph(execution))
@staticmethod
def _op(ctx,
@@ -128,7 +131,11 @@ class BaseTest(object):
@pytest.fixture
def executor(self):
- return thread.ThreadExecutor
+ result = thread.ThreadExecutor()
+ try:
+ yield result
+ finally:
+ result.close()
@pytest.fixture
def workflow_context(self, tmpdir):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/workflows/core/test_events.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py
index d63a8ef..92582a9 100644
--- a/tests/orchestrator/workflows/core/test_events.py
+++ b/tests/orchestrator/workflows/core/test_events.py
@@ -15,6 +15,7 @@
import pytest
+from aria.orchestrator import workflow_runner
from tests import mock, storage
from aria.modeling.service_instance import NodeBase
from aria.orchestrator.decorators import operation, workflow
@@ -112,13 +113,14 @@ def run_operation_on_node(ctx, op_name, interface_name):
operation_name=op_name,
operation_kwargs=dict(function='{name}.{func.__name__}'.format(name=__name__, func=func)))
node.interfaces[interface.name] = interface
+ workflow_runner.construct_execution_tasks(
+ ctx.execution, single_operation_workflow(
+ ctx=ctx, node=node, interface_name=interface_name, op_name=op_name), ThreadExecutor)
+ ctx.execution = ctx.execution
- eng = engine.Engine(executor=ThreadExecutor,
+ eng = engine.Engine(executor=ThreadExecutor(),
workflow_context=ctx,
- tasks_graph=single_operation_workflow(ctx=ctx, # pylint: disable=no-value-for-parameter
- node=node,
- interface_name=interface_name,
- op_name=op_name))
+ execution_graph=workflow_runner.get_execution_graph(ctx.execution))
eng.execute()
return node
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index 4abed37..aebae38 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -13,13 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from networkx import topological_sort, DiGraph
+from networkx import topological_sort
from aria.modeling import models
-from aria.orchestrator import context
-from aria.orchestrator.workflows import api, core
+from aria.orchestrator import (
+ context,
+ workflow_runner
+)
+from aria.orchestrator.workflows import api
from aria.orchestrator.workflows.executor import base
-
from tests import mock
from tests import storage
@@ -66,12 +68,12 @@ def test_task_graph_into_execution_graph(tmpdir):
test_task_graph.add_dependency(simple_after_task, inner_task_graph)
# Direct check
- core.translation.store_tasks(ctx=task_context,
- task_graph=test_task_graph,
- execution=task_context.model.execution.list()[0],
- default_executor=base.StubTaskExecutor)
+ execution = task_context.model.execution.list()[0]
+
+ workflow_runner.construct_execution_tasks(execution, test_task_graph, base.StubTaskExecutor)
+ task_context.execution = execution
- execution_graph = core.translation.construct_graph(DiGraph(), task_context.execution)
+ execution_graph = workflow_runner.get_execution_graph(execution)
execution_tasks = topological_sort(execution_graph)
assert len(execution_tasks) == 7
@@ -87,27 +89,17 @@ def test_task_graph_into_execution_graph(tmpdir):
]
assert expected_tasks_names == [t.api_id for t in execution_tasks]
- assert all(isinstance(_get_task_by_name(task_name, execution_graph), models.Task)
- for task_name in execution_tasks)
-
- first_task = _get_task_by_name(execution_tasks[0], execution_graph)
- assert first_task.stub_type == models.Task.START_WORKFLOW
-
- second_task = _get_task_by_name(execution_tasks[1], execution_graph)
- _assert_execution_is_api_task(second_task, simple_before_task)
-
- third_task = _get_task_by_name(execution_tasks[2], execution_graph)
- assert third_task.stub_type == models.Task.START_SUBWROFKLOW
+ assert all(isinstance(task, models.Task) for task in execution_tasks)
+ execution_tasks = iter(execution_tasks)
- fourth_task = _get_task_by_name(execution_tasks[3], execution_graph)
- _assert_execution_is_api_task(fourth_task, inner_task)
- fifth_task = _get_task_by_name(execution_tasks[4], execution_graph)
- assert fifth_task.stub_type == models.Task.END_SUBWORKFLOW
+ assert next(execution_tasks).stub_type == models.Task.START_WORKFLOW
+ _assert_execution_is_api_task(next(execution_tasks), simple_before_task)
+ assert next(execution_tasks).stub_type == models.Task.START_SUBWROFKLOW
+ _assert_execution_is_api_task(next(execution_tasks), inner_task)
+ assert next(execution_tasks).stub_type == models.Task.END_SUBWORKFLOW
+ _assert_execution_is_api_task(next(execution_tasks), simple_after_task)
+ assert next(execution_tasks).stub_type == models.Task.END_WORKFLOW
- sixth_task = _get_task_by_name(execution_tasks[5], execution_graph)
- _assert_execution_is_api_task(sixth_task, simple_after_task)
- seventh_task = _get_task_by_name(execution_tasks[6], execution_graph)
- assert seventh_task.stub_type == models.Task.END_WORKFLOW
storage.release_sqlite_storage(task_context.model)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 3fa75ad..410a982 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -36,7 +36,6 @@ from aria.orchestrator.workflows.executor import (
)
import tests
-from . import MockContext
def _get_function(func):
@@ -45,18 +44,24 @@ def _get_function(func):
def execute_and_assert(executor, ctx):
node = ctx.model.node.list()[0]
+ execution = ctx.model.execution.list()[0]
expected_value = 'value'
- successful_ctx = ctx.model.task.model_cls(function=_get_function(mock_successful_task),
- node=node,
- _executor=executor)
- failing_ctx = ctx.model.task.model_cls(function=_get_function(mock_failing_task), node=node)
- ctx_with_inputs = ctx.model.task.model_cls(
+ successful_ctx = models.Task(function=_get_function(mock_successful_task),
+ node=node, _executor=executor, execution=execution)
+ failing_ctx = models.Task(
+ function=_get_function(mock_failing_task), node=node, _executor=executor, execution=execution)
+ ctx_with_inputs = models.Task(
node=node,
function=_get_function(mock_task_with_input),
- arguments={'input': models.Argument.wrap('input', 'value')})
+ arguments={'input': models.Argument.wrap('input', 'value')},
+ _executor=executor,
+ execution=execution)
- for task in [successful_ctx, failing_ctx, ctx_with_inputs]:
- task.execute(ctx)
+ ctx.model.execution.update(execution)
+
+ for op_ctx in [successful_ctx, failing_ctx, ctx_with_inputs]:
+ op_ctx.states = []
+ op_ctx.execute(ctx)
@retrying.retry(stop_max_delay=10000, wait_fixed=100)
def assertion():
@@ -101,22 +106,14 @@ class MockException(Exception):
@pytest.fixture
def ctx(tmpdir):
context = mock.context.simple(str(tmpdir))
+ ctx.states = []
yield context
storage.release_sqlite_storage(context.model)
-@pytest.fixture(params=[
- (thread.ThreadExecutor, {'pool_size': 1}),
- # (thread.ThreadExecutor, {'pool_size': 2}),
- # subprocess needs to load a tests module so we explicitly add the root directory as if
- # the project has been installed in editable mode
- # (celery.CeleryExecutor, {'app': app})
-])
-def thread_executor(request):
- executor_cls, executor_kwargs = request.param
- result = executor_cls(**executor_kwargs)
- yield result
- result.close()
+@pytest.fixture
+def thread_executor():
+ return thread.ThreadExecutor
@pytest.fixture