You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ariatosca.apache.org by mx...@apache.org on 2017/11/19 08:09:59 UTC

incubator-ariatosca git commit: wip

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/extract_execution_creation_from_workflow_runner 730750f3d -> 9611f6147


wip


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/9611f614
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/9611f614
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/9611f614

Branch: refs/heads/extract_execution_creation_from_workflow_runner
Commit: 9611f61474bd5f51baa08f87a83bed24e15442ca
Parents: 730750f
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun Nov 19 10:09:54 2017 +0200
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Sun Nov 19 10:09:54 2017 +0200

----------------------------------------------------------------------
 aria/cli/commands/executions.py      |  24 +++--
 aria/orchestrator/workflow_runner.py | 152 +++++++++++++++++-------------
 2 files changed, 102 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9611f614/aria/cli/commands/executions.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py
index cecbbc5..2ab3a33 100644
--- a/aria/cli/commands/executions.py
+++ b/aria/cli/commands/executions.py
@@ -26,7 +26,7 @@ from .. import logger as cli_logger
 from .. import execution_logging
 from ..core import aria
 from ...modeling.models import Execution
-from ...orchestrator.workflow_runner import WorkflowRunner
+from ...orchestrator import workflow_runner
 from ...orchestrator.workflows.executor.dry import DryExecutor
 from ...utils import formatting
 from ...utils import threading
@@ -143,15 +143,20 @@ def start(workflow_name,
     service = model_storage.service.get_by_name(service_name)
     executor = DryExecutor() if dry else None  # use WorkflowRunner's default executor
 
-    workflow_runner = \
-        WorkflowRunner(
+    execution = workflow_runner.create_execution_model(
+        service, workflow_name, inputs
+    )
+    model_storage.execution.put(execution)
+
+    wf_runner = \
+        workflow_runner.WorkflowRunner(
             model_storage, resource_storage, plugin_manager,
-            service_id=service.id, workflow_name=workflow_name, inputs=inputs, executor=executor,
+            execution=execution, executor=executor,
             task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval
         )
     logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
 
-    _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern)
+    _run_execution(wf_runner, logger, model_storage, dry, mark_pattern)
 
 
 @executions.command(name='resume',
@@ -188,14 +193,15 @@ def resume(execution_id,
                     .format(execution=execution, valid_status=execution.CANCELLED))
         return
 
-    workflow_runner = \
-        WorkflowRunner(
+    wf_runner = \
+        workflow_runner.WorkflowRunner(
             model_storage, resource_storage, plugin_manager,
-            execution_id=execution_id, retry_failed_tasks=retry_failed_tasks, executor=executor,
+            execution=execution, executor=executor, resume=True,
+            retry_failed_tasks=retry_failed_tasks,
         )
 
     logger.info('Resuming {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
-    _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern)
+    _run_execution(wf_runner, logger, model_storage, dry, mark_pattern)
 
 
 def _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9611f614/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 4dbf29b..aebde8e 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -37,11 +37,18 @@ DEFAULT_TASK_RETRY_INTERVAL = 30
 
 class WorkflowRunner(object):
 
-    def __init__(self, model_storage, resource_storage, plugin_manager,
-                 execution_id=None, retry_failed_tasks=False,
-                 service_id=None, workflow_name=None, inputs=None, executor=None,
-                 task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
-                 task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
+    def __init__(
+            self,
+            model_storage,
+            resource_storage,
+            plugin_manager,
+            execution,
+            executor=None,
+            resume=False,
+            retry_failed_tasks=False,
+            task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
+            task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL
+    ):
         """
         Manages a single workflow execution on a given service.
 
@@ -57,37 +64,23 @@ class WorkflowRunner(object):
         :param task_retry_interval: retry interval between retry attempts of a failing task
         """
 
-        if not (execution_id or (workflow_name and service_id)):
-            exceptions.InvalidWorkflowRunnerParams(
-                "Either provide execution id in order to resume a workflow or workflow name "
-                "and service id with inputs")
-
-        self._is_resume = execution_id is not None
+        self._is_resume = resume
         self._retry_failed_tasks = retry_failed_tasks
 
         self._model_storage = model_storage
         self._resource_storage = resource_storage
+        self._execution = execution
 
         # the IDs are stored rather than the models themselves, so this module could be used
         # by several threads without raising errors on model objects shared between threads
 
-        if self._is_resume:
-            self._execution_id = execution_id
-            self._service_id = self.execution.service.id
-            self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name
-        else:
-            self._service_id = service_id
-            self._workflow_name = workflow_name
-            self._validate_workflow_exists_for_service()
-            self._execution_id = self._create_execution_model(inputs).id
-
         self._workflow_context = WorkflowContext(
             name=self.__class__.__name__,
             model_storage=self._model_storage,
             resource_storage=resource_storage,
-            service_id=service_id,
-            execution_id=self._execution_id,
-            workflow_name=self._workflow_name,
+            service_id=self.execution.service.id,
+            execution_id=self.execution.id,
+            workflow_name=self.execution.workflow_name,
             task_max_attempts=task_max_attempts,
             task_retry_interval=task_retry_interval)
 
@@ -107,7 +100,7 @@ class WorkflowRunner(object):
 
     @property
     def execution_id(self):
-        return self._execution_id
+        return self.execution.id
 
     @property
     def execution(self):
@@ -115,7 +108,7 @@ class WorkflowRunner(object):
 
     @property
     def service(self):
-        return self._model_storage.service.get(self._service_id)
+        return self._model_storage.service.get(self._execution.service.id)
 
     def execute(self):
         self._engine.execute(ctx=self._workflow_context,
@@ -125,49 +118,12 @@ class WorkflowRunner(object):
     def cancel(self):
         self._engine.cancel_execution(ctx=self._workflow_context)
 
-    def _create_execution_model(self, inputs):
-        execution = models.Execution(
-            created_at=datetime.utcnow(),
-            service=self.service,
-            workflow_name=self._workflow_name,
-            inputs={})
-
-        if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
-            workflow_inputs = dict()  # built-in workflows don't have any inputs
-        else:
-            workflow_inputs = self.service.workflows[self._workflow_name].inputs
-
-        modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs,
-                                                     supplied_inputs=inputs or {})
-        modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs,
-                                                             supplied_inputs=inputs or {})
-        execution.inputs = modeling_utils.merge_parameter_values(
-            inputs, workflow_inputs, model_cls=models.Input)
-        # TODO: these two following calls should execute atomically
-        self._validate_no_active_executions(execution)
-        self._model_storage.execution.put(execution)
-        return execution
-
-    def _validate_workflow_exists_for_service(self):
-        if self._workflow_name not in self.service.workflows and \
-                        self._workflow_name not in builtin.BUILTIN_WORKFLOWS:
-            raise exceptions.UndeclaredWorkflowError(
-                'No workflow policy {0} declared in service {1}'
-                .format(self._workflow_name, self.service.name))
-
-    def _validate_no_active_executions(self, execution):
-        active_executions = [e for e in self.service.executions if e.is_active()]
-        if active_executions:
-            raise exceptions.ActiveExecutionsError(
-                "Can't start execution; Service {0} has an active execution with ID {1}"
-                .format(self.service.name, active_executions[0].id))
-
     def _get_workflow_fn(self):
         if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
             return import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX,
-                                                    self._workflow_name))
+                                                    self.execution.workflow_name))
 
