You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ariatosca.apache.org by mx...@apache.org on 2017/11/19 08:09:59 UTC
incubator-ariatosca git commit: wip
Repository: incubator-ariatosca
Updated Branches:
refs/heads/extract_execution_creation_from_workflow_runner 730750f3d -> 9611f6147
wip
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/9611f614
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/9611f614
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/9611f614
Branch: refs/heads/extract_execution_creation_from_workflow_runner
Commit: 9611f61474bd5f51baa08f87a83bed24e15442ca
Parents: 730750f
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun Nov 19 10:09:54 2017 +0200
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Sun Nov 19 10:09:54 2017 +0200
----------------------------------------------------------------------
aria/cli/commands/executions.py | 24 +++--
aria/orchestrator/workflow_runner.py | 152 +++++++++++++++++-------------
2 files changed, 102 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9611f614/aria/cli/commands/executions.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py
index cecbbc5..2ab3a33 100644
--- a/aria/cli/commands/executions.py
+++ b/aria/cli/commands/executions.py
@@ -26,7 +26,7 @@ from .. import logger as cli_logger
from .. import execution_logging
from ..core import aria
from ...modeling.models import Execution
-from ...orchestrator.workflow_runner import WorkflowRunner
+from ...orchestrator import workflow_runner
from ...orchestrator.workflows.executor.dry import DryExecutor
from ...utils import formatting
from ...utils import threading
@@ -143,15 +143,20 @@ def start(workflow_name,
service = model_storage.service.get_by_name(service_name)
executor = DryExecutor() if dry else None # use WorkflowRunner's default executor
- workflow_runner = \
- WorkflowRunner(
+ execution = workflow_runner.create_execution_model(
+ service, workflow_name, inputs
+ )
+ model_storage.execution.put(execution)
+
+ wf_runner = \
+ workflow_runner.WorkflowRunner(
model_storage, resource_storage, plugin_manager,
- service_id=service.id, workflow_name=workflow_name, inputs=inputs, executor=executor,
+ execution=execution, executor=executor,
task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval
)
logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
- _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern)
+ _run_execution(wf_runner, logger, model_storage, dry, mark_pattern)
@executions.command(name='resume',
@@ -188,14 +193,15 @@ def resume(execution_id,
.format(execution=execution, valid_status=execution.CANCELLED))
return
- workflow_runner = \
- WorkflowRunner(
+ wf_runner = \
+ workflow_runner.WorkflowRunner(
model_storage, resource_storage, plugin_manager,
- execution_id=execution_id, retry_failed_tasks=retry_failed_tasks, executor=executor,
+ execution=execution, executor=executor, resume=True,
+ retry_failed_tasks=retry_failed_tasks,
)
logger.info('Resuming {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
- _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern)
+ _run_execution(wf_runner, logger, model_storage, dry, mark_pattern)
def _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9611f614/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 4dbf29b..aebde8e 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -37,11 +37,18 @@ DEFAULT_TASK_RETRY_INTERVAL = 30
class WorkflowRunner(object):
- def __init__(self, model_storage, resource_storage, plugin_manager,
- execution_id=None, retry_failed_tasks=False,
- service_id=None, workflow_name=None, inputs=None, executor=None,
- task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
- task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
+ def __init__(
+ self,
+ model_storage,
+ resource_storage,
+ plugin_manager,
+ execution,
+ executor=None,
+ resume=False,
+ retry_failed_tasks=False,
+ task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
+ task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL
+ ):
"""
Manages a single workflow execution on a given service.
@@ -57,37 +64,23 @@ class WorkflowRunner(object):
:param task_retry_interval: retry interval between retry attempts of a failing task
"""
- if not (execution_id or (workflow_name and service_id)):
- exceptions.InvalidWorkflowRunnerParams(
- "Either provide execution id in order to resume a workflow or workflow name "
- "and service id with inputs")
-
- self._is_resume = execution_id is not None
+ self._is_resume = resume
self._retry_failed_tasks = retry_failed_tasks
self._model_storage = model_storage
self._resource_storage = resource_storage
+ self._execution = execution
# the IDs are stored rather than the models themselves, so this module could be used
# by several threads without raising errors on model objects shared between threads
- if self._is_resume:
- self._execution_id = execution_id
- self._service_id = self.execution.service.id
- self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name
- else:
- self._service_id = service_id
- self._workflow_name = workflow_name
- self._validate_workflow_exists_for_service()
- self._execution_id = self._create_execution_model(inputs).id
-
self._workflow_context = WorkflowContext(
name=self.__class__.__name__,
model_storage=self._model_storage,
resource_storage=resource_storage,
- service_id=service_id,
- execution_id=self._execution_id,
- workflow_name=self._workflow_name,
+ service_id=self.execution.service.id,
+ execution_id=self.execution.id,
+ workflow_name=self.execution.workflow_name,
task_max_attempts=task_max_attempts,
task_retry_interval=task_retry_interval)
@@ -107,7 +100,7 @@ class WorkflowRunner(object):
@property
def execution_id(self):
- return self._execution_id
+ return self.execution.id
@property
def execution(self):
@@ -115,7 +108,7 @@ class WorkflowRunner(object):
@property
def service(self):
- return self._model_storage.service.get(self._service_id)
+ return self._model_storage.service.get(self._execution.service.id)
def execute(self):
self._engine.execute(ctx=self._workflow_context,
@@ -125,49 +118,12 @@ class WorkflowRunner(object):
def cancel(self):
self._engine.cancel_execution(ctx=self._workflow_context)
- def _create_execution_model(self, inputs):
- execution = models.Execution(
- created_at=datetime.utcnow(),
- service=self.service,
- workflow_name=self._workflow_name,
- inputs={})
-
- if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
- workflow_inputs = dict() # built-in workflows don't have any inputs
- else:
- workflow_inputs = self.service.workflows[self._workflow_name].inputs
-
- modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs,
- supplied_inputs=inputs or {})
- modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs,
- supplied_inputs=inputs or {})
- execution.inputs = modeling_utils.merge_parameter_values(
- inputs, workflow_inputs, model_cls=models.Input)
- # TODO: these two following calls should execute atomically
- self._validate_no_active_executions(execution)
- self._model_storage.execution.put(execution)
- return execution
-
- def _validate_workflow_exists_for_service(self):
- if self._workflow_name not in self.service.workflows and \
- self._workflow_name not in builtin.BUILTIN_WORKFLOWS:
- raise exceptions.UndeclaredWorkflowError(
- 'No workflow policy {0} declared in service {1}'
- .format(self._workflow_name, self.service.name))
-
- def _validate_no_active_executions(self, execution):
- active_executions = [e for e in self.service.executions if e.is_active()]
- if active_executions:
- raise exceptions.ActiveExecutionsError(
- "Can't start execution; Service {0} has an active execution with ID {1}"
- .format(self.service.name, active_executions[0].id))
-
def _get_workflow_fn(self):
if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
return import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX,
- self._workflow_name))
+ self.execution.workflow_name))
- workflow = self.service.workflows[self._workflow_name]
+ workflow = self.service.workflows[self.execution.workflow_name]
# TODO: Custom workflow support needs improvement, currently this code uses internal
# knowledge of the resource storage; Instead, workflows should probably be loaded
@@ -186,3 +142,69 @@ class WorkflowRunner(object):
self._workflow_name, workflow.function))
return workflow_fn
+
+
+def create_execution_model(service, workflow_name, inputs):
+ _validate_workflow_exists_for_service(service, workflow_name)
+ _validate_no_active_executions(service)
+ execution = models.Execution(
+ created_at=datetime.utcnow(),
+ service_fk=service.id,
+ workflow_name=workflow_name,
+ inputs={})
+
+ if workflow_name in builtin.BUILTIN_WORKFLOWS:
+ workflow_inputs = dict() # built-in workflows don't have any inputs
+ else:
+ workflow_inputs = service.workflows[workflow_name].inputs
+
+ modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs,
+ supplied_inputs=inputs or {})
+ modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs,
+ supplied_inputs=inputs or {})
+ execution.inputs = modeling_utils.merge_parameter_values(
+ inputs, workflow_inputs, model_cls=models.Input)
+
+ return execution
+
+
+
+def _create_execution_model(self, inputs):
+ execution = models.Execution(
+ created_at=datetime.utcnow(),
+ service=self.service,
+ workflow_name=self._workflow_name,
+ inputs={})
+
+ if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
+ workflow_inputs = dict() # built-in workflows don't have any inputs
+ else:
+ workflow_inputs = self.service.workflows[self._workflow_name].inputs
+
+ modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs,
+ supplied_inputs=inputs or {})
+ modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs,
+ supplied_inputs=inputs or {})
+ execution.inputs = modeling_utils.merge_parameter_values(
+ inputs, workflow_inputs, model_cls=models.Input)
+ # TODO: these two following calls should execute atomically
+ self._validate_no_active_executions(execution)
+ self._model_storage.execution.put(execution)
+ return execution
+
+
+
+def _validate_no_active_executions(service):
+ active_executions = [e for e in service.executions if e.is_active()]
+ if active_executions:
+ raise exceptions.ActiveExecutionsError(
+ "Can't start execution; Service {0} has an active execution with ID {1}"
+ .format(service.name, active_executions[0].id))
+
+
+def _validate_workflow_exists_for_service(service, workflow_name):
+ if workflow_name not in service.workflows and \
+ workflow_name not in builtin.BUILTIN_WORKFLOWS:
+ raise exceptions.UndeclaredWorkflowError(
+ 'No workflow policy {0} declared in service {1}'
+ .format(workflow_name, service.name))
\ No newline at end of file