You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/05/08 08:20:27 UTC
[3/4] incubator-ariatosca git commit: ARIA-214 Dry execution changes
the state of non implemented operations
ARIA-214 Dry execution changes the state of non implemented operations
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/0ec23707
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/0ec23707
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/0ec23707
Branch: refs/heads/ARIA-165-Make-node-name-suffix-UUIDs-become-more-readable
Commit: 0ec237071ebdeb28cd2feabbc1b51854543d398d
Parents: 3e1ed14
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun May 7 16:12:56 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Sun May 7 22:29:53 2017 +0300
----------------------------------------------------------------------
aria/orchestrator/workflows/core/task.py | 3 ---
aria/orchestrator/workflows/core/translation.py | 6 +----
aria/orchestrator/workflows/executor/base.py | 19 ++++++++------
aria/orchestrator/workflows/executor/celery.py | 2 +-
aria/orchestrator/workflows/executor/dry.py | 26 ++++++++++----------
aria/orchestrator/workflows/executor/process.py | 2 +-
aria/orchestrator/workflows/executor/thread.py | 2 +-
tests/orchestrator/workflows/core/test_task.py | 7 ++----
.../workflows/executor/test_process_executor.py | 18 +-------------
9 files changed, 31 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0ec23707/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index 78159c4..b3dfb3c 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -163,9 +163,6 @@ class OperationTask(BaseTask):
self._task_id = task_model.id
self._update_fields = None
- def execute(self):
- super(OperationTask, self).execute()
-
@contextmanager
def _update(self):
"""
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0ec23707/aria/orchestrator/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py
index 0bbce90..fec108b 100644
--- a/aria/orchestrator/workflows/core/translation.py
+++ b/aria/orchestrator/workflows/core/translation.py
@@ -48,11 +48,7 @@ def build_execution_graph(
execution_graph, dependencies, default=[start_task])
if isinstance(api_task, api.task.OperationTask):
- if api_task.implementation:
- operation_task = core_task.OperationTask(api_task, executor=default_executor)
- else:
- operation_task = core_task.OperationTask(api_task,
- executor=base.EmptyOperationExecutor())
+ operation_task = core_task.OperationTask(api_task, executor=default_executor)
_add_task_and_dependencies(execution_graph, operation_task, operation_dependencies)
elif isinstance(api_task, api.task.WorkflowTask):
# Build the graph recursively while adding start and end markers
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0ec23707/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index a225837..c543278 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -25,13 +25,22 @@ class BaseExecutor(logger.LoggerMixin):
"""
Base class for executors for running tasks
"""
+ def _execute(self, task):
+ raise NotImplementedError
def execute(self, task):
"""
Execute a task
:param task: task to execute
"""
- raise NotImplementedError
+ if task.implementation:
+ self._execute(task)
+ else:
+ # In this case the task is missing an implementation. This task still gets to an
+ # executor, but since there is nothing to run, we by default simply skip the execution
+ # itself.
+ self._task_started(task)
+ self._task_succeeded(task)
def close(self):
"""
@@ -52,12 +61,6 @@ class BaseExecutor(logger.LoggerMixin):
events.on_success_task_signal.send(task)
-class StubTaskExecutor(BaseExecutor):
+class StubTaskExecutor(BaseExecutor): # pylint: disable=abstract-method
def execute(self, task):
task.status = task.SUCCESS
-
-
-class EmptyOperationExecutor(BaseExecutor):
- def execute(self, task):
- events.start_task_signal.send(task)
- events.on_success_task_signal.send(task)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0ec23707/aria/orchestrator/workflows/executor/celery.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/celery.py b/aria/orchestrator/workflows/executor/celery.py
index 7bd9b7c..bbddc25 100644
--- a/aria/orchestrator/workflows/executor/celery.py
+++ b/aria/orchestrator/workflows/executor/celery.py
@@ -42,7 +42,7 @@ class CeleryExecutor(BaseExecutor):
self._receiver_thread.start()
self._started_queue.get(timeout=30)
- def execute(self, task):
+ def _execute(self, task):
self._tasks[task.id] = task
inputs = dict(inp.unwrap() for inp in task.inputs.values())
inputs['ctx'] = task.context
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0ec23707/aria/orchestrator/workflows/executor/dry.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py
index eb70a41..f6fb7a6 100644
--- a/aria/orchestrator/workflows/executor/dry.py
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -21,11 +21,10 @@ from datetime import datetime
from .base import BaseExecutor
-class DryExecutor(BaseExecutor):
+class DryExecutor(BaseExecutor): # pylint: disable=abstract-method
"""
Executor which dry runs tasks - prints task information without causing any side effects
"""
-
def execute(self, task):
# updating the task manually instead of calling self._task_started(task),
# to avoid any side effects raising that event might cause
@@ -33,19 +32,20 @@ class DryExecutor(BaseExecutor):
task.started_at = datetime.utcnow()
task.status = task.STARTED
- if hasattr(task.actor, 'source_node'):
- name = '{source_node.name}->{target_node.name}'.format(
- source_node=task.actor.source_node, target_node=task.actor.target_node)
- else:
- name = task.actor.name
+ if task.implementation:
+ if hasattr(task.actor, 'source_node'):
+ name = '{source_node.name}->{target_node.name}'.format(
+ source_node=task.actor.source_node, target_node=task.actor.target_node)
+ else:
+ name = task.actor.name
- task.context.logger.info(
- '<dry> {name} {task.interface_name}.{task.operation_name} started...'
- .format(name=name, task=task))
+ task.context.logger.info(
+ '<dry> {name} {task.interface_name}.{task.operation_name} started...'
+ .format(name=name, task=task))
- task.context.logger.info(
- '<dry> {name} {task.interface_name}.{task.operation_name} successful'
- .format(name=name, task=task))
+ task.context.logger.info(
+ '<dry> {name} {task.interface_name}.{task.operation_name} successful'
+ .format(name=name, task=task))
# updating the task manually instead of calling self._task_succeeded(task),
# to avoid any side effects raising that event might cause
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0ec23707/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index f3daf04..e464f7d 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -116,7 +116,7 @@ class ProcessExecutor(base.BaseExecutor):
self._server_socket.close()
self._listener_thread.join(timeout=60)
- def execute(self, task):
+ def _execute(self, task):
self._check_closed()
self._tasks[task.id] = task
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0ec23707/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 836b2bf..f53362a 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -46,7 +46,7 @@ class ThreadExecutor(BaseExecutor):
thread.start()
self._pool.append(thread)
- def execute(self, task):
+ def _execute(self, task):
self._queue.put(task)
def close(self):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0ec23707/tests/orchestrator/workflows/core/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py
index 748ee20..50ca7f5 100644
--- a/tests/orchestrator/workflows/core/test_task.py
+++ b/tests/orchestrator/workflows/core/test_task.py
@@ -24,7 +24,6 @@ from aria.orchestrator.workflows import (
api,
core,
exceptions,
- executor
)
from tests import mock, storage
@@ -71,8 +70,7 @@ class TestOperationTask(object):
node,
interface_name=NODE_INTERFACE_NAME,
operation_name=NODE_OPERATION_NAME)
- core_task = core.task.OperationTask(api_task=api_task,
- executor=executor.base.EmptyOperationExecutor())
+ core_task = core.task.OperationTask(api_task=api_task, executor=None)
return api_task, core_task
def _create_relationship_operation_task(self, ctx, relationship):
@@ -81,8 +79,7 @@ class TestOperationTask(object):
relationship,
interface_name=RELATIONSHIP_INTERFACE_NAME,
operation_name=RELATIONSHIP_OPERATION_NAME)
- core_task = core.task.OperationTask(api_task=api_task,
- executor=executor.base.EmptyOperationExecutor())
+ core_task = core.task.OperationTask(api_task=api_task, executor=None)
return api_task, core_task
def test_node_operation_task_creation(self, ctx):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0ec23707/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index b353518..5f240b2 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
import os
import Queue
@@ -66,7 +65,7 @@ class TestProcessExecutor(object):
def test_closed(self, executor):
executor.close()
with pytest.raises(RuntimeError) as exc_info:
- executor.execute(task=None)
+ executor.execute(task=MockTask(implementation='some.implementation'))
assert 'closed' in exc_info.value.message
@@ -82,18 +81,3 @@ def mock_plugin(plugin_manager, tmpdir):
source = os.path.join(tests.resources.DIR, 'plugins', 'mock-plugin1')
plugin_path = create_plugin(source=source, destination_dir=str(tmpdir))
return plugin_manager.install(source=plugin_path)
-
-
-class MockContext(object):
-
- def __init__(self, *args, **kwargs):
- self.logger = logging.getLogger('mock_logger')
- self.task = type('SubprocessMockTask', (object, ), {'plugin': None})
- self.serialization_dict = {'context_cls': self.__class__, 'context': {}}
-
- def __getattr__(self, item):
- return None
-
- @classmethod
- def deserialize_from_dict(cls, **kwargs):
- return cls()