Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/22 11:29:03 UTC
[1/4] incubator-ariatosca git commit: ARIA-282 Make SSH capability opt-in [Forced Update!]
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-284-Cleanup-and-optimize-the-task-execution a86ba295e -> 827230da2 (forced update)
ARIA-282 Make SSH capability opt-in
Since the Fabric library depends on Paramiko, whose license
is incompatible with Apache's, ARIA's SSH capabilities are
now opt-in and no longer part of the default installation.
Instead, users who want to use SSH operations should install
ARIA's "[ssh]" extra, which installs Fabric and enables the
execution plugin's SSH capabilities.
Users who do not install this extra can still use ARIA as
well as the execution plugin, just without SSH.
Additional changes:
- A new tox environment has been created for running the
SSH tests; the remaining envs install only plain ARIA.
- The commented lines in requirements.in were removed:
the bug regarding environment markers has been fixed,
so there is no longer a need to copy them manually
into requirements.txt.
- Environment-marked dependencies are now installed
via "install_requires" rather than "extras_require".
- requirements.in was added to the manifest file, and
a bug in setup.py was fixed which previously caused
a source distribution to install aria without any
dependencies.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/105971f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/105971f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/105971f8
Branch: refs/heads/ARIA-284-Cleanup-and-optimize-the-task-execution
Commit: 105971f8ebc81de5ce5a98ce11a1d8580e671c21
Parents: 1fee85c
Author: Ran Ziv <ra...@gigaspaces.com>
Authored: Wed Jun 21 15:39:34 2017 +0300
Committer: Ran Ziv <ra...@gigaspaces.com>
Committed: Wed Jun 21 17:37:57 2017 +0300
----------------------------------------------------------------------
.travis.yml | 2 ++
MANIFEST.in | 1 +
Makefile | 5 +--
.../orchestrator/execution_plugin/operations.py | 13 ++++++--
requirements.in | 16 ++--------
requirements.txt | 32 ++++---------------
setup.py | 33 +++++++++-----------
tox.ini | 18 ++++++++---
8 files changed, 53 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/105971f8/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index b11ed62..de02d78 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -21,6 +21,8 @@ env:
- TOX_ENV=py26
- TOX_ENV=py27e2e
- TOX_ENV=py26e2e
+- TOX_ENV=py27ssh
+- TOX_ENV=py26ssh
install:
- pip install --upgrade pip
- pip install --upgrade setuptools
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/105971f8/MANIFEST.in
----------------------------------------------------------------------
diff --git a/MANIFEST.in b/MANIFEST.in
index 877a7dd..020b00e 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -4,6 +4,7 @@ include NOTICE
include VERSION
include CHANGELOG.rst
include README.rst
+include requirements.in
include requirements.txt
recursive-include docs/html *
recursive-include examples *
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/105971f8/Makefile
----------------------------------------------------------------------
diff --git a/Makefile b/Makefile
index cb4b58f..f5f2e66 100644
--- a/Makefile
+++ b/Makefile
@@ -33,10 +33,10 @@ clean:
-find . -type d -name '*.egg-info' -exec rm -rf {} \; 2>/dev/null
install:
- pip install .
+ pip install .[ssh]
install-virtual:
- pip install --editable .
+ pip install --editable .[ssh]
# "pip install --editable" will not add our extensions to the path, so we will patch the virtualenv
EXTENSIONS_PATH="$$(head -n 1 "$(EASY_INSTALL_PTH)")/extensions" && \
@@ -55,6 +55,7 @@ test:
tox -e pylint_tests
tox -e py$(PYTHON_VERSION)
tox -e py$(PYTHON_VERSION)e2e
+ tox -e py$(PYTHON_VERSION)ssh
dist: docs
python ./setup.py sdist bdist_wheel
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/105971f8/aria/orchestrator/execution_plugin/operations.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/operations.py b/aria/orchestrator/execution_plugin/operations.py
index 5effa8a..0bc8083 100644
--- a/aria/orchestrator/execution_plugin/operations.py
+++ b/aria/orchestrator/execution_plugin/operations.py
@@ -15,7 +15,6 @@
from aria.orchestrator import operation
from . import local as local_operations
-from .ssh import operations as ssh_operations
@operation
@@ -38,7 +37,7 @@ def run_script_with_ssh(ctx,
use_sudo=False,
hide_output=None,
**kwargs):
- return ssh_operations.run_script(
+ return _try_import_ssh().run_script(
ctx=ctx,
script_path=script_path,
fabric_env=fabric_env,
@@ -55,9 +54,17 @@ def run_commands_with_ssh(ctx,
use_sudo=False,
hide_output=None,
**_):
- return ssh_operations.run_commands(
+ return _try_import_ssh().run_commands(
ctx=ctx,
commands=commands,
fabric_env=fabric_env,
use_sudo=use_sudo,
hide_output=hide_output)
+
+
+def _try_import_ssh():
+ try:
+ from .ssh import operations as ssh_operations
+ return ssh_operations
+ except Exception:
+ raise RuntimeError('Failed to import SSH modules; Have you installed the ARIA SSH extra?')
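
The deferred-import pattern introduced above can be generalized; a hedged
sketch (the helper name and error wording are illustrative, not ARIA's
actual API):

```python
import importlib


def try_import_optional(module_name, extra):
    """Import an optional dependency, raising a friendly error when the
    matching extra was not installed (generalized sketch of the pattern)."""
    try:
        return importlib.import_module(module_name)
    except ImportError:
        raise RuntimeError(
            'Failed to import {0}; have you installed the "[{1}]" extra?'
            .format(module_name, extra))
```

Importing lazily inside the operation function, rather than at module
level, lets the rest of the execution plugin load even when the extra is
absent.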
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/105971f8/requirements.in
----------------------------------------------------------------------
diff --git a/requirements.in b/requirements.in
index d205c7a..cecc9fd 100644
--- a/requirements.in
+++ b/requirements.in
@@ -26,7 +26,6 @@ clint>=0.5.0, <0.6
SQLAlchemy>=1.1.0, <1.2 # version 1.2 dropped support of python 2.6
wagon==0.6.0
bottle>=0.12.0, <0.13
-Fabric>=1.13.0, <1.14
setuptools>=35.0.0, <36.0.0
click>=6.0, < 7.0
colorama>=0.3.7, <=0.3.9
@@ -34,15 +33,6 @@ PrettyTable>=0.7,<0.8
click_didyoumean==0.0.3
backports.shutil_get_terminal_size==1.0.0
logutils==0.3.4.1
-
-# Since the tool we are using to generate our requirements.txt, `pip-tools`,
-# does not currently support conditional dependencies (;), we're adding our original
-# conditional dependencies here as comments, and manually adding them to our
-# generated requirements.txt file.
-# The relevant pip-tools issue: https://github.com/jazzband/pip-tools/issues/435
-
-# importlib ; python_version < '2.7'
-# ordereddict ; python_version < '2.7'
-# total-ordering ; python_version < '2.7' # only one version on pypi
-# Fabric makes use of this library, but doesn't bring it :(
-# pypiwin32==219 ; sys_platform == 'win32'
+importlib ; python_version < '2.7'
+ordereddict ; python_version < '2.7'
+total-ordering ; python_version < '2.7' # only one version on pypi
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/105971f8/requirements.txt
----------------------------------------------------------------------
diff --git a/requirements.txt b/requirements.txt
index 6cf2ade..9f929a9 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,39 +2,20 @@
# This file is autogenerated by pip-compile
# To update, run:
#
-# pip-compile --output-file ./requirements.txt ./requirements.in
+# pip-compile --output-file requirements.txt requirements.in
#
-# Since the tool we are using to generate our requirements.txt, `pip-tools`,
-# does not currently support conditional dependencies (;), we're adding our original
-# conditional dependencies here as comments, and manually adding them to our
-# generated requirements.txt file.
-# The relevant pip-tools issue: https://github.com/jazzband/pip-tools/issues/435
-
-importlib ; python_version < '2.7'
-ordereddict ; python_version < '2.7'
-total-ordering ; python_version < '2.7' # only one version on pypi
-# Fabric makes use of this library, but doesn't bring it :(
-pypiwin32==219 ; sys_platform == 'win32'
-# ----------------------------------------------------------------------------------
-
appdirs==1.4.3 # via setuptools
args==0.1.0 # via clint
-asn1crypto==0.22.0 # via cryptography
backports.shutil_get_terminal_size==1.0.0
blinker==1.4
bottle==0.12.13
cachecontrol[filecache]==0.12.1
-cffi==1.10.0 # via cryptography
click==6.7
click_didyoumean==0.0.3
clint==0.5.1
colorama==0.3.9
-cryptography==1.8.1 # via paramiko
decorator==4.0.11 # via networkx
-enum34==1.1.6 # via cryptography
-fabric==1.13.1
-idna==2.5 # via cryptography
-ipaddress==1.0.18 # via cryptography
+importlib==1.0.4 ; python_version < "2.7"
jinja2==2.8.1
jsonpickle==0.9.4
lockfile==0.12.2 # via cachecontrol
@@ -42,19 +23,18 @@ logutils==0.3.4.1
markupsafe==1.0 # via jinja2
msgpack-python==0.4.8 # via cachecontrol
networkx==1.9.1
-packaging==16.8 # via cryptography, setuptools
-paramiko==2.1.2 # via fabric
+ordereddict==1.1 ; python_version < "2.7"
+packaging==16.8 # via setuptools
prettytable==0.7.2
-pyasn1==0.2.3 # via paramiko
-pycparser==2.17 # via cffi
pyparsing==2.2.0 # via packaging
requests==2.13.0
retrying==1.3.3
ruamel.ordereddict==0.4.9 # via ruamel.yaml
ruamel.yaml==0.11.15
shortuuid==0.5.0
-six==1.10.0 # via cryptography, packaging, retrying, setuptools
+six==1.10.0 # via packaging, retrying, setuptools
sqlalchemy==1.1.6
+total-ordering==0.1.0 ; python_version < "2.7"
wagon==0.6.0
wheel==0.29.0 # via wagon
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/105971f8/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 8d5f463..d2a914c 100644
--- a/setup.py
+++ b/setup.py
@@ -43,26 +43,21 @@ with open(os.path.join(root_dir, 'README.rst')) as readme:
long_description = readme.read()
install_requires = []
-extras_require = {}
-
-# We need to parse the requirements for the conditional dependencies to work for wheels and
-# standard installation
-try:
- with open(os.path.join(root_dir, 'requirements.in')) as requirements:
- for requirement in requirements.readlines():
- install_requires.append(requirement.strip())
- # We are using the install_requires mechanism in order to specify
- # conditional dependencies since reading them from a file in their
- # standard ';' from does silently nothing.
- extras_require = {":python_version<'2.7'": ['importlib',
- 'ordereddict',
- 'total-ordering',
- ],
- ":sys_platform=='win32'": 'pypiwin32'}
-except IOError:
- install_requires = []
- extras_require = {}
+# We need to parse the requirements for the conditional dependencies to work for wheels creation
+# as well as source dist installation
+with open(os.path.join(root_dir, 'requirements.in')) as requirements:
+ for requirement in requirements.readlines():
+ install_requires.append(requirement.strip())
+
+ssh_requires = [
+ 'Fabric>=1.13.0, <1.14',
+ "pypiwin32==219 ; sys_platform == 'win32'"
+]
+
+extras_require = {
+ 'ssh': ssh_requires
+}
console_scripts = ['aria = aria.cli.main:main']
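
A simplified sketch of the resulting setup.py logic: base requirements are
parsed from requirements.in, while the SSH stack lives only in the "ssh"
extra (the parsing helper is illustrative; the real code reads the file
line by line):

```python
def parse_requirements(text):
    """Return requirement lines with comments and blank lines stripped,
    approximating how setup.py reads requirements.in (simplified sketch).
    PEP 508 environment markers (after ';') are preserved."""
    requirements = []
    for line in text.splitlines():
        line = line.split('#', 1)[0].strip()
        if line:
            requirements.append(line)
    return requirements


# Only the "ssh" extra pulls in Fabric (and pypiwin32 on Windows):
extras_require = {
    'ssh': ['Fabric>=1.13.0, <1.14',
            "pypiwin32==219 ; sys_platform == 'win32'"],
}
```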
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/105971f8/tox.ini
----------------------------------------------------------------------
diff --git a/tox.ini b/tox.ini
index 58e62c3..3e1fb3c 100644
--- a/tox.ini
+++ b/tox.ini
@@ -11,7 +11,7 @@
# limitations under the License.
[tox]
-envlist=py27,py26,py27e2e,py26e2e,pywin,pylint_code,pylint_tests
+envlist=py27,py26,py27e2e,py26e2e,pywin,py27ssh,pylint_code,pylint_tests
[testenv]
passenv =
@@ -27,15 +27,17 @@ basepython =
py27: python2.7
py26e2e: python2.6
py27e2e: python2.7
+ py26ssh: python2.6
+ py27ssh: python2.7
pywin: {env:PYTHON:}\python.exe
pylint_code: python2.7
pylint_tests: python2.7
[testenv:py27]
-commands=pytest tests --ignore=tests/end2end --cov-report term-missing --cov aria
+commands=pytest tests --ignore=tests/end2end --ignore=tests/orchestrator/execution_plugin/test_ssh.py --cov-report term-missing --cov aria
[testenv:py26]
-commands=pytest tests --ignore=tests/end2end --cov-report term-missing --cov aria
+commands=pytest tests --ignore=tests/end2end --ignore=tests/orchestrator/execution_plugin/test_ssh.py --cov-report term-missing --cov aria
[testenv:py27e2e]
commands=pytest tests/end2end --cov-report term-missing --cov aria
@@ -44,7 +46,15 @@ commands=pytest tests/end2end --cov-report term-missing --cov aria
commands=pytest tests/end2end --cov-report term-missing --cov aria
[testenv:pywin]
-commands=pytest tests --ignore=tests/end2end --cov-report term-missing --cov aria
+commands=pytest tests --ignore=tests/end2end --ignore=tests/orchestrator/execution_plugin/test_ssh.py --cov-report term-missing --cov aria
+
+[testenv:py27ssh]
+install_command=pip install {opts} {packages} .[ssh]
+commands=pytest tests/orchestrator/execution_plugin/test_ssh.py
+
+[testenv:py26ssh]
+install_command=pip install {opts} {packages} .[ssh]
+commands=pytest tests/orchestrator/execution_plugin/test_ssh.py
[testenv:pylint_code]
commands=pylint --rcfile=aria/.pylintrc --disable=fixme,missing-docstring aria extensions/aria_extension_tosca/
[3/4] incubator-ariatosca git commit: ARIA-236 Resumable workflow executions
Posted by mx...@apache.org.
ARIA-236 Resumable workflow executions
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/75112ab0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/75112ab0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/75112ab0
Branch: refs/heads/ARIA-284-Cleanup-and-optimize-the-task-execution
Commit: 75112ab052c7de7162901a7a46b5e843316cc63d
Parents: a751934
Author: max-orlov <ma...@gigaspaces.com>
Authored: Mon Jun 19 17:44:45 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Thu Jun 22 14:21:54 2017 +0300
----------------------------------------------------------------------
aria/cli/commands/executions.py | 57 +++++-
aria/cli/logger.py | 4 +-
aria/modeling/orchestration.py | 3 +-
aria/orchestrator/context/workflow.py | 5 +
aria/orchestrator/events.py | 1 +
aria/orchestrator/exceptions.py | 7 +
aria/orchestrator/workflow_runner.py | 43 +++--
aria/orchestrator/workflows/core/engine.py | 6 +-
.../workflows/core/events_handler.py | 7 +
tests/mock/__init__.py | 2 +-
tests/mock/models.py | 14 +-
tests/modeling/test_models.py | 5 +-
tests/orchestrator/test_workflow_runner.py | 175 +++++++++++++++++--
13 files changed, 282 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/aria/cli/commands/executions.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py
index 6176ea2..b337e84 100644
--- a/aria/cli/commands/executions.py
+++ b/aria/cli/commands/executions.py
@@ -134,18 +134,63 @@ def start(workflow_name,
executor = DryExecutor() if dry else None # use WorkflowRunner's default executor
workflow_runner = \
- WorkflowRunner(workflow_name, service.id, inputs,
- model_storage, resource_storage, plugin_manager,
- executor, task_max_attempts, task_retry_interval)
+ WorkflowRunner(
+ model_storage, resource_storage, plugin_manager,
+ service_id=service.id, workflow_name=workflow_name, inputs=inputs, executor=executor,
+ task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval
+ )
+ logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
+
+ _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern)
+
- execution_thread_name = '{0}_{1}'.format(service_name, workflow_name)
+@executions.command(name='resume',
+ short_help='Resume a workflow')
+@aria.argument('execution-id')
+@aria.options.inputs(help=helptexts.EXECUTION_INPUTS)
+@aria.options.dry_execution
+@aria.options.task_max_attempts()
+@aria.options.task_retry_interval()
+@aria.options.mark_pattern()
+@aria.options.verbose()
+@aria.pass_model_storage
+@aria.pass_resource_storage
+@aria.pass_plugin_manager
+@aria.pass_logger
+def resume(execution_id,
+ dry,
+ task_max_attempts,
+ task_retry_interval,
+ mark_pattern,
+ model_storage,
+ resource_storage,
+ plugin_manager,
+ logger):
+ executor = DryExecutor() if dry else None # use WorkflowRunner's default executor
+
+ workflow_runner = \
+ WorkflowRunner(
+ model_storage, resource_storage, plugin_manager,
+ execution_id=execution_id, executor=executor,
+ task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval
+ )
+
+ logger.info('Resuming {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
+ _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern)
+
+
+def _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern):
+ execution_thread_name = '{0}_{1}'.format(workflow_runner.service.name,
+ workflow_runner.execution.workflow_name)
execution_thread = threading.ExceptionThread(target=workflow_runner.execute,
name=execution_thread_name)
- logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
execution_thread.start()
- log_iterator = cli_logger.ModelLogIterator(model_storage, workflow_runner.execution_id)
+ last_task_id = workflow_runner.execution.logs[-1].id if workflow_runner.execution.logs else 0
+ log_iterator = cli_logger.ModelLogIterator(model_storage,
+ workflow_runner.execution_id,
+ offset=last_task_id)
try:
while execution_thread.is_alive():
execution_logging.log_list(log_iterator, mark_pattern=mark_pattern)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/aria/cli/logger.py
----------------------------------------------------------------------
diff --git a/aria/cli/logger.py b/aria/cli/logger.py
index 5de3701..96f3fb3 100644
--- a/aria/cli/logger.py
+++ b/aria/cli/logger.py
@@ -115,8 +115,8 @@ class Logging(object):
class ModelLogIterator(object):
- def __init__(self, model_storage, execution_id, filters=None, sort=None):
- self._last_visited_id = 0
+ def __init__(self, model_storage, execution_id, filters=None, sort=None, offset=0):
+ self._last_visited_id = offset
self._model_storage = model_storage
self._execution_id = execution_id
self._additional_filters = filters or {}
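
The offset parameter lets a resumed CLI session skip logs it already
printed. A self-contained sketch of the idea (the class and data shape
are illustrative, not ARIA's model layer):

```python
class OffsetLogIterator(object):
    """Offset-based log polling: each pass yields only entries whose id
    is greater than the last visited id (sketch of the pattern above)."""

    def __init__(self, logs, offset=0):
        self._logs = logs                 # list of (id, message), ids ascending
        self._last_visited_id = offset

    def __iter__(self):
        for log_id, message in self._logs:
            if log_id > self._last_visited_id:
                self._last_visited_id = log_id
                yield message
```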
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 17d2476..276b68e 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -68,7 +68,8 @@ class ExecutionBase(mixins.ModelMixin):
VALID_TRANSITIONS = {
PENDING: (STARTED, CANCELLED),
STARTED: END_STATES + (CANCELLING,),
- CANCELLING: END_STATES
+ CANCELLING: END_STATES,
+ CANCELLED: PENDING
}
@orm.validates('status')
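
The new CANCELLED -> PENDING edge is what makes resuming possible. A
sketch of checking a status change against this transition table (state
names are written as lowercase strings here for illustration):

```python
END_STATES = ('succeeded', 'failed', 'cancelled')

# Mirrors the diff above; 'cancelled' -> 'pending' is the new resume edge.
VALID_TRANSITIONS = {
    'pending': ('started', 'cancelled'),
    'started': END_STATES + ('cancelling',),
    'cancelling': END_STATES,
    'cancelled': ('pending',),
}


def is_valid_transition(current, new):
    """Return True when moving from `current` to `new` is allowed."""
    return new in VALID_TRANSITIONS.get(current, ())
```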
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index aa5a786..adcd635 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -97,10 +97,15 @@ class WorkflowContext(BaseContext):
@property
def _graph(self):
+ # Constructing a graph with only not ended nodes
if self._execution_graph is None:
graph = DiGraph()
for task in self.execution.tasks:
+ if task.has_ended():
+ continue
for dependency in task.dependencies:
+ if dependency.has_ended():
+ continue
graph.add_edge(dependency, task)
self._execution_graph = graph
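
On resume, the graph is rebuilt from only the tasks that have not yet
ended, so finished work is not repeated. The edge-collection logic can be
sketched without networkx (the dict-based task shape is illustrative):

```python
def remaining_dependency_edges(tasks):
    """Collect (dependency, task) edges, skipping any task or dependency
    that has already ended -- mirrors the graph construction above."""
    edges = []
    for task in tasks:
        if task['ended']:
            continue
        for dep in task['deps']:
            if dep['ended']:
                continue
            edges.append((dep['name'], task['name']))
    return edges
```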
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/aria/orchestrator/events.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/events.py b/aria/orchestrator/events.py
index a1c4922..aa1b5bc 100644
--- a/aria/orchestrator/events.py
+++ b/aria/orchestrator/events.py
@@ -34,3 +34,4 @@ on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal')
on_cancelled_workflow_signal = signal('on_cancelled_workflow_signal')
on_success_workflow_signal = signal('on_success_workflow_signal')
on_failure_workflow_signal = signal('on_failure_workflow_signal')
+on_resume_workflow_signal = signal('on_resume_workflow_signal')
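
ARIA dispatches these events through blinker signals; the new resume
signal is wired to a handler in events_handler.py further down. A minimal
stand-in showing the connect/send flow (this is a sketch, not blinker's
actual API):

```python
class Signal(object):
    """Tiny stand-in for a blinker signal (illustrative only)."""

    def __init__(self):
        self._receivers = []

    def connect(self, receiver):
        self._receivers.append(receiver)
        return receiver          # usable as a decorator

    def send(self, sender, **kwargs):
        return [receiver(sender, **kwargs) for receiver in self._receivers]


on_resume_workflow_signal = Signal()


@on_resume_workflow_signal.connect
def _workflow_resume(execution, **kwargs):
    # Resuming moves the execution back to PENDING, as in events_handler.py.
    execution['status'] = 'pending'
```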
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/aria/orchestrator/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/exceptions.py b/aria/orchestrator/exceptions.py
index 8d3dcc6..71b6401 100644
--- a/aria/orchestrator/exceptions.py
+++ b/aria/orchestrator/exceptions.py
@@ -74,3 +74,10 @@ class WorkflowImplementationNotFoundError(AriaError):
Raised when attempting to import a workflow's code but the implementation is not found
"""
pass
+
+
+class InvalidWorkflowRunnerParams(AriaError):
+ """
+ Raised when invalid combination of arguments is passed to the workflow runner
+ """
+ pass
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 9e6b3ad..3ccb1ee 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -37,9 +37,9 @@ DEFAULT_TASK_RETRY_INTERVAL = 30
class WorkflowRunner(object):
- def __init__(self, workflow_name, service_id, inputs,
- model_storage, resource_storage, plugin_manager,
- executor=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
+ def __init__(self, model_storage, resource_storage, plugin_manager,
+ execution_id=None, service_id=None, workflow_name=None, inputs=None, executor=None,
+ task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
"""
Manages a single workflow execution on a given service.
@@ -55,28 +55,36 @@ class WorkflowRunner(object):
:param task_retry_interval: Retry interval in between retry attempts of a failing task
"""
+ if not (execution_id or (workflow_name and service_id)):
+ exceptions.InvalidWorkflowRunnerParams(
+ "Either provide execution id in order to resume a workflow or workflow name "
+ "and service id with inputs")
+
+ self._is_resume = execution_id is not None
+
self._model_storage = model_storage
self._resource_storage = resource_storage
- self._workflow_name = workflow_name
# the IDs are stored rather than the models themselves, so this module could be used
# by several threads without raising errors on model objects shared between threads
- self._service_id = service_id
-
- self._validate_workflow_exists_for_service()
- workflow_fn = self._get_workflow_fn()
-
- execution = self._create_execution_model(inputs)
- self._execution_id = execution.id
+ if self._is_resume:
+ self._execution_id = execution_id
+ self._service_id = self.execution.service.id
+ self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name
+ else:
+ self._service_id = service_id
+ self._workflow_name = workflow_name
+ self._validate_workflow_exists_for_service()
+ self._execution_id = self._create_execution_model(inputs).id
self._workflow_context = WorkflowContext(
name=self.__class__.__name__,
model_storage=self._model_storage,
resource_storage=resource_storage,
service_id=service_id,
- execution_id=execution.id,
- workflow_name=workflow_name,
+ execution_id=self._execution_id,
+ workflow_name=self._workflow_name,
task_max_attempts=task_max_attempts,
task_retry_interval=task_retry_interval)
@@ -86,9 +94,10 @@ class WorkflowRunner(object):
# transforming the execution inputs to dict, to pass them to the workflow function
execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values())
- self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
- compile.create_execution_tasks(
- self._workflow_context, self._tasks_graph, executor.__class__)
+ if not self._is_resume:
+ workflow_fn = self._get_workflow_fn()
+ tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
+ compile.create_execution_tasks(self._workflow_context, tasks_graph, executor.__class__)
self._engine = engine.Engine(executors={executor.__class__: executor})
@@ -105,7 +114,7 @@ class WorkflowRunner(object):
return self._model_storage.service.get(self._service_id)
def execute(self):
- self._engine.execute(ctx=self._workflow_context)
+ self._engine.execute(ctx=self._workflow_context, resuming=self._is_resume)
def cancel(self):
self._engine.cancel_execution(ctx=self._workflow_context)
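
The runner now has two mutually exclusive entry modes: resume by
execution id, or start by service id plus workflow name. A sketch of that
argument validation (here the exception is raised explicitly; the
function name and exception type are illustrative):

```python
def runner_mode(execution_id=None, service_id=None, workflow_name=None):
    """Return True when resuming an existing execution, False when
    starting a new one; reject argument combinations that fit neither."""
    if not (execution_id or (workflow_name and service_id)):
        raise ValueError(
            'Either provide an execution id in order to resume a workflow, '
            'or a workflow name and service id with inputs')
    return execution_id is not None
```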
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 9f0ddd7..d5a6e70 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -41,11 +41,15 @@ class Engine(logger.LoggerMixin):
self._executors = executors.copy()
self._executors.setdefault(StubTaskExecutor, StubTaskExecutor())
- def execute(self, ctx):
+ def execute(self, ctx, resuming=False):
"""
execute the workflow
"""
executing_tasks = []
+
+ if resuming:
+ events.on_resume_workflow_signal.send(ctx)
+
try:
events.start_workflow_signal.send(ctx)
while True:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 2d71d2a..7380db8 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -121,6 +121,13 @@ def _workflow_cancelled(workflow_context, *args, **kwargs):
execution.ended_at = datetime.utcnow()
+@events.on_resume_workflow_signal.connect
+def _workflow_resume(workflow_context, *args, **kwargs):
+ with workflow_context.persist_changes:
+ execution = workflow_context.execution
+ execution.status = execution.PENDING
+
+
@events.on_cancelling_workflow_signal.connect
def _workflow_cancelling(workflow_context, *args, **kwargs):
with workflow_context.persist_changes:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/tests/mock/__init__.py
----------------------------------------------------------------------
diff --git a/tests/mock/__init__.py b/tests/mock/__init__.py
index 9004b4c..9183b77 100644
--- a/tests/mock/__init__.py
+++ b/tests/mock/__init__.py
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from . import models, context, topology, operations
+from . import models, context, topology, operations, workflow
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/tests/mock/models.py
----------------------------------------------------------------------
diff --git a/tests/mock/models.py b/tests/mock/models.py
index 7f6bbea..23a14bd 100644
--- a/tests/mock/models.py
+++ b/tests/mock/models.py
@@ -225,20 +225,24 @@ def create_interface_template(service_template, interface_name, operation_name,
)
-def create_interface(service, interface_name, operation_name, operation_kwargs=None,
- interface_kwargs=None):
- the_type = service.service_template.interface_types.get_descendant('test_interface_type')
-
+def create_operation(operation_name, operation_kwargs=None):
if operation_kwargs and operation_kwargs.get('arguments'):
operation_kwargs['arguments'] = dict(
(argument_name, models.Argument.wrap(argument_name, argument_value))
for argument_name, argument_value in operation_kwargs['arguments'].iteritems()
if argument_value is not None)
- operation = models.Operation(
+ return models.Operation(
name=operation_name,
**(operation_kwargs or {})
)
+
+
+def create_interface(service, interface_name, operation_name, operation_kwargs=None,
+ interface_kwargs=None):
+ the_type = service.service_template.interface_types.get_descendant('test_interface_type')
+ operation = create_operation(operation_name, operation_kwargs)
+
return models.Interface(
type=the_type,
operations=_dictify(operation),
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/tests/modeling/test_models.py
----------------------------------------------------------------------
diff --git a/tests/modeling/test_models.py b/tests/modeling/test_models.py
index 464f432..bbc7352 100644
--- a/tests/modeling/test_models.py
+++ b/tests/modeling/test_models.py
@@ -314,7 +314,7 @@ class TestExecution(object):
Execution.CANCELLING],
Execution.FAILED: [Execution.FAILED],
Execution.SUCCEEDED: [Execution.SUCCEEDED],
- Execution.CANCELLED: [Execution.CANCELLED]
+ Execution.CANCELLED: [Execution.CANCELLED, Execution.PENDING]
}
invalid_transitions = {
@@ -334,8 +334,7 @@ class TestExecution(object):
Execution.FAILED,
Execution.CANCELLED,
Execution.CANCELLING],
- Execution.CANCELLED: [Execution.PENDING,
- Execution.STARTED,
+ Execution.CANCELLED: [Execution.STARTED,
Execution.FAILED,
Execution.SUCCEEDED,
Execution.CANCELLING],
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index 40f9035..ae82476 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -14,21 +14,31 @@
# limitations under the License.
import json
+from threading import Thread, Event
from datetime import datetime
-import pytest
import mock
+import pytest
from aria.modeling import exceptions as modeling_exceptions
from aria.modeling import models
from aria.orchestrator import exceptions
+from aria.orchestrator.events import on_cancelled_workflow_signal
from aria.orchestrator.workflow_runner import WorkflowRunner
from aria.orchestrator.workflows.executor.process import ProcessExecutor
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.core import engine, compile
+from aria.orchestrator.workflows.executor import thread
+from aria.orchestrator import (
+ workflow,
+ operation,
+)
-from ..mock import (
- topology,
- workflow as workflow_mocks
+from tests import (
+ mock as tests_mock,
+ storage
)
+
from ..fixtures import ( # pylint: disable=unused-import
plugins_dir,
plugin_manager,
@@ -36,6 +46,16 @@ from ..fixtures import ( # pylint: disable=unused-import
resource_storage as resource
)
+events = {
+ 'is_resumed': Event(),
+ 'is_active': Event(),
+ 'execution_ended': Event()
+}
+
+
+class TimeoutError(BaseException):
+ pass
+
def test_undeclared_workflow(request):
# validating a proper error is raised when the workflow is not declared in the service
@@ -59,8 +79,8 @@ def test_builtin_workflow_instantiation(request):
# validates the workflow runner instantiates properly when provided with a builtin workflow
# (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
workflow_runner = _create_workflow_runner(request, 'install')
- tasks = list(workflow_runner._tasks_graph.tasks)
- assert len(tasks) == 2 # expecting two WorkflowTasks
+ tasks = list(workflow_runner.execution.tasks)
+ assert len(tasks) == 18 # expecting 18 tasks for 2 node topology
def test_custom_workflow_instantiation(request):
@@ -68,8 +88,8 @@ def test_custom_workflow_instantiation(request):
# (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
mock_workflow = _setup_mock_workflow_in_service(request)
workflow_runner = _create_workflow_runner(request, mock_workflow)
- tasks = list(workflow_runner._tasks_graph.tasks)
- assert len(tasks) == 0 # mock workflow creates no tasks
+ tasks = list(workflow_runner.execution.tasks)
+ assert len(tasks) == 2  # the mock workflow creates only the start-workflow and end-workflow tasks
def test_existing_active_executions(request, service, model):
@@ -139,7 +159,8 @@ def test_execute(request, service):
assert engine_kwargs['ctx'].service.id == service.id
assert engine_kwargs['ctx'].execution.workflow_name == 'test_workflow'
- mock_engine_execute.assert_called_once_with(ctx=workflow_runner._workflow_context)
+ mock_engine_execute.assert_called_once_with(ctx=workflow_runner._workflow_context,
+ resuming=False)
def test_cancel_execution(request):
@@ -240,7 +261,7 @@ def test_workflow_function_parameters(request, tmpdir):
@pytest.fixture
def service(model):
# sets up a service in the storage
- service_id = topology.create_simple_topology_two_nodes(model)
+ service_id = tests_mock.topology.create_simple_topology_two_nodes(model)
service = model.service.get(service_id)
return service
@@ -251,7 +272,7 @@ def _setup_mock_workflow_in_service(request, inputs=None):
service = request.getfuncargvalue('service')
resource = request.getfuncargvalue('resource')
- source = workflow_mocks.__file__
+ source = tests_mock.workflow.__file__
resource.service_template.upload(str(service.service_template.id), source)
mock_workflow_name = 'test_workflow'
arguments = {}
@@ -293,3 +314,135 @@ def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
resource_storage=resource,
plugin_manager=plugin_manager,
**task_configuration_kwargs)
+
+
+class TestResumableWorkflows(object):
+
+ def test_resume_workflow(self, workflow_context, executor):
+ node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+ node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+ self._create_interface(workflow_context, node, mock_resuming_task)
+
+ service = workflow_context.service
+ service.workflows['custom_workflow'] = tests_mock.models.create_operation(
+ 'custom_workflow',
+ operation_kwargs={'function': '{0}.{1}'.format(__name__, mock_workflow.__name__)}
+ )
+ workflow_context.model.service.update(service)
+
+ wf_runner = WorkflowRunner(
+ service_id=workflow_context.service.id,
+ inputs={},
+ model_storage=workflow_context.model,
+ resource_storage=workflow_context.resource,
+ plugin_manager=None,
+ workflow_name='custom_workflow',
+ executor=executor)
+ wf_thread = Thread(target=wf_runner.execute)
+ wf_thread.daemon = True
+ wf_thread.start()
+
+ # Wait for the execution to start
+ if events['is_active'].wait(5) is False:
+ raise TimeoutError("is_active wasn't set to True")
+ wf_runner.cancel()
+
+ if events['execution_ended'].wait(60) is False:
+ raise TimeoutError("Execution did not end")
+
+ first_task, second_task = workflow_context.model.task.list(filters={'_stub_type': None})
+ assert first_task.status == first_task.SUCCESS
+ assert second_task.status in (second_task.FAILED, second_task.RETRYING)
+ events['is_resumed'].set()
+ assert second_task.status in (second_task.FAILED, second_task.RETRYING)
+
+ # Create a new workflow runner, with an existing execution id. This would cause
+ # the old execution to restart.
+ new_wf_runner = WorkflowRunner(
+ service_id=wf_runner.service.id,
+ inputs={},
+ model_storage=workflow_context.model,
+ resource_storage=workflow_context.resource,
+ plugin_manager=None,
+ execution_id=wf_runner.execution.id,
+ executor=executor)
+
+ new_wf_runner.execute()
+
+ # Wait for it to finish and assert changes.
+ assert second_task.status == second_task.SUCCESS
+ assert node.attributes['invocations'].value == 3
+ assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+
+ @staticmethod
+ @pytest.fixture
+ def executor():
+ result = thread.ThreadExecutor()
+ try:
+ yield result
+ finally:
+ result.close()
+
+ @staticmethod
+ @pytest.fixture
+ def workflow_context(tmpdir):
+ workflow_context = tests_mock.context.simple(str(tmpdir))
+ yield workflow_context
+ storage.release_sqlite_storage(workflow_context.model)
+
+ @staticmethod
+ def _create_interface(ctx, node, func, arguments=None):
+ interface_name = 'aria.interfaces.lifecycle'
+ operation_kwargs = dict(function='{name}.{func.__name__}'.format(
+ name=__name__, func=func))
+ if arguments:
+ # the operation has to declare the arguments before those may be passed
+ operation_kwargs['arguments'] = arguments
+ operation_name = 'create'
+ interface = tests_mock.models.create_interface(node.service, interface_name, operation_name,
+ operation_kwargs=operation_kwargs)
+ node.interfaces[interface.name] = interface
+ ctx.model.node.update(node)
+
+ return node, interface_name, operation_name
+
+ @staticmethod
+ def _engine(workflow_func, workflow_context, executor):
+ graph = workflow_func(ctx=workflow_context)
+ execution = workflow_context.execution
+ compile.create_execution_tasks(execution, graph, executor.__class__)
+ workflow_context.execution = execution
+
+ return engine.Engine(executors={executor.__class__: executor})
+
+ @pytest.fixture(autouse=True)
+ def register_to_events(self):
+ def execution_ended(*args, **kwargs):
+ events['execution_ended'].set()
+
+ on_cancelled_workflow_signal.connect(execution_ended)
+ yield
+ on_cancelled_workflow_signal.disconnect(execution_ended)
+
+
+@workflow
+def mock_workflow(ctx, graph):
+ node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+ graph.add_tasks(
+ api.task.OperationTask(
+ node, interface_name='aria.interfaces.lifecycle', operation_name='create'),
+ api.task.OperationTask(
+ node, interface_name='aria.interfaces.lifecycle', operation_name='create')
+ )
+
+
+@operation
+def mock_resuming_task(ctx):
+ ctx.node.attributes['invocations'] += 1
+
+ if ctx.node.attributes['invocations'] != 1:
+ events['is_active'].set()
+ if not events['is_resumed'].isSet():
+ # if resume was called, increase the count by one; otherwise fail the execution -
+ # the second task should fail as long as it was not part of resuming the workflow
+ raise BaseException("wasn't resumed yet")
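The test above coordinates the workflow thread and the main test thread through module-level `threading.Event` objects. A minimal, self-contained sketch of that handshake pattern (hypothetical worker names, not ARIA code):

```python
import threading

# module-level events shared between the "workflow" thread and the test thread
events = {'is_active': threading.Event(), 'is_resumed': threading.Event()}

def mock_task():
    # signal that the task has started, then wait until the test "resumes" it
    events['is_active'].set()
    if not events['is_resumed'].wait(5):
        raise RuntimeError("wasn't resumed yet")

worker = threading.Thread(target=mock_task)
worker.daemon = True
worker.start()

# test thread: wait for the task to start, then let it proceed
if not events['is_active'].wait(5):
    raise RuntimeError("is_active wasn't set")
events['is_resumed'].set()
worker.join(5)
```

The same shape appears in the real test: `wait(timeout)` returns `False` on timeout rather than raising, which is why the test wraps it in an explicit `TimeoutError`.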
[4/4] incubator-ariatosca git commit: wip
Posted by mx...@apache.org.
wip
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/827230da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/827230da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/827230da
Branch: refs/heads/ARIA-284-Cleanup-and-optimize-the-task-execution
Commit: 827230da2f515c2c087d847a68137f27241de161
Parents: 75112ab
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed Jun 21 12:41:33 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Thu Jun 22 14:28:55 2017 +0300
----------------------------------------------------------------------
aria/modeling/orchestration.py | 2 -
aria/orchestrator/context/workflow.py | 19 --
aria/orchestrator/workflow_runner.py | 5 +-
aria/orchestrator/workflows/core/compile.py | 198 ++++++++++---------
aria/orchestrator/workflows/core/engine.py | 110 +++++++----
tests/orchestrator/context/__init__.py | 2 +-
tests/orchestrator/context/test_serialize.py | 2 +-
.../orchestrator/execution_plugin/test_local.py | 2 +-
tests/orchestrator/execution_plugin/test_ssh.py | 3 +-
.../orchestrator/workflows/core/test_engine.py | 2 +-
.../orchestrator/workflows/core/test_events.py | 7 +-
.../test_task_graph_into_execution_graph.py | 19 +-
.../executor/test_process_executor_extension.py | 2 +-
.../test_process_executor_tracked_changes.py | 2 +-
14 files changed, 198 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/827230da/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 276b68e..5b02d1b 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -306,7 +306,6 @@ class TaskBase(mixins.ModelMixin):
ended_at = Column(DateTime, default=None)
attempts_count = Column(Integer, default=1)
- _api_id = Column(String)
_executor = Column(PickleType)
_context_cls = Column(PickleType)
_stub_type = Column(Enum(*STUB_TYPES))
@@ -442,7 +441,6 @@ class TaskBase(mixins.ModelMixin):
'plugin': api_task.plugin,
'function': api_task.function,
'arguments': api_task.arguments,
- '_api_id': api_task.id,
'_context_cls': api_task._context_cls,
'_executor': executor,
}
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/827230da/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index adcd635..18334f3 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -20,8 +20,6 @@ Workflow and operation contexts
import threading
from contextlib import contextmanager
-from networkx import DiGraph
-
from .exceptions import ContextException
from .common import BaseContext
@@ -96,23 +94,6 @@ class WorkflowContext(BaseContext):
)
@property
- def _graph(self):
- # Constructing a graph with only not ended nodes
- if self._execution_graph is None:
- graph = DiGraph()
- for task in self.execution.tasks:
- if task.has_ended():
- continue
- for dependency in task.dependencies:
- if dependency.has_ended():
- continue
- graph.add_edge(dependency, task)
-
- self._execution_graph = graph
-
- return self._execution_graph
-
- @property
@contextmanager
def persist_changes(self):
yield
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/827230da/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 3ccb1ee..b3f100d 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -96,8 +96,9 @@ class WorkflowRunner(object):
if not self._is_resume:
workflow_fn = self._get_workflow_fn()
- tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
- compile.create_execution_tasks(self._workflow_context, tasks_graph, executor.__class__)
+ self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
+ compile.GraphCompiler(self._workflow_context, executor.__class__).compile(
+ self._tasks_graph)
self._engine = engine.Engine(executors={executor.__class__: executor})
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/827230da/aria/orchestrator/workflows/core/compile.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/compile.py b/aria/orchestrator/workflows/core/compile.py
index 932268a..83de22c 100644
--- a/aria/orchestrator/workflows/core/compile.py
+++ b/aria/orchestrator/workflows/core/compile.py
@@ -18,99 +18,105 @@ from ....modeling import models
from .. import executor, api
-def create_execution_tasks(ctx, task_graph, default_executor):
- execution = ctx.execution
- _construct_execution_tasks(execution, task_graph, default_executor)
- ctx.model.execution.update(execution)
- return execution.tasks
-
-
-def _construct_execution_tasks(execution,
- task_graph,
- default_executor,
- stub_executor=executor.base.StubTaskExecutor,
- start_stub_type=models.Task.START_WORKFLOW,
- end_stub_type=models.Task.END_WORKFLOW,
- depends_on=()):
- """
- Translates the user graph to the execution graph
- :param task_graph: The user's graph
- :param start_stub_type: internal use
- :param end_stub_type: internal use
- :param depends_on: internal use
- """
- depends_on = list(depends_on)
-
- # Insert start marker
- start_task = models.Task(execution=execution,
- dependencies=depends_on,
- _api_id=_start_graph_suffix(task_graph.id),
- _stub_type=start_stub_type,
- _executor=stub_executor)
-
- for task in task_graph.topological_order(reverse=True):
- operation_dependencies = _get_tasks_from_dependencies(
- execution, task_graph.get_dependencies(task), [start_task])
-
- if isinstance(task, api.task.OperationTask):
- models.Task.from_api_task(api_task=task,
- executor=default_executor,
- dependencies=operation_dependencies)
-
- elif isinstance(task, api.task.WorkflowTask):
- # Build the graph recursively while adding start and end markers
- _construct_execution_tasks(
- execution=execution,
- task_graph=task,
- default_executor=default_executor,
- stub_executor=stub_executor,
- start_stub_type=models.Task.START_SUBWROFKLOW,
- end_stub_type=models.Task.END_SUBWORKFLOW,
- depends_on=operation_dependencies
- )
- elif isinstance(task, api.task.StubTask):
- models.Task(execution=execution,
- dependencies=operation_dependencies,
- _api_id=task.id,
- _executor=stub_executor,
- _stub_type=models.Task.STUB,
- )
- else:
- raise RuntimeError('Undefined state')
-
- # Insert end marker
- models.Task(dependencies=_get_non_dependent_tasks(execution) or [start_task],
- execution=execution,
- _api_id=_end_graph_suffix(task_graph.id),
- _executor=stub_executor,
- _stub_type=end_stub_type)
-
-
-def _start_graph_suffix(api_id):
- return '{0}-Start'.format(api_id)
-
-
-def _end_graph_suffix(api_id):
- return '{0}-End'.format(api_id)
-
-
-def _get_non_dependent_tasks(execution):
- tasks_with_dependencies = set()
- for task in execution.tasks:
- tasks_with_dependencies.update(task.dependencies)
- return list(set(execution.tasks) - set(tasks_with_dependencies))
-
-
-def _get_tasks_from_dependencies(execution, dependencies, default=()):
- """
- Returns task list from dependencies.
- """
- tasks = []
- for dependency in dependencies:
- if getattr(dependency, 'actor', False):
- # This is
- dependency_name = dependency.id
- else:
- dependency_name = _end_graph_suffix(dependency.id)
- tasks.extend(task for task in execution.tasks if task._api_id == dependency_name)
- return tasks or default
+# TODO: is this class really needed?
+
+class GraphCompiler(object):
+ def __init__(self, ctx, default_executor):
+ self._ctx = ctx
+ self._default_executor = default_executor
+ self._stub_executor = executor.base.StubTaskExecutor
+ self._model_to_api_id = {}
+
+ def compile(self,
+ task_graph,
+ start_stub_type=models.Task.START_WORKFLOW,
+ end_stub_type=models.Task.END_WORKFLOW,
+ depends_on=()):
+ """
+ Translates the user graph to the execution graph
+ :param task_graph: The user's graph
+ :param start_stub_type: internal use
+ :param end_stub_type: internal use
+ :param depends_on: internal use
+ """
+ depends_on = list(depends_on)
+
+ # Insert start marker
+ start_task = self._create_stub_task(
+ start_stub_type, depends_on, self._start_graph_suffix(task_graph.id), task_graph.name,
+ )
+
+ for task in task_graph.topological_order(reverse=True):
+ dependencies = \
+ (self._get_tasks_from_dependencies(task_graph.get_dependencies(task))
+ or [start_task])
+
+ if isinstance(task, api.task.OperationTask):
+ self._create_operation_task(task, dependencies)
+
+ elif isinstance(task, api.task.WorkflowTask):
+ # Build the graph recursively while adding start and end markers
+ self.compile(
+ task, models.Task.START_SUBWROFKLOW, models.Task.END_SUBWORKFLOW, dependencies
+ )
+ elif isinstance(task, api.task.StubTask):
+ self._create_stub_task(models.Task.STUB, dependencies, task.id)
+ else:
+ raise RuntimeError('Undefined state')
+
+ # Insert end marker
+ self._create_stub_task(
+ end_stub_type,
+ self._get_non_dependent_tasks(self._ctx.execution) or [start_task],
+ self._end_graph_suffix(task_graph.id),
+ task_graph.name
+ )
+
+ def _create_stub_task(self, stub_type, dependencies, api_id, name=None):
+ model_task = models.Task(
+ name=name,
+ dependencies=dependencies,
+ execution=self._ctx.execution,
+ _executor=self._stub_executor,
+ _stub_type=stub_type)
+ self._ctx.model.task.put(model_task)
+ self._model_to_api_id[model_task.id] = api_id
+ return model_task
+
+ def _create_operation_task(self, api_task, dependencies):
+ model_task = models.Task.from_api_task(
+ api_task, self._default_executor, dependencies=dependencies)
+ self._ctx.model.task.put(model_task)
+ self._model_to_api_id[model_task.id] = api_task.id
+ return model_task
+
+ @staticmethod
+ def _start_graph_suffix(api_id):
+ return '{0}-Start'.format(api_id)
+
+ @staticmethod
+ def _end_graph_suffix(api_id):
+ return '{0}-End'.format(api_id)
+
+ @staticmethod
+ def _get_non_dependent_tasks(execution):
+ tasks_with_dependencies = set()
+ for task in execution.tasks:
+ tasks_with_dependencies.update(task.dependencies)
+ return list(set(execution.tasks) - set(tasks_with_dependencies))
+
+ def _get_tasks_from_dependencies(self, dependencies):
+ """
+ Returns task list from dependencies.
+ """
+ tasks = []
+ for dependency in dependencies:
+ if getattr(dependency, 'actor', False):
+ # a task with an actor (an operation task) is referenced directly by its own id
+ dependency_name = dependency.id
+ else:
+ dependency_name = self._end_graph_suffix(dependency.id)
+ tasks.extend(task for task in self._ctx.execution.tasks
+ if self._model_to_api_id.get(task.id, None) == dependency_name)
+ return tasks
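For intuition, the compiler's overall shape - insert a start stub, wire each task's dependencies, then close with an end stub depending on every tail task - can be sketched over plain dicts (illustrative names only; the real `GraphCompiler` works against ARIA's model storage and recurses into sub-workflow graphs):

```python
# Toy compile step: wrap a user task graph with start/end stub markers.
def compile_graph(api_tasks):
    # api_tasks: {task_name: [names it depends on]}, in dependency order
    start = {'name': 'Start', 'stub': True, 'dependencies': []}
    execution_tasks = [start]
    by_name = {}
    for name, deps in api_tasks.items():
        task = {'name': name, 'stub': False,
                # tasks with no declared dependencies hang off the start marker
                'dependencies': [by_name[d] for d in deps] or [start]}
        by_name[name] = task
        execution_tasks.append(task)
    # tasks nothing depends on become dependencies of the end marker
    dependents = {id(d) for t in execution_tasks for d in t['dependencies']}
    tail = [t for t in execution_tasks if id(t) not in dependents] or [start]
    execution_tasks.append({'name': 'End', 'stub': True, 'dependencies': tail})
    return execution_tasks

tasks = compile_graph({'create': [], 'configure': ['create']})
```

This also shows why the two-task count in `test_custom_workflow_instantiation` above holds: an empty user graph still compiles to the start and end stubs.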
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/827230da/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index d5a6e70..f594e36 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -45,22 +45,23 @@ class Engine(logger.LoggerMixin):
"""
execute the workflow
"""
- executing_tasks = []
-
if resuming:
events.on_resume_workflow_signal.send(ctx)
+ task_tracker = _TasksTracker(ctx)
try:
events.start_workflow_signal.send(ctx)
while True:
cancel = self._is_cancel(ctx)
if cancel:
break
- for task in self._ended_tasks(ctx, executing_tasks):
- self._handle_ended_tasks(ctx, task, executing_tasks)
- for task in self._executable_tasks(ctx):
- self._handle_executable_task(ctx, task, executing_tasks)
- if self._all_tasks_consumed(ctx):
+ for task in task_tracker.ended_tasks:
+ self._handle_ended_tasks(task)
+ task_tracker.finished_(task)
+ for task in task_tracker.executable_tasks:
+ task_tracker.executing_(task)
+ self._handle_executable_task(ctx, task)
+ if task_tracker.all_tasks_consumed:
break
else:
time.sleep(0.1)
@@ -86,34 +87,7 @@ class Engine(logger.LoggerMixin):
execution = ctx.model.execution.refresh(ctx.execution)
return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED)
- def _executable_tasks(self, ctx):
- now = datetime.utcnow()
- return (
- task for task in self._tasks_iter(ctx)
- if task.is_waiting() and task.due_at <= now and \
- not self._task_has_dependencies(ctx, task)
- )
-
- @staticmethod
- def _ended_tasks(ctx, executing_tasks):
- for task in executing_tasks:
- if task.has_ended() and task in ctx._graph:
- yield task
-
- @staticmethod
- def _task_has_dependencies(ctx, task):
- return len(ctx._graph.pred.get(task, [])) > 0
-
- @staticmethod
- def _all_tasks_consumed(ctx):
- return len(ctx._graph.node) == 0
-
- @staticmethod
- def _tasks_iter(ctx):
- for task in ctx.execution.tasks:
- yield ctx.model.task.refresh(task)
-
- def _handle_executable_task(self, ctx, task, executing_tasks):
+ def _handle_executable_task(self, ctx, task):
task_executor = self._executors[task._executor]
# If the task is a stub, a default context is provided, else it should hold the context cls
@@ -129,16 +103,70 @@ class Engine(logger.LoggerMixin):
name=task.name
)
- executing_tasks.append(task)
-
if not task._stub_type:
events.sent_task_signal.send(op_ctx)
task_executor.execute(op_ctx)
@staticmethod
- def _handle_ended_tasks(ctx, task, executing_tasks):
- executing_tasks.remove(task)
+ def _handle_ended_tasks(task):
if task.status == models.Task.FAILED and not task.ignore_failure:
raise exceptions.ExecutorException('Workflow failed')
- else:
- ctx._graph.remove_node(task)
+
+
+class _TasksTracker(object):
+ def __init__(self, ctx):
+ self._ctx = ctx
+ self._tasks = ctx.execution.tasks
+ self._executed_tasks = [task for task in self._tasks if task.has_ended()]
+ self._executable_tasks = list(set(self._tasks) - set(self._executed_tasks))
+ self._executing_tasks = []
+
+ @property
+ def all_tasks_consumed(self):
+ return len(self._executed_tasks) == len(self._tasks) and len(self._executing_tasks) == 0
+
+ def executing_(self, task):
+ # An executing task could be retrying (and was thus already moved to the executing list earlier)
+ if task not in self._executing_tasks:
+ self._executable_tasks.remove(task)
+ self._executing_tasks.append(task)
+
+ def finished_(self, task):
+ self._executing_tasks.remove(task)
+ self._executed_tasks.append(task)
+
+ @property
+ def ended_tasks(self):
+ for task in self.executing_tasks:
+ if task.has_ended():
+ yield task
+
+ @property
+ def executable_tasks(self):
+ now = datetime.utcnow()
+ # we need both lists, since retrying tasks are in the executing tasks list.
+ for task in self._update_tasks(self._executing_tasks + self._executable_tasks):
+ if all([task.is_waiting(),
+ task.due_at <= now,
+ all(dependency in self._executed_tasks for dependency in task.dependencies)
+ ]):
+ yield task
+
+ @property
+ def executing_tasks(self):
+ for task in self._update_tasks(self._executing_tasks):
+ yield task
+
+ @property
+ def executed_tasks(self):
+ for task in self._update_tasks(self._executed_tasks):
+ yield task
+
+ @property
+ def tasks(self):
+ for task in self._update_tasks(self._tasks):
+ yield task
+
+ def _update_tasks(self, tasks):
+ for task in tasks:
+ yield self._ctx.model.task.refresh(task)
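The tracker replaces the old DiGraph bookkeeping with three plain lists that each task moves through. A stripped-down sketch of that lifecycle (toy dict tasks instead of refreshed model tasks; names only loosely follow the diff above):

```python
# Toy version of the three-list lifecycle: executable -> executing -> executed.
class TasksTracker(object):
    def __init__(self, tasks):
        self._tasks = list(tasks)
        # on resume, tasks that already ended start out as executed
        self._executed_tasks = [t for t in self._tasks if t['ended']]
        self._executable_tasks = [t for t in self._tasks if not t['ended']]
        self._executing_tasks = []

    @property
    def all_tasks_consumed(self):
        return (len(self._executed_tasks) == len(self._tasks)
                and not self._executing_tasks)

    def executing_(self, task):
        # a retrying task may already be in the executing list
        if task not in self._executing_tasks:
            self._executable_tasks.remove(task)
            self._executing_tasks.append(task)

    def finished_(self, task):
        self._executing_tasks.remove(task)
        self._executed_tasks.append(task)

tracker = TasksTracker([{'name': 'create', 'ended': False},
                        {'name': 'start-stub', 'ended': True}])
assert not tracker.all_tasks_consumed
task = tracker._executable_tasks[0]
tracker.executing_(task)
tracker.finished_(task)
```

The real tracker additionally refreshes every task from model storage on each property access, so status changes made by executor processes become visible to the engine loop.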
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/827230da/tests/orchestrator/context/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/__init__.py b/tests/orchestrator/context/__init__.py
index 086a066..752706e 100644
--- a/tests/orchestrator/context/__init__.py
+++ b/tests/orchestrator/context/__init__.py
@@ -26,7 +26,7 @@ def op_path(func, module_path=None):
def execute(workflow_func, workflow_context, executor):
graph = workflow_func(ctx=workflow_context)
- compile.create_execution_tasks(workflow_context, graph, executor.__class__)
+ compile.GraphCompiler(workflow_context, executor.__class__).compile(graph)
eng = engine.Engine(executors={executor.__class__: executor})
eng.execute(workflow_context)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/827230da/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index 5db5b63..b7335a0 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -48,7 +48,7 @@ def test_serialize_operation_context(context, executor, tmpdir):
context.model.node.update(node)
graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
- compile.create_execution_tasks(context, graph, executor.__class__)
+ compile.GraphCompiler(context, executor.__class__).compile(graph)
eng = engine.Engine({executor.__class__: executor})
eng.execute(context)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/827230da/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 1695320..ab6310c 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -500,7 +500,7 @@ if __name__ == '__main__':
arguments=arguments))
return graph
tasks_graph = mock_workflow(ctx=workflow_context) # pylint: disable=no-value-for-parameter
- compile.create_execution_tasks(workflow_context, tasks_graph, executor.__class__)
+ compile.GraphCompiler(workflow_context, executor.__class__).compile(tasks_graph)
eng = engine.Engine({executor.__class__: executor})
eng.execute(workflow_context)
return workflow_context.model.node.get_by_name(
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/827230da/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index fb1dc09..13ad1a3 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -254,8 +254,7 @@ class TestWithActualSSHServer(object):
graph.sequence(*ops)
return graph
tasks_graph = mock_workflow(ctx=self._workflow_context) # pylint: disable=no-value-for-parameter
- compile.create_execution_tasks(
- self._workflow_context, tasks_graph, self._executor.__class__)
+ compile.GraphCompiler(self._workflow_context, self._executor.__class__).compile(tasks_graph)
eng = engine.Engine({self._executor.__class__: self._executor})
eng.execute(self._workflow_context)
return self._workflow_context.model.node.get_by_name(
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/827230da/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index b77d284..7275723 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -50,7 +50,7 @@ class BaseTest(object):
@staticmethod
def _engine(workflow_func, workflow_context, executor):
graph = workflow_func(ctx=workflow_context)
- compile.create_execution_tasks(workflow_context, graph, executor.__class__)
+ compile.GraphCompiler(workflow_context, executor.__class__).compile(graph)
return engine.Engine(executors={executor.__class__: executor})
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/827230da/tests/orchestrator/workflows/core/test_events.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py
index 2b82443..32a6b7b 100644
--- a/tests/orchestrator/workflows/core/test_events.py
+++ b/tests/orchestrator/workflows/core/test_events.py
@@ -113,10 +113,9 @@ def run_operation_on_node(ctx, op_name, interface_name):
operation_name=op_name,
operation_kwargs=dict(function='{name}.{func.__name__}'.format(name=__name__, func=func)))
node.interfaces[interface.name] = interface
- compile.create_execution_tasks(
- ctx,
- single_operation_workflow(ctx, node=node, interface_name=interface_name, op_name=op_name),
- ThreadExecutor)
+ compile.GraphCompiler(ctx, ThreadExecutor).compile(
+ single_operation_workflow(ctx, node=node, interface_name=interface_name, op_name=op_name)
+ )
eng = engine.Engine(executors={ThreadExecutor: ThreadExecutor()})
eng.execute(ctx)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/827230da/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index f5fb17a..3d47d54 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from networkx import topological_sort
+from networkx import topological_sort, DiGraph
from aria.modeling import models
from aria.orchestrator import context
@@ -65,9 +65,10 @@ def test_task_graph_into_execution_graph(tmpdir):
test_task_graph.add_dependency(inner_task_graph, simple_before_task)
test_task_graph.add_dependency(simple_after_task, inner_task_graph)
- compile.create_execution_tasks(workflow_context, test_task_graph, base.StubTaskExecutor)
+ graph_compiler = compile.GraphCompiler(workflow_context, base.StubTaskExecutor)
+ graph_compiler.compile(test_task_graph)
- execution_tasks = topological_sort(workflow_context._graph)
+ execution_tasks = topological_sort(_graph(workflow_context.execution.tasks))
assert len(execution_tasks) == 7
@@ -81,7 +82,7 @@ def test_task_graph_into_execution_graph(tmpdir):
'{0}-End'.format(test_task_graph.id)
]
- assert expected_tasks_names == [t._api_id for t in execution_tasks]
+ assert expected_tasks_names == [graph_compiler._model_to_api_id[t.id] for t in execution_tasks]
assert all(isinstance(task, models.Task) for task in execution_tasks)
execution_tasks = iter(execution_tasks)
@@ -97,7 +98,6 @@ def test_task_graph_into_execution_graph(tmpdir):
def _assert_execution_is_api_task(execution_task, api_task):
- assert execution_task._api_id == api_task.id
assert execution_task.name == api_task.name
assert execution_task.function == api_task.function
assert execution_task.actor == api_task.actor
@@ -106,3 +106,12 @@ def _assert_execution_is_api_task(execution_task, api_task):
def _get_task_by_name(task_name, graph):
return graph.node[task_name]['task']
+
+
+def _graph(tasks):
+ graph = DiGraph()
+ for task in tasks:
+ for dependency in task.dependencies:
+ graph.add_edge(dependency, task)
+
+ return graph
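The `_graph` helper rebuilds a dependency graph from the task links (edge direction: dependency -> task). The `topological_sort` it feeds into can be sketched without networkx as Kahn's algorithm over the same dependency mapping (illustrative task names only):

```python
from collections import deque

# Kahn's algorithm over dependency edges (dependency -> task), mirroring what
# topological_sort does on the DiGraph built by _graph() above.
def topological_order(dependencies):
    # dependencies: {task: [tasks it depends on]}
    indegree = {task: len(deps) for task, deps in dependencies.items()}
    dependents = {task: [] for task in dependencies}
    for task, deps in dependencies.items():
        for dep in deps:
            dependents[dep].append(task)
    queue = deque(task for task, count in indegree.items() if count == 0)
    order = []
    while queue:
        task = queue.popleft()
        order.append(task)
        for dependent in dependents[task]:
            indegree[dependent] -= 1
            if indegree[dependent] == 0:
                queue.append(dependent)
    return order

order = topological_order({'start': [], 'create': ['start'], 'end': ['create']})
```

Any valid topological order puts every task after all of its dependencies, which is exactly the property the execution-graph assertions above rely on.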
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/827230da/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index ba98c4f..aa08685 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -57,7 +57,7 @@ def test_decorate_extension(context, executor):
graph.add_tasks(task)
return graph
graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
- compile.create_execution_tasks(context, graph, executor.__class__)
+ compile.GraphCompiler(context, executor.__class__).compile(graph)
eng = engine.Engine({executor.__class__: executor})
eng.execute(context)
out = get_node(context).attributes.get('out').value
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/827230da/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index 2f1c325..7102b13 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -107,7 +107,7 @@ def _run_workflow(context, executor, op_func, arguments=None):
graph.add_tasks(task)
return graph
graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
- compile.create_execution_tasks(context, graph, executor.__class__)
+ compile.GraphCompiler(context, executor.__class__).compile(graph)
eng = engine.Engine({executor.__class__: executor})
eng.execute(context)
out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out')
[2/4] incubator-ariatosca git commit: ARIA-283 Update readme
installation instructions
Posted by mx...@apache.org.
ARIA-283 Update readme installation instructions
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/a7519349
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/a7519349
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/a7519349
Branch: refs/heads/ARIA-284-Cleanup-and-optimize-the-task-execution
Commit: a7519349c07a20a04c1b8acda656daa8679f5135
Parents: 105971f
Author: Ran Ziv <ra...@gigaspaces.com>
Authored: Wed Jun 21 18:05:05 2017 +0300
Committer: Ran Ziv <ra...@gigaspaces.com>
Committed: Thu Jun 22 12:37:08 2017 +0300
----------------------------------------------------------------------
README.rst | 66 +++++++++++++++++++++++++++++++++++++++------------------
1 file changed, 45 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a7519349/README.rst
----------------------------------------------------------------------
diff --git a/README.rst b/README.rst
index 8af13a5..dc53d47 100644
--- a/README.rst
+++ b/README.rst
@@ -26,37 +26,61 @@ ARIA is an incubation project under the `Apache Software Foundation <https://www
Installation
------------
-ARIA is `available on PyPI <https://pypi.python.org/pypi/ariatosca>`__.
+ARIA is `available on PyPI <https://pypi.python.org/pypi/apache-ariatosca>`__.
+
+ARIA requires Python 2.6/2.7. Python 3 is currently not supported.
To install ARIA directly from PyPI (using a ``wheel``), use::
- pip install aria
+ pip install --upgrade pip setuptools
+ pip install apache-ariatosca
To install ARIA from source, download the source tarball from
-`PyPI <https://pypi.python.org/pypi/ariatosca>`__, extract it, and then when inside the extracted
-directory, use::
+`PyPI <https://pypi.python.org/pypi/apache-ariatosca>`__, extract it, and then, from inside the extracted directory, run::
+
+ pip install --upgrade pip setuptools
+ pip install .
+
+| The source package comes with relevant examples, documentation, ``requirements.txt`` (for installing the exact frozen dependency versions ARIA was tested against) and more.
+|
+|
+| ARIA has additional, optional dependencies. These are required for running operations over SSH.
+| Below are instructions for installing these dependencies, including the required system packages per OS.
+|
+| Note: these dependencies carry their own licenses, some of which may be incompatible with the Apache License 2.0.
+|
+
+**Ubuntu/Debian** (tested on Ubuntu 14.04, Ubuntu 16.04)::
+
+ apt-get install -y python-dev gcc libffi-dev libssl-dev
+ pip install apache-ariatosca[ssh]
+
+**CentOS** (tested on CentOS 6.6, CentOS 7)::
+
+ yum install -y python-devel gcc libffi-devel openssl-devel
+ pip install apache-ariatosca[ssh]
+
+**Archlinux**::
+
+ pacman -Syu --noconfirm python2 gcc libffi openssl
+ pip2 install apache-ariatosca[ssh]
- pip install .
+**Windows** (tested on Windows 10)::
-The source package comes along with relevant examples, documentation, ``requirements.txt`` (for
-installing specifically the frozen dependencies' versions with which ARIA was tested) and more.
+ # no additional system requirements are needed
+ pip install apache-ariatosca[ssh]
-Note that for the ``pip install`` commands mentioned above, you must use a privileged user, or use
-virtualenv.
+**macOS**::
-ARIA itself is in a ``wheel`` format compatible with all platforms. Some dependencies, however,
-might require compilation (based on a given platform), and therefore possibly some system
-dependencies are required as well.
+ # TODO
-On Ubuntu or other Debian-based systems::
- sudo apt install python-setuptools python-dev build-essential libssl-dev libffi-dev
-On Archlinux::
+To install ``pip``, either use your distro's package management system, or run::
- sudo pacman -S python-setuptools
+ wget http://bootstrap.pypa.io/get-pip.py
+ python get-pip.py
-ARIA requires Python 2.6/2.7. Python 3+ is currently not supported.
Getting Started
@@ -129,10 +153,10 @@ ARIA is licensed under the
:target: https://ci.appveyor.com/project/ApacheSoftwareFoundation/incubator-ariatosca/history
.. |License| image:: https://img.shields.io/github/license/apache/incubator-ariatosca.svg
:target: http://www.apache.org/licenses/LICENSE-2.0
-.. |PyPI release| image:: https://img.shields.io/pypi/v/ariatosca.svg
- :target: https://pypi.python.org/pypi/ariatosca
-.. |Python Versions| image:: https://img.shields.io/pypi/pyversions/ariatosca.svg
-.. |Wheel| image:: https://img.shields.io/pypi/wheel/ariatosca.svg
+.. |PyPI release| image:: https://img.shields.io/pypi/v/apache-ariatosca.svg
+ :target: https://pypi.python.org/pypi/apache-ariatosca
+.. |Python Versions| image:: https://img.shields.io/pypi/pyversions/apache-ariatosca.svg
+.. |Wheel| image:: https://img.shields.io/pypi/wheel/apache-ariatosca.svg
.. |Contributors| image:: https://img.shields.io/github/contributors/apache/incubator-ariatosca.svg
.. |Open Pull Requests| image:: https://img.shields.io/github/issues-pr/apache/incubator-ariatosca.svg
:target: https://github.com/apache/incubator-ariatosca/pulls
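Per the commit message for [1/4], the ``[ssh]`` extra pulls in Fabric, and a plain install leaves SSH operations unavailable. A hedged sketch of how code could detect at runtime whether the opt-in extra was installed (the import-probe approach is an illustration, not ARIA's own mechanism):

```python
# Probe for ARIA's optional SSH capability: the "[ssh]" extra installs
# Fabric, so its absence means SSH operations cannot be used.
try:
    import fabric  # present only after `pip install apache-ariatosca[ssh]`
    HAS_SSH = True
except ImportError:
    HAS_SSH = False

print("SSH support available:", HAS_SSH)
```

A guard like this lets a plain (non-SSH) install of ARIA keep working, which is the behavior the commit message promises for users who skip the extra.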