You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by kl...@apache.org on 2023/03/16 06:08:33 UTC

[incubator-devlake] branch main updated: Azuredevops: Fix test-connection command (#4679)

This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new fd072b658 Azuredevops: Fix test-connection command (#4679)
fd072b658 is described below

commit fd072b6584ca7e15046e80ad4f893e7f155f97d7
Author: Camille Teruel <ca...@gmail.com>
AuthorDate: Thu Mar 16 07:08:28 2023 +0100

    Azuredevops: Fix test-connection command (#4679)
    
    * feat: Better error messages for malformed JSON
    
    The "fire" module parses JSON command arguments into Python dicts or lists.
    But when a JSON argument is malformed, fire just passes it through as a str, silently discarding the decoding error.
    So wherever we expect a dict or a list, we re-parse the potentially invalid JSON ourselves so that the decoding error can be reported.
    
    * fix: Make Model.id optional
    
    This id is not necessarily known, e.g. when testing a connection that is not stored in the DB yet.
    
    * fix: Remove leftover ref to connection.org
    
    * refactor: Remove AzureDevOpsConnection.base_url
    
    Remove the base_url attribute because on-prem Azure DevOps Server will not be supported any time soon.
    
    * feat: Add support for proxy
    
    Every connection can have an optional proxy attribute that is used when sending requests.
    AzureDevOpsAPI is now instantiated from the connection.
    
    * fix: Do not return error in case of invalid connection
    
    The TestConnection endpoint is supposed to return a "false" payload, not an error, in case of an invalid connection.
    
    * feat: Automatically filter out subtasks based on requested entity types
    
    ---------
    
    Co-authored-by: Camille Teruel <ca...@meri.co>
---
 .../python/plugins/azuredevops/azuredevops/api.py  | 16 ++++------
 .../python/plugins/azuredevops/azuredevops/main.py |  8 ++---
 .../plugins/azuredevops/azuredevops/models.py      |  1 -
 .../azuredevops/azuredevops/streams/builds.py      |  7 ++---
 .../azuredevops/azuredevops/streams/commits.py     |  5 ++--
 .../azuredevops/azuredevops/streams/jobs.py        |  7 ++---
 .../azuredevops/streams/pull_request_commits.py    |  3 +-
 .../azuredevops/streams/pull_requests.py           |  4 +--
 backend/python/pydevlake/pydevlake/api.py          | 17 +++++++++--
 backend/python/pydevlake/pydevlake/ipc.py          | 35 +++++++++++++++++-----
 backend/python/pydevlake/pydevlake/model.py        | 11 ++++---
 backend/python/pydevlake/pydevlake/plugin.py       | 23 ++++++++++----
 backend/python/pydevlake/pydevlake/stream.py       |  5 +++-
 .../services/remote/plugin/connection_api.go       |  4 +--
 .../services/remote/plugin/plugin_extensions.go    |  4 ++-
 15 files changed, 96 insertions(+), 54 deletions(-)

diff --git a/backend/python/plugins/azuredevops/azuredevops/api.py b/backend/python/plugins/azuredevops/azuredevops/api.py
index 825f9b392..352eb3955 100644
--- a/backend/python/plugins/azuredevops/azuredevops/api.py
+++ b/backend/python/plugins/azuredevops/azuredevops/api.py
@@ -18,6 +18,8 @@ import base64
 
 from pydevlake.api import API, request_hook, Paginator, Request
 
+from azuredevops.models import AzureDevOpsConnection
+
 
 class AzurePaginator(Paginator):
     def get_items(self, response) -> Optional[list[object]]:
@@ -32,20 +34,12 @@ class AzurePaginator(Paginator):
 
 class AzureDevOpsAPI(API):
     paginator = AzurePaginator()
-
-    def __init__(self, base_url: str, pat: str):
-        self._base_url = base_url or "https://dev.azure.com/"
-        self.pat = pat
-
-    @property
-    def base_url(self):
-        return self._base_url
+    base_url = "https://dev.azure.com/"
 
     @request_hook
     def authenticate(self, request: Request):
-        if self.pat:
-            pat_b64 = base64.b64encode((':' + self.pat).encode()).decode()
-            request.headers['Authorization'] = 'Basic ' + pat_b64
+        pat_b64 = base64.b64encode((':' + self.connection.pat).encode()).decode()
+        request.headers['Authorization'] = 'Basic ' + pat_b64
 
     @request_hook
     def set_api_version(self, request: Request):
diff --git a/backend/python/plugins/azuredevops/azuredevops/main.py b/backend/python/plugins/azuredevops/azuredevops/main.py
index ec03ef345..5b5bc3841 100644
--- a/backend/python/plugins/azuredevops/azuredevops/main.py
+++ b/backend/python/plugins/azuredevops/azuredevops/main.py
@@ -51,7 +51,7 @@ class AzureDevOpsPlugin(Plugin):
         )
 
     def remote_scope_groups(self, ctx) -> list[RemoteScopeGroup]:
-        api = AzureDevOpsAPI(ctx.connection.base_url, ctx.connection.pat)
+        api = AzureDevOpsAPI(ctx.connection)
         member_id = api.my_profile.json['id']
         accounts = api.accounts(member_id).json
         orgs = [acc['accountId'] for acc in accounts]
@@ -64,7 +64,7 @@ class AzureDevOpsPlugin(Plugin):
 
     def remote_scopes(self, ctx, group_id: str) -> list[GitRepository]:
         org, proj = group_id.split('/')
-        api = AzureDevOpsAPI(ctx.connection.base_url, ctx.connection.pat)
+        api = AzureDevOpsAPI(ctx.connection)
         for raw_repo in api.git_repos(org, proj):
             repo = GitRepository(**raw_repo, project_id=proj, org_id=org)
             if not repo.defaultBranch:
@@ -74,9 +74,9 @@ class AzureDevOpsPlugin(Plugin):
             yield repo
 
     def test_connection(self, connection: AzureDevOpsConnection):
-        resp = AzureDevOpsAPI(connection.base_url, connection.pat).projects(connection.org)
+        resp = AzureDevOpsAPI(connection).my_profile()
         if resp.status != 200:
-            raise Exception(f"Invalid connection: {resp.json}")
+            raise Exception(f"Invalid token: {connection.token}")
 
     @property
     def streams(self):
diff --git a/backend/python/plugins/azuredevops/azuredevops/models.py b/backend/python/plugins/azuredevops/azuredevops/models.py
index 8f2575052..a6f72376f 100644
--- a/backend/python/plugins/azuredevops/azuredevops/models.py
+++ b/backend/python/plugins/azuredevops/azuredevops/models.py
@@ -26,7 +26,6 @@ default_date = datetime.datetime.fromisoformat("1970-01-01")
 
 
 class AzureDevOpsConnection(Connection):
-    base_url: str
     pat: str
 
 
diff --git a/backend/python/plugins/azuredevops/azuredevops/streams/builds.py b/backend/python/plugins/azuredevops/azuredevops/streams/builds.py
index 3996e4be9..574810203 100644
--- a/backend/python/plugins/azuredevops/azuredevops/streams/builds.py
+++ b/backend/python/plugins/azuredevops/azuredevops/streams/builds.py
@@ -18,7 +18,7 @@ from typing import Iterable
 import iso8601 as iso8601
 
 from azuredevops.api import AzureDevOpsAPI
-from azuredevops.models import AzureDevOpsConnection, GitRepository
+from azuredevops.models import GitRepository
 from azuredevops.models import Build
 from pydevlake import Context, DomainType, Stream, logger
 import pydevlake.domain_layer.devops as devops
@@ -29,10 +29,9 @@ class Builds(Stream):
     domain_types = [DomainType.CICD]
 
     def collect(self, state, context) -> Iterable[tuple[object, dict]]:
-        connection: AzureDevOpsConnection = context.connection
         repo: GitRepository = context.scope
-        azuredevops_api = AzureDevOpsAPI(connection.base_url, connection.pat)
-        response = azuredevops_api.builds(repo.org_id, repo.project_id, repo.id, repo.provider)
+        api = AzureDevOpsAPI(context.connection)
+        response = api.builds(repo.org_id, repo.project_id, repo.id, repo.provider)
         for raw_build in response:
             yield raw_build, state
 
diff --git a/backend/python/plugins/azuredevops/azuredevops/streams/commits.py b/backend/python/plugins/azuredevops/azuredevops/streams/commits.py
index 3e02ef2f2..fb083f2a5 100644
--- a/backend/python/plugins/azuredevops/azuredevops/streams/commits.py
+++ b/backend/python/plugins/azuredevops/azuredevops/streams/commits.py
@@ -29,10 +29,9 @@ class GitCommits(Stream):
     domain_types = [DomainType.CODE]
 
     def collect(self, state, context) -> Iterable[tuple[object, dict]]:
-        connection = context.connection
         repo: GitRepository = context.scope
-        azuredevops_api = AzureDevOpsAPI(connection.base_url, connection.pat)
-        response = azuredevops_api.commits(repo.org_id, repo.project_id, repo.id)
+        api = AzureDevOpsAPI(context.connection)
+        response = api.commits(repo.org_id, repo.project_id, repo.id)
         for raw_commit in response:
             raw_commit["repo_id"] = repo.id
             yield raw_commit, state
diff --git a/backend/python/plugins/azuredevops/azuredevops/streams/jobs.py b/backend/python/plugins/azuredevops/azuredevops/streams/jobs.py
index ef6997f7a..bd4ef6834 100644
--- a/backend/python/plugins/azuredevops/azuredevops/streams/jobs.py
+++ b/backend/python/plugins/azuredevops/azuredevops/streams/jobs.py
@@ -16,7 +16,7 @@
 from typing import Iterable
 
 from azuredevops.api import AzureDevOpsAPI
-from azuredevops.models import AzureDevOpsConnection, Job, Build, GitRepository
+from azuredevops.models import Job, Build, GitRepository
 from azuredevops.streams.builds import Builds
 from pydevlake import Context, Substream, DomainType
 import pydevlake.domain_layer.devops as devops
@@ -28,10 +28,9 @@ class Jobs(Substream):
     parent_stream = Builds
 
     def collect(self, state, context, parent: Build) -> Iterable[tuple[object, dict]]:
-        connection: AzureDevOpsConnection = context.connection
         repo: GitRepository = context.scope
-        azuredevops_api = AzureDevOpsAPI(connection.base_url, connection.pat)
-        response = azuredevops_api.jobs(repo.org_id, repo.project_id, parent.id)
+        api = AzureDevOpsAPI(context.connection)
+        response = api.jobs(repo.org_id, repo.project_id, parent.id)
         if response.status != 200:
             yield None, state
         else:
diff --git a/backend/python/plugins/azuredevops/azuredevops/streams/pull_request_commits.py b/backend/python/plugins/azuredevops/azuredevops/streams/pull_request_commits.py
index 560833c9b..3e8bb6abb 100644
--- a/backend/python/plugins/azuredevops/azuredevops/streams/pull_request_commits.py
+++ b/backend/python/plugins/azuredevops/azuredevops/streams/pull_request_commits.py
@@ -29,9 +29,8 @@ class GitPullRequestCommits(Substream):
     parent_stream = GitPullRequests
 
     def collect(self, state, context, parent: GitPullRequest) -> Iterable[tuple[object, dict]]:
-        connection = context.connection
         repo: GitRepository = context.scope
-        azuredevops_api = AzureDevOpsAPI(connection.base_url, connection.pat)
+        azuredevops_api = AzureDevOpsAPI(context.connection)
         response = azuredevops_api.git_repo_pull_request_commits(repo.org_id, repo.project_id, parent.repo_id, parent.id)
         for raw_commit in response:
             raw_commit["repo_id"] = parent.repo_id
diff --git a/backend/python/plugins/azuredevops/azuredevops/streams/pull_requests.py b/backend/python/plugins/azuredevops/azuredevops/streams/pull_requests.py
index 1267de21d..733e930b2 100644
--- a/backend/python/plugins/azuredevops/azuredevops/streams/pull_requests.py
+++ b/backend/python/plugins/azuredevops/azuredevops/streams/pull_requests.py
@@ -31,9 +31,9 @@ class GitPullRequests(Stream):
 
     def collect(self, state, context) -> Iterable[tuple[object, dict]]:
         connection = context.connection
-        azuredevops_api = AzureDevOpsAPI(connection.base_url, connection.pat)
+        api = AzureDevOpsAPI(context.connection)
         repo: GitRepository = context.scope
-        response = azuredevops_api.git_repo_pull_requests(repo.org_id, repo.project_id, repo.id)
+        response = api.git_repo_pull_requests(repo.org_id, repo.project_id, repo.id)
         for raw_pr in response:
             yield raw_pr, state
 
diff --git a/backend/python/pydevlake/pydevlake/api.py b/backend/python/pydevlake/pydevlake/api.py
index 14772d0c9..16caa1abf 100644
--- a/backend/python/pydevlake/pydevlake/api.py
+++ b/backend/python/pydevlake/pydevlake/api.py
@@ -23,7 +23,8 @@ import time
 
 import requests as req
 
-from pydevlake import logger
+from pydevlake.logger import logger
+from pydevlake.model import Connection
 
 
 RouteArgs = Union[list[str], dict[str, str]]
@@ -83,12 +84,19 @@ class APIBase:
     Hooks are declared by decorating methods with `@request_hook` and `@response_hook`.
     Hooks are executed in the order they are declared.
     """
+    def __init__(self, connection: Connection):
+        self.connection = connection
+
     @property
     def session(self):
         if not hasattr(self, '_session'):
             self._session = req.Session()
         return self._session
 
+    @property
+    def proxy(self):
+        return self.connection.proxy
+
     @property
     def base_url(self) -> Optional[str]:
         return None
@@ -98,10 +106,15 @@ class APIBase:
         if request is ABORT:
             return ABORT
 
+        proxies = {}
+        if self.proxy:
+            proxies['http'] = self.proxy
+            proxies['https'] = self.proxy
         res = self.session.get(
             url=request.url,
             headers=request.headers,
-            params=request.query_args
+            params=request.query_args,
+            proxies=proxies
         )
 
         response = Response(
diff --git a/backend/python/pydevlake/pydevlake/ipc.py b/backend/python/pydevlake/pydevlake/ipc.py
index b55ce1147..2e2a33598 100644
--- a/backend/python/pydevlake/pydevlake/ipc.py
+++ b/backend/python/pydevlake/pydevlake/ipc.py
@@ -15,8 +15,9 @@
 
 
 import os
+import json
 from functools import wraps
-from typing import Generator, TextIO, Optional
+from typing import Generator, TextIO, Optional, Union
 
 from pydevlake.context import Context
 from pydevlake.message import Message
@@ -67,13 +68,20 @@ class PluginCommands:
 
     @plugin_method
     def test_connection(self, connection: dict):
+        connection = self._parse(connection)
         connection = self._plugin.connection_type(**connection)
         self._plugin.test_connection(connection)
 
     @plugin_method
-    def make_pipeline(self, scopes: list[dict], connection: dict):
-        s = [self._plugin.tool_scope_type(**data) for data in scopes]
-        return self._plugin.make_pipeline(s, connection['id'])
+    def make_pipeline(self, scopes: list[dict], entities: list[str], connection: dict):
+        scopes = self._parse(scopes)
+        connection = self._parse(connection)
+        entities = self._parse(entities)
+        tool_scopes = [
+            self._plugin.tool_scope_type(**self._parse(data))
+            for data in scopes
+        ]
+        return self._plugin.make_pipeline(tool_scopes, entities, connection['id'])
 
     @plugin_method
     def run_migrations(self, force: bool):
@@ -85,6 +93,7 @@ class PluginCommands:
 
     @plugin_method
     def remote_scopes(self, connection: dict, group_id: Optional[str] = None):
+        connection = self._parse(connection)
         c = self._plugin.connection_type(**connection)
         return self._plugin.make_remote_scopes(c, group_id)
 
@@ -92,12 +101,24 @@ class PluginCommands:
         self._plugin.startup(endpoint)
 
     def _mk_context(self, data: dict):
+        data = self._parse(data)
         db_url = data['db_url']
-        scope = self._plugin.tool_scope_type(**data['scope'])
-        connection = self._plugin.connection_type(**data['connection'])
+        scope_dict = self._parse(data['scope'])
+        scope = self._plugin.tool_scope_type(**scope_dict)
+        connection_dict = self._parse(data['connection'])
+        connection = self._plugin.connection_type(**connection_dict)
         if self._plugin.transformation_rule_type:
-            transformation_rule = self._plugin.transformation_rule_type(**data['transformation_rule'])
+            transformation_rule_dict = self._parse(data['transformation_rule'])
+            transformation_rule = self._plugin.transformation_rule_type(**transformation_rule_dict)
         else:
             transformation_rule = None
         options = data.get('options', {})
         return Context(db_url, scope, connection, transformation_rule, options)
+
+    def _parse(self, data: Union[str, dict]) -> Union[dict, list]:
+        if isinstance(data, dict):
+            return data
+        try:
+            return json.loads(data)
+        except json.JSONDecodeError as e:
+            raise Exception(f"Invalid JSON: {e.msg}")
diff --git a/backend/python/pydevlake/pydevlake/model.py b/backend/python/pydevlake/pydevlake/model.py
index c5e865065..158eaffd1 100644
--- a/backend/python/pydevlake/pydevlake/model.py
+++ b/backend/python/pydevlake/pydevlake/model.py
@@ -17,19 +17,21 @@
 import os
 from typing import Optional
 from inspect import getmodule
-
 from datetime import datetime
-from sqlalchemy import Column, String, DateTime, func
+
+import inflect
+from pydantic import AnyUrl
+from sqlalchemy import Column, DateTime, func
 from sqlalchemy.orm import declared_attr
 from sqlalchemy.inspection import inspect
 from sqlmodel import SQLModel, Field
-import inflect
+
 
 inflect_engine = inflect.engine()
 
 
 class Model(SQLModel):
-    id: int = Field(primary_key=True)
+    id: Optional[int] = Field(primary_key=True)
     created_at: Optional[datetime] = Field(
         sa_column=Column(DateTime(), default=func.now())
     )
@@ -47,6 +49,7 @@ class ToolTable(Model):
 
 class Connection(ToolTable):
     name: str
+    proxy: Optional[AnyUrl]
 
 
 class TransformationRule(ToolTable):
diff --git a/backend/python/pydevlake/pydevlake/plugin.py b/backend/python/pydevlake/pydevlake/plugin.py
index 2b1bc4acf..bfe781473 100644
--- a/backend/python/pydevlake/pydevlake/plugin.py
+++ b/backend/python/pydevlake/pydevlake/plugin.py
@@ -119,11 +119,11 @@ class Plugin(ABC):
             scopes = self.remote_scope_groups(connection)
         return msg.RemoteScopes(__root__=scopes)
 
-    def make_pipeline(self, tool_scopes: list[ToolScope], connection_id: int):
+    def make_pipeline(self, tool_scopes: list[ToolScope], entity_types: list[str], connection_id: int):
         """
         Make a simple pipeline using the scopes declared by the plugin.
         """
-        plan = self.make_pipeline_plan(tool_scopes, connection_id)
+        plan = self.make_pipeline_plan(tool_scopes, entity_types, connection_id)
         domain_scopes = [
             msg.DynamicDomainScope(
                 type_name=type(scope).__name__,
@@ -137,15 +137,15 @@ class Plugin(ABC):
             scopes=domain_scopes
         )
 
-    def make_pipeline_plan(self, scopes: list[ToolScope], connection_id: int) -> list[list[msg.PipelineTask]]:
-        return [self.make_pipeline_stage(scope, connection_id) for scope in scopes]
+    def make_pipeline_plan(self, scopes: list[ToolScope], entity_types: list[str], connection_id: int) -> list[list[msg.PipelineTask]]:
+        return [self.make_pipeline_stage(scope, entity_types, connection_id) for scope in scopes]
 
-    def make_pipeline_stage(self, scope: ToolScope, connection_id: int) -> list[msg.PipelineTask]:
+    def make_pipeline_stage(self, scope: ToolScope, entity_types: list[str], connection_id: int) -> list[msg.PipelineTask]:
         return [
             msg.PipelineTask(
                 plugin=self.name,
                 skipOnFail=False,
-                subtasks=[t.name for t in self.subtasks],
+                subtasks=self.select_subtasks(scope, entity_types),
                 options={
                     "scopeId": scope.id,
                     "scopeName": scope.name,
@@ -154,6 +154,17 @@ class Plugin(ABC):
             )
         ]
 
+    def select_subtasks(self, scope: ToolScope, entity_types: list[str]) -> list[str]:
+        """
+        Returns the list of subtasks names that should be run for given scope and entity types.
+        """
+        subtasks = []
+        for stream in self._streams.values():
+            if set(stream.domain_types).intersection(entity_types) and stream.should_run_on(scope):
+                for subtask in stream.subtasks:
+                    subtasks.append(subtask.name)
+        return subtasks
+
     def get_stream(self, stream_name: str):
         stream = self._streams.get(stream_name)
         if stream is None:
diff --git a/backend/python/pydevlake/pydevlake/stream.py b/backend/python/pydevlake/pydevlake/stream.py
index 108dc106e..54c8288de 100644
--- a/backend/python/pydevlake/pydevlake/stream.py
+++ b/backend/python/pydevlake/pydevlake/stream.py
@@ -20,7 +20,7 @@ from enum import Enum
 
 from pydevlake.context import Context
 from pydevlake.subtasks import Collector, Extractor, Convertor, SubstreamCollector
-from pydevlake.model import RawModel, ToolModel, DomainModel
+from pydevlake.model import RawModel, ToolModel, ToolScope, DomainModel
 
 
 class DomainType(Enum):
@@ -91,6 +91,9 @@ class Stream:
     def convert(self, tool_model: ToolModel, context: Context) -> DomainModel:
         pass
 
+    def should_run_on(self, scope: ToolScope) -> bool:
+        return True
+
 
 class Substream(Stream):
     def __init__(self, plugin_name: str):
diff --git a/backend/server/services/remote/plugin/connection_api.go b/backend/server/services/remote/plugin/connection_api.go
index a5db952fb..7ad041302 100644
--- a/backend/server/services/remote/plugin/connection_api.go
+++ b/backend/server/services/remote/plugin/connection_api.go
@@ -28,9 +28,9 @@ import (
 func (pa *pluginAPI) TestConnection(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
 	err := pa.invoker.Call("test-connection", bridge.DefaultContext, input.Body).Err
 	if err != nil {
-		return nil, err
+		return &plugin.ApiResourceOutput{Body: false, Status: 401}, nil
 	}
-	return nil, nil
+	return &plugin.ApiResourceOutput{Body: true, Status: 200}, nil
 }
 
 func (pa *pluginAPI) PostConnections(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
diff --git a/backend/server/services/remote/plugin/plugin_extensions.go b/backend/server/services/remote/plugin/plugin_extensions.go
index b60beb4b3..8c5bedbd2 100644
--- a/backend/server/services/remote/plugin/plugin_extensions.go
+++ b/backend/server/services/remote/plugin/plugin_extensions.go
@@ -59,8 +59,10 @@ func (p remoteDatasourcePlugin) MakeDataSourcePipelinePlanV200(connectionId uint
 		toolScopes[i] = toolScope.Unwrap()
 	}
 
+	entities := bpScopes[0].Entities
+
 	plan_data := models.PipelineData{}
-	err = p.invoker.Call("make-pipeline", bridge.DefaultContext, toolScopes, connection.Unwrap()).Get(&plan_data)
+	err = p.invoker.Call("make-pipeline", bridge.DefaultContext, toolScopes, entities, connection.Unwrap()).Get(&plan_data)
 	if err != nil {
 		return nil, nil, err
 	}