You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/14 14:33:31 UTC
[5/5] incubator-ariatosca git commit: deleted _task and fixed some
tests
deleted _task and fixed some tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/546e7af7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/546e7af7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/546e7af7
Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: 546e7af7ae648c9e89a6b0c9d2ef565740a07136
Parents: 907ed6e
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed Jun 14 17:33:15 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Wed Jun 14 17:33:15 2017 +0300
----------------------------------------------------------------------
aria/orchestrator/workflows/core/_task.py | 267 -------------------
tests/orchestrator/execution_plugin/test_ssh.py | 11 +-
2 files changed, 6 insertions(+), 272 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/546e7af7/aria/orchestrator/workflows/core/_task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/_task.py b/aria/orchestrator/workflows/core/_task.py
deleted file mode 100644
index 399c177..0000000
--- a/aria/orchestrator/workflows/core/_task.py
+++ /dev/null
@@ -1,267 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Workflow tasks
-"""
-
-from contextlib import contextmanager
-from datetime import datetime
-from functools import (
- partial,
- wraps,
-)
-
-
-from ....modeling import models
-from ...context import operation as operation_context
-from .. import exceptions
-
-
-def _locked(func=None):
- if func is None:
- return partial(_locked, func=_locked)
-
- @wraps(func)
- def _wrapper(self, value, **kwargs):
- if self._update_fields is None:
- raise exceptions.TaskException('Task is not in update mode')
- return func(self, value, **kwargs)
- return _wrapper
-
-
-class BaseTask(object):
- """
- Base class for Task objects
- """
-
- def __init__(self, id, executor, *args, **kwargs):
- super(BaseTask, self).__init__(*args, **kwargs)
- self._id = id
- self._executor = executor
-
- def execute(self):
- return self._executor.execute(self)
-
- @property
- def id(self):
- """
- :return: the task's id
- """
- return self._id
-
-
-class StubTask(BaseTask):
- """
- Base stub task for marker user tasks that only mark the start/end of a workflow
- or sub-workflow
- """
- STARTED = models.Task.STARTED
- SUCCESS = models.Task.SUCCESS
-
- def __init__(self, *args, **kwargs):
- super(StubTask, self).__init__(*args, **kwargs)
- self.status = models.Task.PENDING
- self.due_at = datetime.utcnow()
-
-
-
-
-class StartWorkflowTask(StubTask):
- """
- Task marking a workflow start
- """
- pass
-
-
-class EndWorkflowTask(StubTask):
- """
- Task marking a workflow end
- """
- pass
-
-
-class StartSubWorkflowTask(StubTask):
- """
- Task marking a subworkflow start
- """
- pass
-
-
-class EndSubWorkflowTask(StubTask):
- """
- Task marking a subworkflow end
- """
- pass
-
-
-class OperationTask(BaseTask):
- """
- Operation task
- """
- def __init__(self, api_task, *args, **kwargs):
- # If no executor is provided, we infer that this is an empty task which does not need to be
- # executed.
- super(OperationTask, self).__init__(id=api_task.id, *args, **kwargs)
- self._workflow_context = api_task._workflow_context
- self.interface_name = api_task.interface_name
- self.operation_name = api_task.operation_name
- model_storage = api_task._workflow_context.model
-
- actor = getattr(api_task.actor, '_wrapped', api_task.actor)
-
- base_task_model = model_storage.task.model_cls
- if isinstance(actor, models.Node):
- context_cls = operation_context.NodeOperationContext
- create_task_model = base_task_model.for_node
- elif isinstance(actor, models.Relationship):
- context_cls = operation_context.RelationshipOperationContext
- create_task_model = base_task_model.for_relationship
- else:
- raise RuntimeError('No operation context could be created for {actor.model_cls}'
- .format(actor=actor))
-
- task_model = create_task_model(
- name=api_task.name,
- actor=actor,
- status=base_task_model.PENDING,
- max_attempts=api_task.max_attempts,
- retry_interval=api_task.retry_interval,
- ignore_failure=api_task.ignore_failure,
- execution=self._workflow_context.execution,
-
- # Only non-stub tasks have these fields
- plugin=api_task.plugin,
- function=api_task.function,
- arguments=api_task.arguments
- )
- self._workflow_context.model.task.put(task_model)
-
- self._ctx = context_cls(name=api_task.name,
- model_storage=self._workflow_context.model,
- resource_storage=self._workflow_context.resource,
- service_id=self._workflow_context._service_id,
- task_id=task_model.id,
- actor_id=actor.id,
- execution_id=self._workflow_context._execution_id,
- workdir=self._workflow_context._workdir)
- self._task_id = task_model.id
- self._update_fields = None
-
- @contextmanager
- def _update(self):
- """
- A context manager which puts the task into update mode, enabling fields update.
- :yields: None
- """
- self._update_fields = {}
- try:
- yield
- for key, value in self._update_fields.items():
- setattr(self.model_task, key, value)
- self.model_task = self.model_task
- finally:
- self._update_fields = None
-
- @property
- def model_task(self):
- """
- Returns the task model in storage
- :return: task in storage
- """
- return self._workflow_context.model.task.get(self._task_id)
-
- @model_task.setter
- def model_task(self, value):
- self._workflow_context.model.task.put(value)
-
- @property
- def context(self):
- """
- Contexts for the operation
- :return:
- """
- return self._ctx
-
- @property
- def status(self):
- """
- Returns the task status
- :return: task status
- """
- return self.model_task.status
-
- @status.setter
- @_locked
- def status(self, value):
- self._update_fields['status'] = value
-
- @property
- def started_at(self):
- """
- Returns when the task started
- :return: when task started
- """
- return self.model_task.started_at
-
- @started_at.setter
- @_locked
- def started_at(self, value):
- self._update_fields['started_at'] = value
-
- @property
- def ended_at(self):
- """
- Returns when the task ended
- :return: when task ended
- """
- return self.model_task.ended_at
-
- @ended_at.setter
- @_locked
- def ended_at(self, value):
- self._update_fields['ended_at'] = value
-
- @property
- def attempts_count(self):
- """
- Returns the attempts count for the task
- :return: attempts count
- """
- return self.model_task.attempts_count
-
- @attempts_count.setter
- @_locked
- def attempts_count(self, value):
- self._update_fields['attempts_count'] = value
-
- @property
- def due_at(self):
- """
- Returns the minimum datetime in which the task can be executed
- :return: eta
- """
- return self.model_task.due_at
-
- @due_at.setter
- @_locked
- def due_at(self, value):
- self._update_fields['due_at'] = value
-
- def __getattr__(self, attr):
- try:
- return getattr(self.model_task, attr)
- except AttributeError:
- return super(OperationTask, self).__getattribute__(attr)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/546e7af7/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index 8c4dd2d..e3ba2c4 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -25,7 +25,7 @@ from fabric.contrib import files
from fabric import context_managers
from aria.modeling import models
-from aria.orchestrator import events
+from aria.orchestrator import events, workflow_runner
from aria.orchestrator import workflow
from aria.orchestrator.workflows import api
from aria.orchestrator.workflows.executor import process
@@ -254,10 +254,11 @@ class TestWithActualSSHServer(object):
graph.sequence(*ops)
return graph
tasks_graph = mock_workflow(ctx=self._workflow_context) # pylint: disable=no-value-for-parameter
- eng = engine.Engine(
- executor=self._executor,
- workflow_context=self._workflow_context,
- tasks_graph=tasks_graph)
+ workflow_runner.construct_execution_tasks(
+ self._workflow_context.execution, tasks_graph, self._executor.__class__)
+ self._workflow_context.execution = self._workflow_context.execution
+ execution_graph = workflow_runner.get_execution_graph(self._workflow_context.execution)
+ eng = engine.Engine(self._executor, self._workflow_context, execution_graph)
eng.execute()
return self._workflow_context.model.node.get_by_name(
mock.models.DEPENDENCY_NODE_NAME).attributes