You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by ra...@apache.org on 2017/04/04 10:21:57 UTC
[03/24] incubator-ariatosca git commit: created dry-run executor
created dry-run executor
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/c5624260
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/c5624260
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/c5624260
Branch: refs/heads/ARIA-48-aria-cli
Commit: c56242604f4025e7e62d8304dba89bf2e80ff1e0
Parents: 108d8a6
Author: Ran Ziv <ra...@gigaspaces.com>
Authored: Tue Apr 4 12:02:11 2017 +0300
Committer: Ran Ziv <ra...@gigaspaces.com>
Committed: Tue Apr 4 13:20:46 2017 +0300
----------------------------------------------------------------------
aria/cli/cli/aria.py | 5 +++
aria/cli/cli/helptexts.py | 2 +
aria/cli/commands/executions.py | 15 +++++--
aria/orchestrator/workflow_runner.py | 5 ++-
aria/orchestrator/workflows/executor/dry.py | 54 ++++++++++++++++++++++++
5 files changed, 76 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c5624260/aria/cli/cli/aria.py
----------------------------------------------------------------------
diff --git a/aria/cli/cli/aria.py b/aria/cli/cli/aria.py
index baa72eb..1664ce5 100644
--- a/aria/cli/cli/aria.py
+++ b/aria/cli/cli/aria.py
@@ -304,6 +304,11 @@ class Options(object):
is_flag=True,
help=helptexts.JSON_OUTPUT)
+ self.dry_execution = click.option(
+ '--dry',
+ is_flag=True,
+ help=helptexts.DRY_EXECUTION)
+
self.init_hard_reset = click.option(
'--hard',
is_flag=True,
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c5624260/aria/cli/cli/helptexts.py
----------------------------------------------------------------------
diff --git a/aria/cli/cli/helptexts.py b/aria/cli/cli/helptexts.py
index 02519cb..0d66d6b 100644
--- a/aria/cli/cli/helptexts.py
+++ b/aria/cli/cli/helptexts.py
@@ -32,6 +32,8 @@ HARD_RESET = "Hard reset the configuration, including coloring and loggers"
ENABLE_COLORS = "Enable colors in logger (use --hard when working with" \
" an initialized environment) [default: False]"
+DRY_EXECUTION = "Execute a workflow dry run (prints operations information without causing side " \
+ "effects)"
SERVICE_TEMPLATE_FILENAME = (
"The name of the archive's main service template file. "
"This is only relevant if uploading an archive")
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c5624260/aria/cli/commands/executions.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py
index 6d8b949..82ee51a 100644
--- a/aria/cli/commands/executions.py
+++ b/aria/cli/commands/executions.py
@@ -18,6 +18,7 @@ from ..table import print_data
from ..cli import aria
from ...modeling.models import Execution
from ...orchestrator.workflow_runner import WorkflowRunner
+from ...orchestrator.workflows.executor.dry import DryExecutor
from ...utils import formatting
from ...utils import threading
@@ -101,6 +102,7 @@ def list(service_name,
@aria.argument('workflow-name')
@aria.options.service_name(required=True)
@aria.options.inputs
+@aria.options.dry_execution
@aria.options.task_max_attempts()
@aria.options.task_retry_interval()
@aria.options.verbose()
@@ -111,6 +113,7 @@ def list(service_name,
def start(workflow_name,
service_name,
inputs,
+ dry,
task_max_attempts,
task_retry_interval,
model_storage,
@@ -119,19 +122,21 @@ def start(workflow_name,
logger):
"""Execute a workflow
- `WORKFLOW_ID` is the id of the workflow to execute (e.g. `uninstall`)
+ `WORKFLOW_NAME` is the name of the workflow to execute (e.g. `uninstall`)
"""
+ executor = DryExecutor() if dry else None # use WorkflowRunner's default executor
+
workflow_runner = \
WorkflowRunner(workflow_name, service_name, inputs,
model_storage, resource_storage, plugin_manager,
- task_max_attempts, task_retry_interval)
+ executor, task_max_attempts, task_retry_interval)
execution_thread_name = '{0}_{1}'.format(service_name, workflow_name)
execution_thread = threading.ExceptionThread(target=workflow_runner.execute,
name=execution_thread_name)
execution_thread.daemon = True # allows force-cancel to exit immediately
- logger.info('Starting execution. Press Ctrl+C cancel')
+ logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
execution_thread.start()
try:
while execution_thread.is_alive():
@@ -148,6 +153,10 @@ def start(workflow_name,
if execution.status == Execution.FAILED:
logger.info('Execution error:\n{0}'.format(execution.error))
+ if dry:
+ # remove traces of the dry execution (including tasks, logs, inputs..)
+ model_storage.execution.delete(execution)
+
def _cancel_execution(workflow_runner, execution_thread, logger):
logger.info('Cancelling execution. Press Ctrl+C again to force-cancel')
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c5624260/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 78b17b8..982dff1 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -40,7 +40,7 @@ class WorkflowRunner(object):
def __init__(self, workflow_name, service_name, inputs,
model_storage, resource_storage, plugin_manager,
- task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
+ executor=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
self._model_storage = model_storage
@@ -71,8 +71,9 @@ class WorkflowRunner(object):
execution_inputs_dict = models.Parameter.unwrap_dict(self.execution.inputs)
self._tasks_graph = workflow_fn(ctx=workflow_context, **execution_inputs_dict)
+ executor = executor or ProcessExecutor(plugin_manager=plugin_manager)
self._engine = Engine(
- executor=ProcessExecutor(plugin_manager=plugin_manager),
+ executor=executor,
workflow_context=workflow_context,
tasks_graph=self._tasks_graph)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c5624260/aria/orchestrator/workflows/executor/dry.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py
new file mode 100644
index 0000000..69ce53c
--- /dev/null
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Dry executor
+"""
+
+from datetime import datetime
+
+from .base import BaseExecutor
+from ....modeling.models import Parameter
+
+
+class DryExecutor(BaseExecutor):
+ """
+ Executor which dry runs tasks - prints task information without causing any side effects
+ """
+
+ def execute(self, task):
+ # updating the task manually instead of calling self._task_started(task),
+ # to avoid any side effects raising that event might cause
+ with task._update():
+ task.started_at = datetime.utcnow()
+ task.status = task.STARTED
+
+ actor_type = type(task.actor).__name__.lower()
+ implementation = '{0} > '.format(task.plugin) if task.plugin else ''
+ implementation += task.implementation
+ inputs = Parameter.unwrap_dict(task.inputs)
+
+ self.logger.info(
+ 'Executing {actor_type} {actor_name} operation {interface_name} {operation_name}: '
+ '{implementation} (Inputs: {inputs})'
+ .format(actor_type=actor_type, actor_name=task.actor.name,
+ interface_name=task.interface_name, operation_name=task.operation_name,
+ implementation=implementation, inputs=inputs))
+
+ # updating the task manually instead of calling self._task_succeeded(task),
+ # to avoid any side effects raising that event might cause
+ with task._update():
+ task.ended_at = datetime.utcnow()
+ task.status = task.SUCCESS