-        workflow = self.service.workflows[self._workflow_name]
+        workflow = self.service.workflows[self.execution.workflow_name]
 
         # TODO: Custom workflow support needs improvement, currently this code uses internal
         # knowledge of the resource storage; Instead, workflows should probably be loaded
@@ -186,3 +142,69 @@ class WorkflowRunner(object):
                     self._workflow_name, workflow.function))
 
         return workflow_fn
+
+
+def create_execution_model(service, workflow_name, inputs):
+    _validate_workflow_exists_for_service(service, workflow_name)
+    _validate_no_active_executions(service)
+    execution = models.Execution(
+        created_at=datetime.utcnow(),
+        service_fk=service.id,
+        workflow_name=workflow_name,
+        inputs={})
+
+    if workflow_name in builtin.BUILTIN_WORKFLOWS:
+        workflow_inputs = dict()  # built-in workflows don't have any inputs
+    else:
+        workflow_inputs = service.workflows[workflow_name].inputs
+
+    modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs,
+                                                 supplied_inputs=inputs or {})
+    modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs,
+                                                         supplied_inputs=inputs or {})
+    execution.inputs = modeling_utils.merge_parameter_values(
+        inputs, workflow_inputs, model_cls=models.Input)
+
+    return execution
+
+
+
+def _create_execution_model(self, inputs):
+    execution = models.Execution(
+        created_at=datetime.utcnow(),
+        service=self.service,
+        workflow_name=self._workflow_name,
+        inputs={})
+
+    if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
+        workflow_inputs = dict()  # built-in workflows don't have any inputs
+    else:
+        workflow_inputs = self.service.workflows[self._workflow_name].inputs
+
+    modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs,
+                                                 supplied_inputs=inputs or {})
+    modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs,
+                                                         supplied_inputs=inputs or {})
+    execution.inputs = modeling_utils.merge_parameter_values(
+        inputs, workflow_inputs, model_cls=models.Input)
+    # TODO: these two following calls should execute atomically
+    self._validate_no_active_executions(execution)
+    self._model_storage.execution.put(execution)
+    return execution
+
+
+
+def _validate_no_active_executions(service):
+    active_executions = [e for e in service.executions if e.is_active()]
+    if active_executions:
+        raise exceptions.ActiveExecutionsError(
+            "Can't start execution; Service {0} has an active execution with ID {1}"
+            .format(service.name, active_executions[0].id))
+
+
+def _validate_workflow_exists_for_service(service, workflow_name):
+    if workflow_name not in service.workflows and \
+                    workflow_name not in builtin.BUILTIN_WORKFLOWS:
+            raise exceptions.UndeclaredWorkflowError(
+                'No workflow policy {0} declared in service {1}'
+                .format(workflow_name, service.name))
\ No newline at end of file