You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by kl...@apache.org on 2023/03/16 06:08:33 UTC
[incubator-devlake] branch main updated: Azuredevops: Fix test-connection command (#4679)
This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new fd072b658 Azuredevops: Fix test-connection command (#4679)
fd072b658 is described below
commit fd072b6584ca7e15046e80ad4f893e7f155f97d7
Author: Camille Teruel <ca...@gmail.com>
AuthorDate: Thu Mar 16 07:08:28 2023 +0100
Azuredevops: Fix test-connection command (#4679)
* feat: Better error messages for malformed JSON
The "fire" module parses JSON command arguments into Python dicts or lists.
But when a JSON argument is malformed, fire silently passes it through as a plain str, hiding the decoding error.
So wherever we expect a dict or a list, we re-parse the potentially invalid JSON data so that the decoding error is surfaced and returned.
* fix: Make Model.id optional
This id is not necessarily known, e.g. when testing a connection that is not stored in the DB yet.
* fix: Remove leftover ref to connection.org
* refactor: Remove AzureDevOpsConnection.base_url
Remove the base_url attribute because we won't be supporting on-prem Azure DevOps Server any time soon.
* feat: Add support for proxy
Every connection can have a proxy attribute that is used when sending requests.
AzureDevOpsAPI is now instantiated from a connection.
* fix: Do not return error in case of invalid connection
The TestConnection endpoint is supposed to return a "false" payload, rather than an error, in case of an invalid connection.
* feat: Automatically filter out subtasks based on requested entity types
---------
Co-authored-by: Camille Teruel <ca...@meri.co>
---
.../python/plugins/azuredevops/azuredevops/api.py | 16 ++++------
.../python/plugins/azuredevops/azuredevops/main.py | 8 ++---
.../plugins/azuredevops/azuredevops/models.py | 1 -
.../azuredevops/azuredevops/streams/builds.py | 7 ++---
.../azuredevops/azuredevops/streams/commits.py | 5 ++--
.../azuredevops/azuredevops/streams/jobs.py | 7 ++---
.../azuredevops/streams/pull_request_commits.py | 3 +-
.../azuredevops/streams/pull_requests.py | 4 +--
backend/python/pydevlake/pydevlake/api.py | 17 +++++++++--
backend/python/pydevlake/pydevlake/ipc.py | 35 +++++++++++++++++-----
backend/python/pydevlake/pydevlake/model.py | 11 ++++---
backend/python/pydevlake/pydevlake/plugin.py | 23 ++++++++++----
backend/python/pydevlake/pydevlake/stream.py | 5 +++-
.../services/remote/plugin/connection_api.go | 4 +--
.../services/remote/plugin/plugin_extensions.go | 4 ++-
15 files changed, 96 insertions(+), 54 deletions(-)
diff --git a/backend/python/plugins/azuredevops/azuredevops/api.py b/backend/python/plugins/azuredevops/azuredevops/api.py
index 825f9b392..352eb3955 100644
--- a/backend/python/plugins/azuredevops/azuredevops/api.py
+++ b/backend/python/plugins/azuredevops/azuredevops/api.py
@@ -18,6 +18,8 @@ import base64
from pydevlake.api import API, request_hook, Paginator, Request
+from azuredevops.models import AzureDevOpsConnection
+
class AzurePaginator(Paginator):
def get_items(self, response) -> Optional[list[object]]:
@@ -32,20 +34,12 @@ class AzurePaginator(Paginator):
class AzureDevOpsAPI(API):
paginator = AzurePaginator()
-
- def __init__(self, base_url: str, pat: str):
- self._base_url = base_url or "https://dev.azure.com/"
- self.pat = pat
-
- @property
- def base_url(self):
- return self._base_url
+ base_url = "https://dev.azure.com/"
@request_hook
def authenticate(self, request: Request):
- if self.pat:
- pat_b64 = base64.b64encode((':' + self.pat).encode()).decode()
- request.headers['Authorization'] = 'Basic ' + pat_b64
+ pat_b64 = base64.b64encode((':' + self.connection.pat).encode()).decode()
+ request.headers['Authorization'] = 'Basic ' + pat_b64
@request_hook
def set_api_version(self, request: Request):
diff --git a/backend/python/plugins/azuredevops/azuredevops/main.py b/backend/python/plugins/azuredevops/azuredevops/main.py
index ec03ef345..5b5bc3841 100644
--- a/backend/python/plugins/azuredevops/azuredevops/main.py
+++ b/backend/python/plugins/azuredevops/azuredevops/main.py
@@ -51,7 +51,7 @@ class AzureDevOpsPlugin(Plugin):
)
def remote_scope_groups(self, ctx) -> list[RemoteScopeGroup]:
- api = AzureDevOpsAPI(ctx.connection.base_url, ctx.connection.pat)
+ api = AzureDevOpsAPI(ctx.connection)
member_id = api.my_profile.json['id']
accounts = api.accounts(member_id).json
orgs = [acc['accountId'] for acc in accounts]
@@ -64,7 +64,7 @@ class AzureDevOpsPlugin(Plugin):
def remote_scopes(self, ctx, group_id: str) -> list[GitRepository]:
org, proj = group_id.split('/')
- api = AzureDevOpsAPI(ctx.connection.base_url, ctx.connection.pat)
+ api = AzureDevOpsAPI(ctx.connection)
for raw_repo in api.git_repos(org, proj):
repo = GitRepository(**raw_repo, project_id=proj, org_id=org)
if not repo.defaultBranch:
@@ -74,9 +74,9 @@ class AzureDevOpsPlugin(Plugin):
yield repo
def test_connection(self, connection: AzureDevOpsConnection):
- resp = AzureDevOpsAPI(connection.base_url, connection.pat).projects(connection.org)
+ resp = AzureDevOpsAPI(connection).my_profile()
if resp.status != 200:
- raise Exception(f"Invalid connection: {resp.json}")
+ raise Exception(f"Invalid token: {connection.token}")
@property
def streams(self):
diff --git a/backend/python/plugins/azuredevops/azuredevops/models.py b/backend/python/plugins/azuredevops/azuredevops/models.py
index 8f2575052..a6f72376f 100644
--- a/backend/python/plugins/azuredevops/azuredevops/models.py
+++ b/backend/python/plugins/azuredevops/azuredevops/models.py
@@ -26,7 +26,6 @@ default_date = datetime.datetime.fromisoformat("1970-01-01")
class AzureDevOpsConnection(Connection):
- base_url: str
pat: str
diff --git a/backend/python/plugins/azuredevops/azuredevops/streams/builds.py b/backend/python/plugins/azuredevops/azuredevops/streams/builds.py
index 3996e4be9..574810203 100644
--- a/backend/python/plugins/azuredevops/azuredevops/streams/builds.py
+++ b/backend/python/plugins/azuredevops/azuredevops/streams/builds.py
@@ -18,7 +18,7 @@ from typing import Iterable
import iso8601 as iso8601
from azuredevops.api import AzureDevOpsAPI
-from azuredevops.models import AzureDevOpsConnection, GitRepository
+from azuredevops.models import GitRepository
from azuredevops.models import Build
from pydevlake import Context, DomainType, Stream, logger
import pydevlake.domain_layer.devops as devops
@@ -29,10 +29,9 @@ class Builds(Stream):
domain_types = [DomainType.CICD]
def collect(self, state, context) -> Iterable[tuple[object, dict]]:
- connection: AzureDevOpsConnection = context.connection
repo: GitRepository = context.scope
- azuredevops_api = AzureDevOpsAPI(connection.base_url, connection.pat)
- response = azuredevops_api.builds(repo.org_id, repo.project_id, repo.id, repo.provider)
+ api = AzureDevOpsAPI(context.connection)
+ response = api.builds(repo.org_id, repo.project_id, repo.id, repo.provider)
for raw_build in response:
yield raw_build, state
diff --git a/backend/python/plugins/azuredevops/azuredevops/streams/commits.py b/backend/python/plugins/azuredevops/azuredevops/streams/commits.py
index 3e02ef2f2..fb083f2a5 100644
--- a/backend/python/plugins/azuredevops/azuredevops/streams/commits.py
+++ b/backend/python/plugins/azuredevops/azuredevops/streams/commits.py
@@ -29,10 +29,9 @@ class GitCommits(Stream):
domain_types = [DomainType.CODE]
def collect(self, state, context) -> Iterable[tuple[object, dict]]:
- connection = context.connection
repo: GitRepository = context.scope
- azuredevops_api = AzureDevOpsAPI(connection.base_url, connection.pat)
- response = azuredevops_api.commits(repo.org_id, repo.project_id, repo.id)
+ api = AzureDevOpsAPI(context.connection)
+ response = api.commits(repo.org_id, repo.project_id, repo.id)
for raw_commit in response:
raw_commit["repo_id"] = repo.id
yield raw_commit, state
diff --git a/backend/python/plugins/azuredevops/azuredevops/streams/jobs.py b/backend/python/plugins/azuredevops/azuredevops/streams/jobs.py
index ef6997f7a..bd4ef6834 100644
--- a/backend/python/plugins/azuredevops/azuredevops/streams/jobs.py
+++ b/backend/python/plugins/azuredevops/azuredevops/streams/jobs.py
@@ -16,7 +16,7 @@
from typing import Iterable
from azuredevops.api import AzureDevOpsAPI
-from azuredevops.models import AzureDevOpsConnection, Job, Build, GitRepository
+from azuredevops.models import Job, Build, GitRepository
from azuredevops.streams.builds import Builds
from pydevlake import Context, Substream, DomainType
import pydevlake.domain_layer.devops as devops
@@ -28,10 +28,9 @@ class Jobs(Substream):
parent_stream = Builds
def collect(self, state, context, parent: Build) -> Iterable[tuple[object, dict]]:
- connection: AzureDevOpsConnection = context.connection
repo: GitRepository = context.scope
- azuredevops_api = AzureDevOpsAPI(connection.base_url, connection.pat)
- response = azuredevops_api.jobs(repo.org_id, repo.project_id, parent.id)
+ api = AzureDevOpsAPI(context.connection)
+ response = api.jobs(repo.org_id, repo.project_id, parent.id)
if response.status != 200:
yield None, state
else:
diff --git a/backend/python/plugins/azuredevops/azuredevops/streams/pull_request_commits.py b/backend/python/plugins/azuredevops/azuredevops/streams/pull_request_commits.py
index 560833c9b..3e8bb6abb 100644
--- a/backend/python/plugins/azuredevops/azuredevops/streams/pull_request_commits.py
+++ b/backend/python/plugins/azuredevops/azuredevops/streams/pull_request_commits.py
@@ -29,9 +29,8 @@ class GitPullRequestCommits(Substream):
parent_stream = GitPullRequests
def collect(self, state, context, parent: GitPullRequest) -> Iterable[tuple[object, dict]]:
- connection = context.connection
repo: GitRepository = context.scope
- azuredevops_api = AzureDevOpsAPI(connection.base_url, connection.pat)
+ azuredevops_api = AzureDevOpsAPI(context.connection)
response = azuredevops_api.git_repo_pull_request_commits(repo.org_id, repo.project_id, parent.repo_id, parent.id)
for raw_commit in response:
raw_commit["repo_id"] = parent.repo_id
diff --git a/backend/python/plugins/azuredevops/azuredevops/streams/pull_requests.py b/backend/python/plugins/azuredevops/azuredevops/streams/pull_requests.py
index 1267de21d..733e930b2 100644
--- a/backend/python/plugins/azuredevops/azuredevops/streams/pull_requests.py
+++ b/backend/python/plugins/azuredevops/azuredevops/streams/pull_requests.py
@@ -31,9 +31,9 @@ class GitPullRequests(Stream):
def collect(self, state, context) -> Iterable[tuple[object, dict]]:
connection = context.connection
- azuredevops_api = AzureDevOpsAPI(connection.base_url, connection.pat)
+ api = AzureDevOpsAPI(context.connection)
repo: GitRepository = context.scope
- response = azuredevops_api.git_repo_pull_requests(repo.org_id, repo.project_id, repo.id)
+ response = api.git_repo_pull_requests(repo.org_id, repo.project_id, repo.id)
for raw_pr in response:
yield raw_pr, state
diff --git a/backend/python/pydevlake/pydevlake/api.py b/backend/python/pydevlake/pydevlake/api.py
index 14772d0c9..16caa1abf 100644
--- a/backend/python/pydevlake/pydevlake/api.py
+++ b/backend/python/pydevlake/pydevlake/api.py
@@ -23,7 +23,8 @@ import time
import requests as req
-from pydevlake import logger
+from pydevlake.logger import logger
+from pydevlake.model import Connection
RouteArgs = Union[list[str], dict[str, str]]
@@ -83,12 +84,19 @@ class APIBase:
Hooks are declared by decorating methods with `@request_hook` and `@response_hook`.
Hooks are executed in the order they are declared.
"""
+ def __init__(self, connection: Connection):
+ self.connection = connection
+
@property
def session(self):
if not hasattr(self, '_session'):
self._session = req.Session()
return self._session
+ @property
+ def proxy(self):
+ return self.connection.proxy
+
@property
def base_url(self) -> Optional[str]:
return None
@@ -98,10 +106,15 @@ class APIBase:
if request is ABORT:
return ABORT
+ proxies = {}
+ if self.proxy:
+ proxies['http'] = self.proxy
+ proxies['https'] = self.proxy
res = self.session.get(
url=request.url,
headers=request.headers,
- params=request.query_args
+ params=request.query_args,
+ proxies=proxies
)
response = Response(
diff --git a/backend/python/pydevlake/pydevlake/ipc.py b/backend/python/pydevlake/pydevlake/ipc.py
index b55ce1147..2e2a33598 100644
--- a/backend/python/pydevlake/pydevlake/ipc.py
+++ b/backend/python/pydevlake/pydevlake/ipc.py
@@ -15,8 +15,9 @@
import os
+import json
from functools import wraps
-from typing import Generator, TextIO, Optional
+from typing import Generator, TextIO, Optional, Union
from pydevlake.context import Context
from pydevlake.message import Message
@@ -67,13 +68,20 @@ class PluginCommands:
@plugin_method
def test_connection(self, connection: dict):
+ connection = self._parse(connection)
connection = self._plugin.connection_type(**connection)
self._plugin.test_connection(connection)
@plugin_method
- def make_pipeline(self, scopes: list[dict], connection: dict):
- s = [self._plugin.tool_scope_type(**data) for data in scopes]
- return self._plugin.make_pipeline(s, connection['id'])
+ def make_pipeline(self, scopes: list[dict], entities: list[str], connection: dict):
+ scopes = self._parse(scopes)
+ connection = self._parse(connection)
+ entities = self._parse(entities)
+ tool_scopes = [
+ self._plugin.tool_scope_type(**self._parse(data))
+ for data in scopes
+ ]
+ return self._plugin.make_pipeline(tool_scopes, entities, connection['id'])
@plugin_method
def run_migrations(self, force: bool):
@@ -85,6 +93,7 @@ class PluginCommands:
@plugin_method
def remote_scopes(self, connection: dict, group_id: Optional[str] = None):
+ connection = self._parse(connection)
c = self._plugin.connection_type(**connection)
return self._plugin.make_remote_scopes(c, group_id)
@@ -92,12 +101,24 @@ class PluginCommands:
self._plugin.startup(endpoint)
def _mk_context(self, data: dict):
+ data = self._parse(data)
db_url = data['db_url']
- scope = self._plugin.tool_scope_type(**data['scope'])
- connection = self._plugin.connection_type(**data['connection'])
+ scope_dict = self._parse(data['scope'])
+ scope = self._plugin.tool_scope_type(**scope_dict)
+ connection_dict = self._parse(data['connection'])
+ connection = self._plugin.connection_type(**connection_dict)
if self._plugin.transformation_rule_type:
- transformation_rule = self._plugin.transformation_rule_type(**data['transformation_rule'])
+ transformation_rule_dict = self._parse(data['transformation_rule'])
+ transformation_rule = self._plugin.transformation_rule_type(**transformation_rule_dict)
else:
transformation_rule = None
options = data.get('options', {})
return Context(db_url, scope, connection, transformation_rule, options)
+
+ def _parse(self, data: Union[str, dict]) -> Union[dict, list]:
+ if isinstance(data, dict):
+ return data
+ try:
+ return json.loads(data)
+ except json.JSONDecodeError as e:
+ raise Exception(f"Invalid JSON: {e.msg}")
diff --git a/backend/python/pydevlake/pydevlake/model.py b/backend/python/pydevlake/pydevlake/model.py
index c5e865065..158eaffd1 100644
--- a/backend/python/pydevlake/pydevlake/model.py
+++ b/backend/python/pydevlake/pydevlake/model.py
@@ -17,19 +17,21 @@
import os
from typing import Optional
from inspect import getmodule
-
from datetime import datetime
-from sqlalchemy import Column, String, DateTime, func
+
+import inflect
+from pydantic import AnyUrl
+from sqlalchemy import Column, DateTime, func
from sqlalchemy.orm import declared_attr
from sqlalchemy.inspection import inspect
from sqlmodel import SQLModel, Field
-import inflect
+
inflect_engine = inflect.engine()
class Model(SQLModel):
- id: int = Field(primary_key=True)
+ id: Optional[int] = Field(primary_key=True)
created_at: Optional[datetime] = Field(
sa_column=Column(DateTime(), default=func.now())
)
@@ -47,6 +49,7 @@ class ToolTable(Model):
class Connection(ToolTable):
name: str
+ proxy: Optional[AnyUrl]
class TransformationRule(ToolTable):
diff --git a/backend/python/pydevlake/pydevlake/plugin.py b/backend/python/pydevlake/pydevlake/plugin.py
index 2b1bc4acf..bfe781473 100644
--- a/backend/python/pydevlake/pydevlake/plugin.py
+++ b/backend/python/pydevlake/pydevlake/plugin.py
@@ -119,11 +119,11 @@ class Plugin(ABC):
scopes = self.remote_scope_groups(connection)
return msg.RemoteScopes(__root__=scopes)
- def make_pipeline(self, tool_scopes: list[ToolScope], connection_id: int):
+ def make_pipeline(self, tool_scopes: list[ToolScope], entity_types: list[str], connection_id: int):
"""
Make a simple pipeline using the scopes declared by the plugin.
"""
- plan = self.make_pipeline_plan(tool_scopes, connection_id)
+ plan = self.make_pipeline_plan(tool_scopes, entity_types, connection_id)
domain_scopes = [
msg.DynamicDomainScope(
type_name=type(scope).__name__,
@@ -137,15 +137,15 @@ class Plugin(ABC):
scopes=domain_scopes
)
- def make_pipeline_plan(self, scopes: list[ToolScope], connection_id: int) -> list[list[msg.PipelineTask]]:
- return [self.make_pipeline_stage(scope, connection_id) for scope in scopes]
+ def make_pipeline_plan(self, scopes: list[ToolScope], entity_types: list[str], connection_id: int) -> list[list[msg.PipelineTask]]:
+ return [self.make_pipeline_stage(scope, entity_types, connection_id) for scope in scopes]
- def make_pipeline_stage(self, scope: ToolScope, connection_id: int) -> list[msg.PipelineTask]:
+ def make_pipeline_stage(self, scope: ToolScope, entity_types: list[str], connection_id: int) -> list[msg.PipelineTask]:
return [
msg.PipelineTask(
plugin=self.name,
skipOnFail=False,
- subtasks=[t.name for t in self.subtasks],
+ subtasks=self.select_subtasks(scope, entity_types),
options={
"scopeId": scope.id,
"scopeName": scope.name,
@@ -154,6 +154,17 @@ class Plugin(ABC):
)
]
+ def select_subtasks(self, scope: ToolScope, entity_types: list[str]) -> list[str]:
+ """
+ Returns the list of subtasks names that should be run for given scope and entity types.
+ """
+ subtasks = []
+ for stream in self._streams.values():
+ if set(stream.domain_types).intersection(entity_types) and stream.should_run_on(scope):
+ for subtask in stream.subtasks:
+ subtasks.append(subtask.name)
+ return subtasks
+
def get_stream(self, stream_name: str):
stream = self._streams.get(stream_name)
if stream is None:
diff --git a/backend/python/pydevlake/pydevlake/stream.py b/backend/python/pydevlake/pydevlake/stream.py
index 108dc106e..54c8288de 100644
--- a/backend/python/pydevlake/pydevlake/stream.py
+++ b/backend/python/pydevlake/pydevlake/stream.py
@@ -20,7 +20,7 @@ from enum import Enum
from pydevlake.context import Context
from pydevlake.subtasks import Collector, Extractor, Convertor, SubstreamCollector
-from pydevlake.model import RawModel, ToolModel, DomainModel
+from pydevlake.model import RawModel, ToolModel, ToolScope, DomainModel
class DomainType(Enum):
@@ -91,6 +91,9 @@ class Stream:
def convert(self, tool_model: ToolModel, context: Context) -> DomainModel:
pass
+ def should_run_on(self, scope: ToolScope) -> bool:
+ return True
+
class Substream(Stream):
def __init__(self, plugin_name: str):
diff --git a/backend/server/services/remote/plugin/connection_api.go b/backend/server/services/remote/plugin/connection_api.go
index a5db952fb..7ad041302 100644
--- a/backend/server/services/remote/plugin/connection_api.go
+++ b/backend/server/services/remote/plugin/connection_api.go
@@ -28,9 +28,9 @@ import (
func (pa *pluginAPI) TestConnection(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
err := pa.invoker.Call("test-connection", bridge.DefaultContext, input.Body).Err
if err != nil {
- return nil, err
+ return &plugin.ApiResourceOutput{Body: false, Status: 401}, nil
}
- return nil, nil
+ return &plugin.ApiResourceOutput{Body: true, Status: 200}, nil
}
func (pa *pluginAPI) PostConnections(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
diff --git a/backend/server/services/remote/plugin/plugin_extensions.go b/backend/server/services/remote/plugin/plugin_extensions.go
index b60beb4b3..8c5bedbd2 100644
--- a/backend/server/services/remote/plugin/plugin_extensions.go
+++ b/backend/server/services/remote/plugin/plugin_extensions.go
@@ -59,8 +59,10 @@ func (p remoteDatasourcePlugin) MakeDataSourcePipelinePlanV200(connectionId uint
toolScopes[i] = toolScope.Unwrap()
}
+ entities := bpScopes[0].Entities
+
plan_data := models.PipelineData{}
- err = p.invoker.Call("make-pipeline", bridge.DefaultContext, toolScopes, connection.Unwrap()).Get(&plan_data)
+ err = p.invoker.Call("make-pipeline", bridge.DefaultContext, toolScopes, entities, connection.Unwrap()).Get(&plan_data)
if err != nil {
return nil, nil, err
}