You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ariatosca.apache.org by mxmrlv <gi...@git.apache.org> on 2017/11/20 14:39:06 UTC
[GitHub] incubator-ariatosca pull request #208: ARIA-408 remove execution creation fr...
GitHub user mxmrlv opened a pull request:
https://github.com/apache/incubator-ariatosca/pull/208
ARIA-408 remove execution creation from workflow runner
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/apache/incubator-ariatosca ARIA-408-Remove-execution-creation-from-WorkflowRunner
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-ariatosca/pull/208.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #208
----
commit 9611f61474bd5f51baa08f87a83bed24e15442ca
Author: max-orlov <ma...@gigaspaces.com>
Date: 2017-11-19T08:09:54Z
wip
commit da233c730aee305806aaa83177b6b0dbd852c264
Author: max-orlov <ma...@gigaspaces.com>
Date: 2017-11-19T13:56:32Z
wip 2
commit 86c1c1ad9c77a31f970637025fae9eeba3b7a174
Author: max-orlov <ma...@gigaspaces.com>
Date: 2017-11-20T09:54:25Z
wip 3
commit 4cf88f3fa7281b36fdc0d2022a00c6bdf5699e81
Author: max-orlov <ma...@gigaspaces.com>
Date: 2017-11-20T14:08:17Z
final touches
----
---
[GitHub] incubator-ariatosca pull request #208: ARIA-408 remove execution creation fr...
Posted by aviyoop <gi...@git.apache.org>.
Github user aviyoop commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152233353
--- Diff: tests/orchestrator/execution/__init__.py ---
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
--- End diff --
What is this?
---
[GitHub] incubator-ariatosca issue #208: ARIA-408 remove execution creation from work...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:
https://github.com/apache/incubator-ariatosca/pull/208
Can one of the admins verify this patch?
---
[GitHub] incubator-ariatosca pull request #208: ARIA-408 remove execution creation fr...
Posted by aviyoop <gi...@git.apache.org>.
Github user aviyoop commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152252772
--- Diff: aria/orchestrator/execution_compiler.py ---
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+from datetime import datetime
+
+from . import exceptions
+from .context.workflow import WorkflowContext
+from .workflows import builtin
+from .workflows.core import graph_compiler
+from .workflows.executor.process import ProcessExecutor
+from ..modeling import models
+from ..modeling import utils as modeling_utils
+from ..utils.imports import import_fullname
+
+
+DEFAULT_TASK_MAX_ATTEMPTS = 30
+DEFAULT_TASK_RETRY_INTERVAL = 30
+
+
+class ExecutionCompiler(object):
+ def __init__(
+ self,
+ model,
+ resource,
+ plugin,
+ service,
+ workflow_name,
+ task_max_attempts=None,
+ task_retry_interval=None
+ ):
+ self._model = model
+ self._resource = resource
+ self._plugin = plugin
+ self._service = service
+ self._workflow_name = workflow_name
+ self._workflow_context = None
+ self._execution = None
+ self._task_max_attempts = task_max_attempts or DEFAULT_TASK_MAX_ATTEMPTS
+ self._task_retry_interval = task_retry_interval or DEFAULT_TASK_RETRY_INTERVAL
+
+ @property
+ def workflow_ctx(self):
+ if self._workflow_context is None:
+ self._workflow_context = WorkflowContext(
+ name=self.__class__.__name__,
--- End diff --
The current name does not make any sense.
---
[GitHub] incubator-ariatosca pull request #208: ARIA-408 remove execution creation fr...
Posted by aviyoop <gi...@git.apache.org>.
Github user aviyoop commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152273070
--- Diff: aria/orchestrator/execution_compiler.py ---
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+from datetime import datetime
+
+from . import exceptions
+from .context.workflow import WorkflowContext
+from .workflows import builtin
+from .workflows.core import graph_compiler
+from .workflows.executor.process import ProcessExecutor
+from ..modeling import models
+from ..modeling import utils as modeling_utils
+from ..utils.imports import import_fullname
+
+
+DEFAULT_TASK_MAX_ATTEMPTS = 30
+DEFAULT_TASK_RETRY_INTERVAL = 30
+
+
+class ExecutionCompiler(object):
--- End diff --
So... Although it is not a great name either, consider naming this `ExecutionPreparer` =(
---
[GitHub] incubator-ariatosca pull request #208: ARIA-408 remove execution creation fr...
Posted by aviyoop <gi...@git.apache.org>.
Github user aviyoop commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152312142
--- Diff: aria/orchestrator/execution_preparer.py ---
@@ -31,70 +31,79 @@
DEFAULT_TASK_RETRY_INTERVAL = 30
-class ExecutionCompiler(object):
+class ExecutionPreparer(object):
+ """
+ This class manages any execution and tasks related preparation for an execution of a workflow.
+ """
def __init__(
self,
- model,
- resource,
- plugin,
+ model_storage,
+ resource_storagee,
+ plugin_manager,
service,
workflow_name,
task_max_attempts=None,
task_retry_interval=None
):
- self._model = model
- self._resource = resource
- self._plugin = plugin
+ self._model = model_storage
+ self._resource = resource_storagee
+ self._plugin = plugin_manager
self._service = service
self._workflow_name = workflow_name
- self._workflow_context = None
- self._execution = None
self._task_max_attempts = task_max_attempts or DEFAULT_TASK_MAX_ATTEMPTS
self._task_retry_interval = task_retry_interval or DEFAULT_TASK_RETRY_INTERVAL
- @property
- def workflow_ctx(self):
- if self._workflow_context is None:
- self._workflow_context = WorkflowContext(
- name=self.__class__.__name__,
- model_storage=self._model,
- resource_storage=self._resource,
- service_id=self._execution.service.id,
- execution_id=self._execution.id,
- workflow_name=self._execution.workflow_name,
- task_max_attempts=self._task_max_attempts,
- task_retry_interval=self._task_retry_interval,
- )
- return self._workflow_context
-
- def compile(self, execution_inputs=None, executor=None, execution_id=None):
+ def get_ctx(self, execution):
+ return WorkflowContext(
+ name=self._workflow_name,
+ model_storage=self._model,
+ resource_storage=self._resource,
+ service_id=execution.service.id,
+ execution_id=execution.id,
+ workflow_name=execution.workflow_name,
+ task_max_attempts=self._task_max_attempts,
+ task_retry_interval=self._task_retry_interval,
+ )
+
+ def prepare(self, execution_inputs=None, executor=None, execution_id=None):
+ """
+ Prepares the execution and return the workflow ctx. If the execution is new, an execution
--- End diff --
Rephrase. The first sentence does not add anything anyway.
---
[GitHub] incubator-ariatosca pull request #208: ARIA-408 remove execution creation fr...
Posted by aviyoop <gi...@git.apache.org>.
Github user aviyoop commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152273125
--- Diff: aria/orchestrator/execution_compiler.py ---
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+from datetime import datetime
+
+from . import exceptions
+from .context.workflow import WorkflowContext
+from .workflows import builtin
+from .workflows.core import graph_compiler
+from .workflows.executor.process import ProcessExecutor
+from ..modeling import models
+from ..modeling import utils as modeling_utils
+from ..utils.imports import import_fullname
+
+
+DEFAULT_TASK_MAX_ATTEMPTS = 30
+DEFAULT_TASK_RETRY_INTERVAL = 30
+
+
+class ExecutionCompiler(object):
+ def __init__(
+ self,
+ model,
+ resource,
+ plugin,
+ service,
+ workflow_name,
+ task_max_attempts=None,
+ task_retry_interval=None
+ ):
+ self._model = model
+ self._resource = resource
+ self._plugin = plugin
+ self._service = service
+ self._workflow_name = workflow_name
+ self._workflow_context = None
+ self._execution = None
+ self._task_max_attempts = task_max_attempts or DEFAULT_TASK_MAX_ATTEMPTS
+ self._task_retry_interval = task_retry_interval or DEFAULT_TASK_RETRY_INTERVAL
+
+ @property
+ def workflow_ctx(self):
+ if self._workflow_context is None:
+ self._workflow_context = WorkflowContext(
+ name=self.__class__.__name__,
+ model_storage=self._model,
+ resource_storage=self._resource,
+ service_id=self._execution.service.id,
+ execution_id=self._execution.id,
+ workflow_name=self._execution.workflow_name,
+ task_max_attempts=self._task_max_attempts,
+ task_retry_interval=self._task_retry_interval,
+ )
+ return self._workflow_context
+
+ def compile(self, execution_inputs=None, executor=None, execution_id=None):
--- End diff --
prepare
---
[GitHub] incubator-ariatosca pull request #208: ARIA-408 remove execution creation fr...
Posted by aviyoop <gi...@git.apache.org>.
Github user aviyoop commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152257018
--- Diff: aria/orchestrator/execution_compiler.py ---
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+from datetime import datetime
+
+from . import exceptions
+from .context.workflow import WorkflowContext
+from .workflows import builtin
+from .workflows.core import graph_compiler
+from .workflows.executor.process import ProcessExecutor
+from ..modeling import models
+from ..modeling import utils as modeling_utils
+from ..utils.imports import import_fullname
+
+
+DEFAULT_TASK_MAX_ATTEMPTS = 30
+DEFAULT_TASK_RETRY_INTERVAL = 30
+
+
+class ExecutionCompiler(object):
+ def __init__(
+ self,
+ model,
+ resource,
+ plugin,
+ service,
+ workflow_name,
+ task_max_attempts=None,
+ task_retry_interval=None
+ ):
+ self._model = model
+ self._resource = resource
+ self._plugin = plugin
+ self._service = service
+ self._workflow_name = workflow_name
+ self._workflow_context = None
+ self._execution = None
+ self._task_max_attempts = task_max_attempts or DEFAULT_TASK_MAX_ATTEMPTS
+ self._task_retry_interval = task_retry_interval or DEFAULT_TASK_RETRY_INTERVAL
+
+ @property
+ def workflow_ctx(self):
+ if self._workflow_context is None:
+ self._workflow_context = WorkflowContext(
+ name=self.__class__.__name__,
+ model_storage=self._model,
+ resource_storage=self._resource,
+ service_id=self._execution.service.id,
+ execution_id=self._execution.id,
+ workflow_name=self._execution.workflow_name,
+ task_max_attempts=self._task_max_attempts,
+ task_retry_interval=self._task_retry_interval,
+ )
+ return self._workflow_context
+
+ def compile(self, execution_inputs=None, executor=None, execution_id=None):
+ assert not (execution_inputs and executor and execution_id)
+
+ if execution_id is None:
+ # If the execution is new
+ self._execution = self._create_execution_model(execution_inputs)
+ self._model.execution.put(self._execution)
+ self._create_tasks(executor)
+ self._model.execution.update(self._execution)
+ else:
+ # If resuming an execution
+ self._execution = self._model.execution.get(execution_id)
+
+ return self.workflow_ctx
+
+ def _create_tasks(self, executor=None):
+
+ # Set default executor and kwargs
+ executor = executor or ProcessExecutor(plugin_manager=self._plugin)
+
+ # transforming the execution inputs to dict, to pass them to the workflow function
+ execution_inputs_dict = dict(inp.unwrapped for inp in self._execution.inputs.itervalues())
+
+ if len(self._execution.tasks) == 0:
--- End diff --
This condition is no longer needed; we have already checked whether we are resuming a workflow.
---
[GitHub] incubator-ariatosca pull request #208: ARIA-408 remove execution creation fr...
Posted by aviyoop <gi...@git.apache.org>.
Github user aviyoop commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152252973
--- Diff: aria/orchestrator/execution_compiler.py ---
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+from datetime import datetime
+
+from . import exceptions
+from .context.workflow import WorkflowContext
+from .workflows import builtin
+from .workflows.core import graph_compiler
+from .workflows.executor.process import ProcessExecutor
+from ..modeling import models
+from ..modeling import utils as modeling_utils
+from ..utils.imports import import_fullname
+
+
+DEFAULT_TASK_MAX_ATTEMPTS = 30
+DEFAULT_TASK_RETRY_INTERVAL = 30
+
+
+class ExecutionCompiler(object):
+ def __init__(
+ self,
+ model,
+ resource,
+ plugin,
+ service,
+ workflow_name,
+ task_max_attempts=None,
+ task_retry_interval=None
+ ):
+ self._model = model
+ self._resource = resource
+ self._plugin = plugin
+ self._service = service
+ self._workflow_name = workflow_name
+ self._workflow_context = None
+ self._execution = None
+ self._task_max_attempts = task_max_attempts or DEFAULT_TASK_MAX_ATTEMPTS
+ self._task_retry_interval = task_retry_interval or DEFAULT_TASK_RETRY_INTERVAL
+
+ @property
+ def workflow_ctx(self):
+ if self._workflow_context is None:
+ self._workflow_context = WorkflowContext(
+ name=self.__class__.__name__,
+ model_storage=self._model,
+ resource_storage=self._resource,
+ service_id=self._execution.service.id,
+ execution_id=self._execution.id,
+ workflow_name=self._execution.workflow_name,
+ task_max_attempts=self._task_max_attempts,
+ task_retry_interval=self._task_retry_interval,
+ )
+ return self._workflow_context
+
+ def compile(self, execution_inputs=None, executor=None, execution_id=None):
--- End diff --
Add documentation
---
[GitHub] incubator-ariatosca pull request #208: ARIA-408 remove execution creation fr...
Posted by aviyoop <gi...@git.apache.org>.
Github user aviyoop commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152280899
--- Diff: tests/orchestrator/execution/test_execution_compiler.py ---
@@ -296,171 +230,161 @@ def _setup_mock_workflow_in_service(request, inputs=None):
return mock_workflow_name
-def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
- task_max_attempts=None, task_retry_interval=None):
+def _get_compiler(request, workflow_name):
# helper method for instantiating a workflow runner
- service_id = request.getfixturevalue('service').id
+ service = request.getfixturevalue('service')
model = request.getfixturevalue('model')
resource = request.getfixturevalue('resource')
plugin_manager = request.getfixturevalue('plugin_manager')
- # task configuration parameters can't be set to None, therefore only
- # passing those if they've been set by the test
- task_configuration_kwargs = dict()
- if task_max_attempts is not None:
- task_configuration_kwargs['task_max_attempts'] = task_max_attempts
- if task_retry_interval is not None:
- task_configuration_kwargs['task_retry_interval'] = task_retry_interval
-
- return WorkflowRunner(
- workflow_name=workflow_name,
- service_id=service_id,
- inputs=inputs or {},
- executor=executor,
- model_storage=model,
- resource_storage=resource,
- plugin_manager=plugin_manager,
- **task_configuration_kwargs)
+ return execution_compiler.ExecutionCompiler(
+ model,
+ resource,
+ plugin_manager,
+ service,
+ workflow_name
+ )
class TestResumableWorkflows(object):
- def _create_initial_workflow_runner(
- self, workflow_context, workflow, executor, inputs=None):
+ def _compile_execution(
--- End diff --
prepare_execution_(and_prepare_workflow_ctx)? Yes, that makes sense...
---
[GitHub] incubator-ariatosca pull request #208: ARIA-408 remove execution creation fr...
Posted by aviyoop <gi...@git.apache.org>.
Github user aviyoop commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152311655
--- Diff: aria/orchestrator/execution_preparer.py ---
@@ -31,70 +31,79 @@
DEFAULT_TASK_RETRY_INTERVAL = 30
-class ExecutionCompiler(object):
+class ExecutionPreparer(object):
+ """
+ This class manages any execution and tasks related preparation for an execution of a workflow.
+ """
def __init__(
self,
- model,
- resource,
- plugin,
+ model_storage,
+ resource_storagee,
+ plugin_manager,
service,
workflow_name,
task_max_attempts=None,
task_retry_interval=None
):
- self._model = model
- self._resource = resource
- self._plugin = plugin
+ self._model = model_storage
+ self._resource = resource_storagee
+ self._plugin = plugin_manager
self._service = service
self._workflow_name = workflow_name
- self._workflow_context = None
- self._execution = None
self._task_max_attempts = task_max_attempts or DEFAULT_TASK_MAX_ATTEMPTS
self._task_retry_interval = task_retry_interval or DEFAULT_TASK_RETRY_INTERVAL
- @property
- def workflow_ctx(self):
- if self._workflow_context is None:
- self._workflow_context = WorkflowContext(
- name=self.__class__.__name__,
- model_storage=self._model,
- resource_storage=self._resource,
- service_id=self._execution.service.id,
- execution_id=self._execution.id,
- workflow_name=self._execution.workflow_name,
- task_max_attempts=self._task_max_attempts,
- task_retry_interval=self._task_retry_interval,
- )
- return self._workflow_context
-
- def compile(self, execution_inputs=None, executor=None, execution_id=None):
+ def get_ctx(self, execution):
--- End diff --
get_workflow_ctx
---
[GitHub] incubator-ariatosca issue #208: ARIA-408 remove execution creation from work...
Posted by aviyoop <gi...@git.apache.org>.
Github user aviyoop commented on the issue:
https://github.com/apache/incubator-ariatosca/pull/208
Add a commit message explaining why this change was made.
---
[GitHub] incubator-ariatosca pull request #208: ARIA-408 remove execution creation fr...
Posted by aviyoop <gi...@git.apache.org>.
Github user aviyoop commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152226255
--- Diff: aria/cli/commands/executions.py ---
@@ -141,17 +143,21 @@ def start(workflow_name,
WORKFLOW_NAME is the unique name of the workflow within the service (e.g. "uninstall").
"""
service = model_storage.service.get_by_name(service_name)
- executor = DryExecutor() if dry else None # use WorkflowRunner's default executor
-
- workflow_runner = \
- WorkflowRunner(
- model_storage, resource_storage, plugin_manager,
- service_id=service.id, workflow_name=workflow_name, inputs=inputs, executor=executor,
- task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval
- )
+ executor = DryExecutor() if dry else ProcessExecutor(plugin_manager=plugin_manager)
+
+ compiler = execution_compiler.ExecutionCompiler(
--- End diff --
`execution_compiler_`
---
[GitHub] incubator-ariatosca pull request #208: ARIA-408 remove execution creation fr...
Posted by aviyoop <gi...@git.apache.org>.
Github user aviyoop commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152257452
--- Diff: aria/orchestrator/execution_compiler.py ---
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+from datetime import datetime
+
+from . import exceptions
+from .context.workflow import WorkflowContext
+from .workflows import builtin
+from .workflows.core import graph_compiler
+from .workflows.executor.process import ProcessExecutor
+from ..modeling import models
+from ..modeling import utils as modeling_utils
+from ..utils.imports import import_fullname
+
+
+DEFAULT_TASK_MAX_ATTEMPTS = 30
+DEFAULT_TASK_RETRY_INTERVAL = 30
+
+
+class ExecutionCompiler(object):
+ def __init__(
+ self,
+ model,
+ resource,
+ plugin,
+ service,
+ workflow_name,
+ task_max_attempts=None,
+ task_retry_interval=None
+ ):
+ self._model = model
+ self._resource = resource
+ self._plugin = plugin
+ self._service = service
+ self._workflow_name = workflow_name
+ self._workflow_context = None
+ self._execution = None
+ self._task_max_attempts = task_max_attempts or DEFAULT_TASK_MAX_ATTEMPTS
+ self._task_retry_interval = task_retry_interval or DEFAULT_TASK_RETRY_INTERVAL
+
+ @property
+ def workflow_ctx(self):
+ if self._workflow_context is None:
+ self._workflow_context = WorkflowContext(
+ name=self.__class__.__name__,
+ model_storage=self._model,
+ resource_storage=self._resource,
+ service_id=self._execution.service.id,
+ execution_id=self._execution.id,
+ workflow_name=self._execution.workflow_name,
+ task_max_attempts=self._task_max_attempts,
+ task_retry_interval=self._task_retry_interval,
+ )
+ return self._workflow_context
+
+ def compile(self, execution_inputs=None, executor=None, execution_id=None):
+ assert not (execution_inputs and executor and execution_id)
+
+ if execution_id is None:
+ # If the execution is new
+ self._execution = self._create_execution_model(execution_inputs)
+ self._model.execution.put(self._execution)
+ self._create_tasks(executor)
+ self._model.execution.update(self._execution)
+ else:
+ # If resuming an execution
+ self._execution = self._model.execution.get(execution_id)
+
+ return self.workflow_ctx
+
+ def _create_tasks(self, executor=None):
+
+ # Set default executor and kwargs
+ executor = executor or ProcessExecutor(plugin_manager=self._plugin)
+
+ # transforming the execution inputs to dict, to pass them to the workflow function
+ execution_inputs_dict = dict(inp.unwrapped for inp in self._execution.inputs.itervalues())
+
+ if len(self._execution.tasks) == 0:
+ workflow_fn = self._get_workflow_fn(self._execution.workflow_name)
+ tasks_graph = workflow_fn(ctx=self.workflow_ctx, **execution_inputs_dict)
--- End diff --
api_tasks_graph
---
[GitHub] incubator-ariatosca pull request #208: ARIA-408 remove execution creation fr...
Posted by aviyoop <gi...@git.apache.org>.
Github user aviyoop commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152273731
--- Diff: tests/orchestrator/execution/test_execution_compiler.py ---
@@ -78,24 +76,23 @@ def test_missing_workflow_implementation(service, request):
service.workflows['test_workflow'] = workflow
with pytest.raises(exceptions.WorkflowImplementationNotFoundError):
- _create_workflow_runner(request, 'test_workflow')
+ _get_compiler(request, 'test_workflow').compile()
-def test_builtin_workflow_instantiation(request):
+def test_builtin_workflow_instantiation(request, model):
--- End diff --
Model is not used
---
[GitHub] incubator-ariatosca pull request #208: ARIA-408 remove execution creation fr...
Posted by aviyoop <gi...@git.apache.org>.
Github user aviyoop commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152310821
--- Diff: aria/orchestrator/execution_preparer.py ---
@@ -31,70 +31,79 @@
DEFAULT_TASK_RETRY_INTERVAL = 30
-class ExecutionCompiler(object):
+class ExecutionPreparer(object):
+ """
+ This class manages any execution and tasks related preparation for an execution of a workflow.
+ """
def __init__(
self,
- model,
- resource,
- plugin,
+ model_storage,
+ resource_storagee,
--- End diff --
storage
---
[GitHub] incubator-ariatosca pull request #208: ARIA-408 remove execution creation fr...
Posted by aviyoop <gi...@git.apache.org>.
Github user aviyoop commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152273539
--- Diff: tests/orchestrator/execution/test_execution_compiler.py ---
@@ -66,7 +64,7 @@ class FailingTask(BaseException):
def test_undeclared_workflow(request):
# validating a proper error is raised when the workflow is not declared in the service
with pytest.raises(exceptions.UndeclaredWorkflowError):
- _create_workflow_runner(request, 'undeclared_workflow')
+ _get_compiler(request, 'undeclared_workflow').compile()
--- End diff --
compile -> prepare
---
[GitHub] incubator-ariatosca pull request #208: ARIA-408 remove execution creation fr...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/incubator-ariatosca/pull/208
---
[GitHub] incubator-ariatosca pull request #208: ARIA-408 remove execution creation fr...
Posted by aviyoop <gi...@git.apache.org>.
Github user aviyoop commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152283978
--- Diff: tests/orchestrator/execution/test_execution_compiler.py ---
@@ -296,171 +230,161 @@ def _setup_mock_workflow_in_service(request, inputs=None):
return mock_workflow_name
-def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
- task_max_attempts=None, task_retry_interval=None):
+def _get_compiler(request, workflow_name):
# helper method for instantiating a workflow runner
- service_id = request.getfixturevalue('service').id
+ service = request.getfixturevalue('service')
model = request.getfixturevalue('model')
resource = request.getfixturevalue('resource')
plugin_manager = request.getfixturevalue('plugin_manager')
- # task configuration parameters can't be set to None, therefore only
- # passing those if they've been set by the test
- task_configuration_kwargs = dict()
- if task_max_attempts is not None:
- task_configuration_kwargs['task_max_attempts'] = task_max_attempts
- if task_retry_interval is not None:
- task_configuration_kwargs['task_retry_interval'] = task_retry_interval
-
- return WorkflowRunner(
- workflow_name=workflow_name,
- service_id=service_id,
- inputs=inputs or {},
- executor=executor,
- model_storage=model,
- resource_storage=resource,
- plugin_manager=plugin_manager,
- **task_configuration_kwargs)
+ return execution_compiler.ExecutionCompiler(
+ model,
+ resource,
+ plugin_manager,
+ service,
+ workflow_name
+ )
class TestResumableWorkflows(object):
- def _create_initial_workflow_runner(
- self, workflow_context, workflow, executor, inputs=None):
+ def _compile_execution(
+ self,
+ model,
+ resource,
+ service,
+ workflow,
+ executor,
+ inputs=None):
- service = workflow_context.service
service.workflows['custom_workflow'] = tests_mock.models.create_operation(
'custom_workflow',
operation_kwargs={
'function': '{0}.{1}'.format(__name__, workflow.__name__),
'inputs': dict((k, models.Input.wrap(k, v)) for k, v in (inputs or {}).items())
}
)
- workflow_context.model.service.update(service)
-
- wf_runner = WorkflowRunner(
- service_id=workflow_context.service.id,
- inputs=inputs or {},
- model_storage=workflow_context.model,
- resource_storage=workflow_context.resource,
- plugin_manager=None,
- workflow_name='custom_workflow',
- executor=executor)
- return wf_runner
+ model.service.update(service)
+ compiler = execution_compiler.ExecutionCompiler(
+ model, resource, None, service, 'custom_workflow'
+ )
+ ctx = compiler.compile(inputs, executor)
+ model.execution.update(ctx.execution)
+
+ return ctx
@staticmethod
- def _wait_for_active_and_cancel(workflow_runner):
+ def _wait_for_active_and_cancel(eng, ctx):
if custom_events['is_active'].wait(60) is False:
raise TimeoutError("is_active wasn't set to True")
- workflow_runner.cancel()
+ eng.cancel_execution(ctx)
if custom_events['execution_cancelled'].wait(60) is False:
raise TimeoutError("Execution did not end")
def test_resume_workflow(self, workflow_context, thread_executor):
node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
self._create_interface(workflow_context, node, mock_pass_first_task_only)
+ ctx = self._compile_execution(
+ workflow_context.model,
+ workflow_context.resource,
+ workflow_context.model.service.list()[0],
+ mock_parallel_tasks_workflow,
+ thread_executor,
+ inputs={'number_of_tasks': 2}
+ )
- wf_runner = self._create_initial_workflow_runner(
- workflow_context, mock_parallel_tasks_workflow, thread_executor,
- inputs={'number_of_tasks': 2})
+ eng = engine.Engine(thread_executor)
- wf_thread = Thread(target=wf_runner.execute)
+ wf_thread = Thread(target=eng.execute, kwargs=dict(ctx=ctx))
wf_thread.daemon = True
wf_thread.start()
# Wait for the execution to start
- self._wait_for_active_and_cancel(wf_runner)
- node = workflow_context.model.node.refresh(node)
+ self._wait_for_active_and_cancel(eng, ctx)
+ node = ctx.model.node.refresh(node)
- tasks = workflow_context.model.task.list(filters={'_stub_type': None})
+ tasks = ctx.model.task.list(filters={'_stub_type': None})
assert any(task.status == task.SUCCESS for task in tasks)
assert any(task.status == task.RETRYING for task in tasks)
custom_events['is_resumed'].set()
assert any(task.status == task.RETRYING for task in tasks)
# Create a new workflow runner, with an existing execution id. This would cause
--- End diff --
This comment needs to be revised: it still refers to creating a workflow runner, which this change removes.
---
[GitHub] incubator-ariatosca pull request #208: ARIA-408 remove execution creation fr...
Posted by aviyoop <gi...@git.apache.org>.
Github user aviyoop commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152233908
--- Diff: tests/orchestrator/execution_plugin/test_local.py ---
@@ -500,8 +500,9 @@ def mock_workflow(ctx, graph):
arguments=arguments))
return graph
tasks_graph = mock_workflow(ctx=workflow_context) # pylint: disable=no-value-for-parameter
- graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(tasks_graph)
- eng = engine.Engine({executor.__class__: executor})
+ graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(
--- End diff --
There is no need for a line break here.
---
[GitHub] incubator-ariatosca pull request #208: ARIA-408 remove execution creation fr...
Posted by aviyoop <gi...@git.apache.org>.
Github user aviyoop commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152312803
--- Diff: aria/orchestrator/execution_preparer.py ---
@@ -31,70 +31,79 @@
DEFAULT_TASK_RETRY_INTERVAL = 30
-class ExecutionCompiler(object):
+class ExecutionPreparer(object):
+ """
+ This class manages any execution and tasks related preparation for an execution of a workflow.
+ """
def __init__(
self,
- model,
- resource,
- plugin,
+ model_storage,
+ resource_storagee,
+ plugin_manager,
service,
workflow_name,
task_max_attempts=None,
task_retry_interval=None
):
- self._model = model
- self._resource = resource
- self._plugin = plugin
+ self._model = model_storage
+ self._resource = resource_storagee
+ self._plugin = plugin_manager
self._service = service
self._workflow_name = workflow_name
- self._workflow_context = None
- self._execution = None
self._task_max_attempts = task_max_attempts or DEFAULT_TASK_MAX_ATTEMPTS
self._task_retry_interval = task_retry_interval or DEFAULT_TASK_RETRY_INTERVAL
- @property
- def workflow_ctx(self):
- if self._workflow_context is None:
- self._workflow_context = WorkflowContext(
- name=self.__class__.__name__,
- model_storage=self._model,
- resource_storage=self._resource,
- service_id=self._execution.service.id,
- execution_id=self._execution.id,
- workflow_name=self._execution.workflow_name,
- task_max_attempts=self._task_max_attempts,
- task_retry_interval=self._task_retry_interval,
- )
- return self._workflow_context
-
- def compile(self, execution_inputs=None, executor=None, execution_id=None):
+ def get_ctx(self, execution):
+ return WorkflowContext(
+ name=self._workflow_name,
+ model_storage=self._model,
+ resource_storage=self._resource,
+ service_id=execution.service.id,
+ execution_id=execution.id,
+ workflow_name=execution.workflow_name,
+ task_max_attempts=self._task_max_attempts,
+ task_retry_interval=self._task_retry_interval,
+ )
+
+ def prepare(self, execution_inputs=None, executor=None, execution_id=None):
+ """
+ Prepares the execution and return the workflow ctx. If the execution is new, an execution
+ and tasks models would be created. A workflow context for the appropriate execution would
+ be created.
+
+ :param execution_inputs: Inputs for the execution.
--- End diff --
The capitalization here is inconsistent.
---
[GitHub] incubator-ariatosca pull request #208: ARIA-408 remove execution creation fr...
Posted by aviyoop <gi...@git.apache.org>.
Github user aviyoop commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152261088
--- Diff: aria/orchestrator/execution_compiler.py ---
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+from datetime import datetime
+
+from . import exceptions
+from .context.workflow import WorkflowContext
+from .workflows import builtin
+from .workflows.core import graph_compiler
+from .workflows.executor.process import ProcessExecutor
+from ..modeling import models
+from ..modeling import utils as modeling_utils
+from ..utils.imports import import_fullname
+
+
+DEFAULT_TASK_MAX_ATTEMPTS = 30
+DEFAULT_TASK_RETRY_INTERVAL = 30
+
+
+class ExecutionCompiler(object):
+ def __init__(
+ self,
+ model,
+ resource,
+ plugin,
+ service,
+ workflow_name,
+ task_max_attempts=None,
+ task_retry_interval=None
+ ):
+ self._model = model
+ self._resource = resource
+ self._plugin = plugin
+ self._service = service
+ self._workflow_name = workflow_name
+ self._workflow_context = None
+ self._execution = None
+ self._task_max_attempts = task_max_attempts or DEFAULT_TASK_MAX_ATTEMPTS
+ self._task_retry_interval = task_retry_interval or DEFAULT_TASK_RETRY_INTERVAL
+
+ @property
--- End diff --
Don't make it a property
---
[GitHub] incubator-ariatosca pull request #208: ARIA-408 remove execution creation fr...
Posted by aviyoop <gi...@git.apache.org>.
Github user aviyoop commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152252053
--- Diff: aria/orchestrator/execution_compiler.py ---
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+from datetime import datetime
+
+from . import exceptions
+from .context.workflow import WorkflowContext
+from .workflows import builtin
+from .workflows.core import graph_compiler
+from .workflows.executor.process import ProcessExecutor
+from ..modeling import models
+from ..modeling import utils as modeling_utils
+from ..utils.imports import import_fullname
+
+
+DEFAULT_TASK_MAX_ATTEMPTS = 30
+DEFAULT_TASK_RETRY_INTERVAL = 30
+
+
+class ExecutionCompiler(object):
--- End diff --
Add some documentation explaining what this does
---
[GitHub] incubator-ariatosca pull request #208: ARIA-408 remove execution creation fr...
Posted by aviyoop <gi...@git.apache.org>.
Github user aviyoop commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152281650
--- Diff: tests/orchestrator/execution/test_execution_compiler.py ---
@@ -296,171 +230,161 @@ def _setup_mock_workflow_in_service(request, inputs=None):
return mock_workflow_name
-def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
- task_max_attempts=None, task_retry_interval=None):
+def _get_compiler(request, workflow_name):
# helper method for instantiating a workflow runner
- service_id = request.getfixturevalue('service').id
+ service = request.getfixturevalue('service')
model = request.getfixturevalue('model')
resource = request.getfixturevalue('resource')
plugin_manager = request.getfixturevalue('plugin_manager')
- # task configuration parameters can't be set to None, therefore only
- # passing those if they've been set by the test
- task_configuration_kwargs = dict()
- if task_max_attempts is not None:
- task_configuration_kwargs['task_max_attempts'] = task_max_attempts
- if task_retry_interval is not None:
- task_configuration_kwargs['task_retry_interval'] = task_retry_interval
-
- return WorkflowRunner(
- workflow_name=workflow_name,
- service_id=service_id,
- inputs=inputs or {},
- executor=executor,
- model_storage=model,
- resource_storage=resource,
- plugin_manager=plugin_manager,
- **task_configuration_kwargs)
+ return execution_compiler.ExecutionCompiler(
+ model,
+ resource,
+ plugin_manager,
+ service,
+ workflow_name
+ )
class TestResumableWorkflows(object):
- def _create_initial_workflow_runner(
- self, workflow_context, workflow, executor, inputs=None):
+ def _compile_execution(
+ self,
+ model,
+ resource,
+ service,
+ workflow,
+ executor,
+ inputs=None):
- service = workflow_context.service
service.workflows['custom_workflow'] = tests_mock.models.create_operation(
'custom_workflow',
operation_kwargs={
'function': '{0}.{1}'.format(__name__, workflow.__name__),
'inputs': dict((k, models.Input.wrap(k, v)) for k, v in (inputs or {}).items())
}
)
- workflow_context.model.service.update(service)
-
- wf_runner = WorkflowRunner(
- service_id=workflow_context.service.id,
- inputs=inputs or {},
- model_storage=workflow_context.model,
- resource_storage=workflow_context.resource,
- plugin_manager=None,
- workflow_name='custom_workflow',
- executor=executor)
- return wf_runner
+ model.service.update(service)
+ compiler = execution_compiler.ExecutionCompiler(
+ model, resource, None, service, 'custom_workflow'
+ )
+ ctx = compiler.compile(inputs, executor)
+ model.execution.update(ctx.execution)
+
+ return ctx
@staticmethod
- def _wait_for_active_and_cancel(workflow_runner):
+ def _wait_for_active_and_cancel(eng, ctx):
--- End diff --
Consider renaming this to `wait_for_execution_to_be_active_and_cancel_it` or `cancel_active_execution`.
The former is a lot better, of course!
---
[GitHub] incubator-ariatosca pull request #208: ARIA-408 remove execution creation fr...
Posted by aviyoop <gi...@git.apache.org>.
Github user aviyoop commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152252420
--- Diff: aria/orchestrator/execution_compiler.py ---
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+from datetime import datetime
+
+from . import exceptions
+from .context.workflow import WorkflowContext
+from .workflows import builtin
+from .workflows.core import graph_compiler
+from .workflows.executor.process import ProcessExecutor
+from ..modeling import models
+from ..modeling import utils as modeling_utils
+from ..utils.imports import import_fullname
+
+
+DEFAULT_TASK_MAX_ATTEMPTS = 30
+DEFAULT_TASK_RETRY_INTERVAL = 30
+
+
+class ExecutionCompiler(object):
+ def __init__(
+ self,
+ model,
--- End diff --
Please rename these parameters to model_storage, resource_storage, plugin_manager...
---