You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by he...@apache.org on 2023/02/21 19:06:30 UTC

[incubator-devlake] branch main updated: Blueprint v200 python plugins (#4480)

This is an automated email from the ASF dual-hosted git repository.

hez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 967823f8e Blueprint v200 python plugins (#4480)
967823f8e is described below

commit 967823f8e0363f5a6297ad076ce177109b899c5b
Author: Camille Teruel <ca...@gmail.com>
AuthorDate: Tue Feb 21 20:06:24 2023 +0100

    Blueprint v200 python plugins (#4480)
    
    * feat: Generic handler for 429 responses
    
    * refactor: Extract code from PagedResponse.__iter__
    
    Extract `items` and `next_page_request` properties from `__iter__` to let
    clients iterate over pages manually.
    
    * refactor: Use same struct for all plugin API areas
    
    * feat: Add remote plugins endpoint for python plugins
    
    * fix: Pass connection to make-pipeline plugin command
    
    ---------
    
    Co-authored-by: Camille Teruel <ca...@meri.co>
---
 backend/python/pydevlake/pydevlake/__init__.py     |  2 +-
 backend/python/pydevlake/pydevlake/api.py          | 70 ++++++++++++++++------
 backend/python/pydevlake/pydevlake/ipc.py          |  5 ++
 backend/python/pydevlake/pydevlake/message.py      |  5 ++
 backend/python/pydevlake/pydevlake/plugin.py       |  6 +-
 .../services/remote/plugin/connection_api.go       | 47 +++++++--------
 .../server/services/remote/plugin/default_api.go   | 55 +++++++++--------
 .../services/remote/plugin/plugin_extensions.go    | 15 ++---
 .../services/remote/plugin/remote_scope_api.go     | 61 +++++++++++++++++++
 backend/server/services/remote/plugin/scope_api.go | 17 ++----
 .../remote/plugin/transformation_rule_api.go       | 21 +++----
 backend/test/remote/fakeplugin/fakeplugin/main.py  | 14 +++--
 12 files changed, 211 insertions(+), 107 deletions(-)

diff --git a/backend/python/pydevlake/pydevlake/__init__.py b/backend/python/pydevlake/pydevlake/__init__.py
index d45d1ba7d..6d513946d 100644
--- a/backend/python/pydevlake/pydevlake/__init__.py
+++ b/backend/python/pydevlake/pydevlake/__init__.py
@@ -16,7 +16,7 @@
 
 from .model import ToolModel
 from .logger import logger
-from .message import Connection, TransformationRule
+from .message import Connection, TransformationRule, RemoteScope
 from .plugin import Plugin
 from .stream import Stream, Substream
 from .context import Context
diff --git a/backend/python/pydevlake/pydevlake/api.py b/backend/python/pydevlake/pydevlake/api.py
index cc2a930cd..bc29fc474 100644
--- a/backend/python/pydevlake/pydevlake/api.py
+++ b/backend/python/pydevlake/pydevlake/api.py
@@ -17,10 +17,14 @@
 from __future__ import annotations
 
 from typing import Optional, Union
+from urllib.parse import urljoin
+from http import HTTPStatus
 import json
+import time
 
 import requests as req
-from urllib.parse import urljoin
+
+from pydevlake import logger
 
 
 RouteArgs = Union[list[str], dict[str, str]]
@@ -75,7 +79,7 @@ ABORT = object()
 class APIBase:
     """
     The base class for defining APIs.
-    It implements a hook system to preprocess requests before sending them and postprocess response 
+    It implements a hook system to preprocess requests before sending them and postprocess response
     before returning them.
     Hooks are declared by decorating methods with `@request_hook` and `@response_hook`.
     Hooks are executed in the order they are declared.
@@ -121,7 +125,7 @@ class APIBase:
             if isinstance(result, type(target)):
                 target = result
         return target
-                
+
     def get(self, path, **query_args):
         req = Request(urljoin(self.base_url, path), query_args)
         return self.send(req)
@@ -135,7 +139,7 @@ class APIBase:
         if not hasattr(self, '_response_hooks'):
             self._response_hooks = [h for h in self._iter_members() if isinstance(h, ResponseHook)]
         return self._response_hooks
- 
+
     def _iter_members(self):
         for c in reversed(type(self).__mro__):
             for m in c.__dict__.values():
@@ -199,7 +203,7 @@ class Paginator:
     def set_next_page_param(self, request, next_page_id: int | str):
         """
         Modify the request to set the parameter for fetching next page,
-        e.g. set the `page` query parameter. 
+        e.g. set the `page` query parameter.
         """
         pass
 
@@ -214,23 +218,36 @@ class PagedResponse(Response):
         self.paginator = paginator
         self.api = api
 
+    @property
+    def items(self):
+        return self.paginator.get_items(self.response)
+
+    @property
+    def next_page_request(self):
+        next_page_id = self.paginator.get_next_page_id(self.response)
+        if not next_page_id:
+            # No next page
+            return None
+
+        next_request = self.response.request.copy()
+        self.paginator.set_next_page_param(next_request, next_page_id)
+        return next_request
+
     def __iter__(self):
-        current = self.response
+        """
+        Iterate over the items of this page, then of every following page.
+        """
+        current = self
 
         while True:
-            items = self.paginator.get_items(current)
-            for item in items:
-                yield item
+            yield from current.items
 
-            next_page_id = self.paginator.get_next_page_id(current)
-            if not next_page_id:
+            next_page_request = current.next_page_request
+            if not next_page_request:
                 # No next page
                 return
 
-            # Fetch next page
-            next_request = current.request.copy()
-            self.paginator.set_next_page_param(next_request, next_page_id)
-            current = self.api.send(next_request)
+            current = current.api.send(next_page_request)
 
     def __getattr__(self, attr_name):
         # Delegate everything to Response
@@ -251,12 +268,12 @@ class TokenPaginator(Paginator):
 
     def set_next_page_param(self, request, next_page_id):
         request.query_args[self.next_page_token_param] = next_page_id
-        
+
 
 class APIException(Exception):
     def __init__(self, response):
         self.response = response
-    
+
     def __str__(self):
         return f'APIException: {self.response}'
 
@@ -267,7 +284,7 @@ class API(APIBase):
     - pagination: define the `paginator` property in subclasses
 
     # TODO:
-    - Error handling response hook: retries, 
+    - Error handling response hook: retries,
     - Rate limitation
     """
     @property
@@ -277,9 +294,24 @@ class API(APIBase):
         """
         return None
 
+    @response_hook
+    def pause_if_too_many_requests(self, response: Response):
+        """
+        Pause execution when a response has status 429 TOO_MANY_REQUESTS,
+        for the number of seconds given by the 'Retry-After' header (header
+        values are strings, hence the int conversion), or 60 if it is missing.
+        Retry the failed request afterwards and return its response.
+        """
+        if response.status == HTTPStatus.TOO_MANY_REQUESTS:
+            retry_after = int(response.headers.get('Retry-After', 60))
+            logger.warning(f'Got TOO_MANY_REQUESTS response, sleep {retry_after} seconds')
+            time.sleep(retry_after)
+            return self.send(response.request)
+        return response
+
     @response_hook
     def handle_error(self, response):
-        if response.status != 200:
+        if response.status != HTTPStatus.OK:
             raise APIException(response)
 
     @response_hook
diff --git a/backend/python/pydevlake/pydevlake/ipc.py b/backend/python/pydevlake/pydevlake/ipc.py
index 0b8d664e3..b1284007a 100644
--- a/backend/python/pydevlake/pydevlake/ipc.py
+++ b/backend/python/pydevlake/pydevlake/ipc.py
@@ -82,6 +82,11 @@ class PluginCommands:
     def plugin_info(self):
         return self._plugin.plugin_info()
 
+    @plugin_method
+    def remote_scopes(self, connection: dict, query: str = ''):
+        c = self._plugin.connection_type(**connection)  # typed connection built from the raw dict
+        return self._plugin.remote_scopes(c, query)  # return the scopes, as plugin_info returns its result
+
     def startup(self, endpoint: str):
         self._plugin.startup(endpoint)
 
diff --git a/backend/python/pydevlake/pydevlake/message.py b/backend/python/pydevlake/pydevlake/message.py
index aad4690d0..9b2ba7456 100644
--- a/backend/python/pydevlake/pydevlake/message.py
+++ b/backend/python/pydevlake/pydevlake/message.py
@@ -93,3 +93,8 @@ class PipelineScope(Message):
 class BlueprintScope(Message):
     id: str
     name: str
+
+
+class RemoteScope(Message):
+    id: str
+    name: str
diff --git a/backend/python/pydevlake/pydevlake/plugin.py b/backend/python/pydevlake/pydevlake/plugin.py
index 3f81016d8..fbad2b38d 100644
--- a/backend/python/pydevlake/pydevlake/plugin.py
+++ b/backend/python/pydevlake/pydevlake/plugin.py
@@ -74,6 +74,10 @@ class Plugin:
     def get_scopes(self, scope_name: str, connection: msg.Connection) -> Iterable[DomainModel]:
         pass
 
+    @abstractmethod
+    def remote_scopes(self, connection: msg.Connection, query: str = ''):
+        pass
+
     @property
     def streams(self) -> list[Union[Stream, Type[Stream]]]:
         pass
@@ -114,7 +118,7 @@ class Plugin:
         plan = msg.PipelinePlan(stages=stages)
         yield plan
 
-        scopes = [ 
+        scopes = [
             msg.PipelineScope(
                 id=':'.join([self.name, type(scope).__name__, ctx.connection_id, bp_scope.id]),
                 name=bp_scope.name,
diff --git a/backend/server/services/remote/plugin/connection_api.go b/backend/server/services/remote/plugin/connection_api.go
index 94b09fbee..7be545cba 100644
--- a/backend/server/services/remote/plugin/connection_api.go
+++ b/backend/server/services/remote/plugin/connection_api.go
@@ -18,31 +18,24 @@ limitations under the License.
 package plugin
 
 import (
+	"net/http"
+
 	"github.com/apache/incubator-devlake/core/errors"
-	"github.com/apache/incubator-devlake/core/models"
 	"github.com/apache/incubator-devlake/core/plugin"
-	"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
 	"github.com/apache/incubator-devlake/server/services/remote/bridge"
-	"net/http"
 )
 
-type ConnectionAPI struct {
-	invoker  bridge.Invoker
-	connType *models.DynamicTabler
-	helper   *api.ConnectionApiHelper
-}
-
-func (c *ConnectionAPI) TestConnection(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
-	err := c.invoker.Call("test-connection", bridge.DefaultContext, input.Body).Get()
+func (pa *pluginAPI) TestConnection(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	err := pa.invoker.Call("test-connection", bridge.DefaultContext, input.Body).Get()
 	if err != nil {
 		return nil, err
 	}
 	return nil, nil
 }
 
-func (c *ConnectionAPI) PostConnections(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
-	connection := c.connType.New()
-	err := c.helper.Create(connection, input)
+func (pa *pluginAPI) PostConnections(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	connection := pa.connType.New()
+	err := pa.helper.Create(connection, input)
 	if err != nil {
 		return nil, err
 	}
@@ -50,9 +43,9 @@ func (c *ConnectionAPI) PostConnections(input *plugin.ApiResourceInput) (*plugin
 	return &plugin.ApiResourceOutput{Body: conn, Status: http.StatusOK}, nil
 }
 
-func (c *ConnectionAPI) ListConnections(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
-	connections := c.connType.NewSlice()
-	err := c.helper.List(connections)
+func (pa *pluginAPI) ListConnections(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	connections := pa.connType.NewSlice()
+	err := pa.helper.List(connections)
 	if err != nil {
 		return nil, err
 	}
@@ -60,9 +53,9 @@ func (c *ConnectionAPI) ListConnections(input *plugin.ApiResourceInput) (*plugin
 	return &plugin.ApiResourceOutput{Body: conns}, nil
 }
 
-func (c *ConnectionAPI) GetConnection(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
-	connection := c.connType.New()
-	err := c.helper.First(connection, input.Params)
+func (pa *pluginAPI) GetConnection(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	connection := pa.connType.New()
+	err := pa.helper.First(connection, input.Params)
 	if err != nil {
 		return nil, err
 	}
@@ -70,9 +63,9 @@ func (c *ConnectionAPI) GetConnection(input *plugin.ApiResourceInput) (*plugin.A
 	return &plugin.ApiResourceOutput{Body: conn}, nil
 }
 
-func (c *ConnectionAPI) PatchConnection(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
-	connection := c.connType.New()
-	err := c.helper.Patch(connection, input)
+func (pa *pluginAPI) PatchConnection(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	connection := pa.connType.New()
+	err := pa.helper.Patch(connection, input)
 	if err != nil {
 		return nil, err
 	}
@@ -80,13 +73,13 @@ func (c *ConnectionAPI) PatchConnection(input *plugin.ApiResourceInput) (*plugin
 	return &plugin.ApiResourceOutput{Body: conn, Status: http.StatusOK}, nil
 }
 
-func (c *ConnectionAPI) DeleteConnection(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
-	connection := c.connType.New()
-	err := c.helper.First(connection, input.Params)
+func (pa *pluginAPI) DeleteConnection(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	connection := pa.connType.New()
+	err := pa.helper.First(connection, input.Params)
 	if err != nil {
 		return nil, err
 	}
-	err = c.helper.Delete(connection)
+	err = pa.helper.Delete(connection)
 	conn := connection.Unwrap()
 	return &plugin.ApiResourceOutput{Body: conn}, err
 }
diff --git a/backend/server/services/remote/plugin/default_api.go b/backend/server/services/remote/plugin/default_api.go
index b1fa1879c..59bd80801 100644
--- a/backend/server/services/remote/plugin/default_api.go
+++ b/backend/server/services/remote/plugin/default_api.go
@@ -24,52 +24,59 @@ import (
 	"github.com/apache/incubator-devlake/server/services/remote/bridge"
 )
 
+type pluginAPI struct {
+	invoker    bridge.Invoker
+	connType   *models.DynamicTabler
+	txRuleType *models.DynamicTabler
+	helper     *api.ConnectionApiHelper
+}
+
 func GetDefaultAPI(
 	invoker bridge.Invoker,
 	connType *models.DynamicTabler,
 	txRuleType *models.DynamicTabler,
 	helper *api.ConnectionApiHelper) map[string]map[string]plugin.ApiResourceHandler {
-	connectionApi := &ConnectionAPI{
-		invoker:  invoker,
-		connType: connType,
-		helper:   helper,
-	}
-
-	scopeApi := &ScopeAPI{
-		txRuleType: txRuleType,
-	}
-	txruleApi := &TransformationRuleAPI{
+	papi := &pluginAPI{
+		invoker:    invoker,
+		connType:   connType,
 		txRuleType: txRuleType,
+		helper:     helper,
 	}
 
 	return map[string]map[string]plugin.ApiResourceHandler{
 		"test": {
-			"POST": connectionApi.TestConnection,
+			"POST": papi.TestConnection,
 		},
 		"connections": {
-			"POST": connectionApi.PostConnections,
-			"GET":  connectionApi.ListConnections,
+			"POST": papi.PostConnections,
+			"GET":  papi.ListConnections,
 		},
 		"connections/:connectionId": {
-			"GET":    connectionApi.GetConnection,
-			"PATCH":  connectionApi.PatchConnection,
-			"DELETE": connectionApi.DeleteConnection,
+			"GET":    papi.GetConnection,
+			"PATCH":  papi.PatchConnection,
+			"DELETE": papi.DeleteConnection,
 		},
 		"connections/:connectionId/scopes": {
-			"PUT": scopeApi.PutScope,
-			"GET": scopeApi.ListScopes,
+			"PUT": papi.PutScope,
+			"GET": papi.ListScopes,
 		},
 		"connections/:connectionId/scopes/*scopeId": {
-			"GET":   scopeApi.GetScope,
-			"PATCH": scopeApi.PatchScope,
+			"GET":   papi.GetScope,
+			"PATCH": papi.PatchScope,
+		},
+		"connections/:connectionId/remote-scopes": {
+			"GET": papi.GetRemoteScopes,
+		},
+		"connections/:connectionId/search-remote-scopes": {
+			"GET": papi.GetRemoteScopes,
 		},
 		"transformation_rules": {
-			"POST": txruleApi.PostTransformationRules,
-			"GET":  txruleApi.ListTransformationRules,
+			"POST": papi.PostTransformationRules,
+			"GET":  papi.ListTransformationRules,
 		},
 		"transformation_rules/:id": {
-			"GET":   txruleApi.GetTransformationRule,
-			"PATCH": txruleApi.PatchTransformationRule,
+			"GET":   papi.GetTransformationRule,
+			"PATCH": papi.PatchTransformationRule,
 		},
 	}
 }
diff --git a/backend/server/services/remote/plugin/plugin_extensions.go b/backend/server/services/remote/plugin/plugin_extensions.go
index 537e4640f..e81abeed3 100644
--- a/backend/server/services/remote/plugin/plugin_extensions.go
+++ b/backend/server/services/remote/plugin/plugin_extensions.go
@@ -36,18 +36,19 @@ type (
 )
 
 func (p *remoteMetricPlugin) MakeMetricPluginPipelinePlanV200(projectName string, options json.RawMessage) (plugin.PipelinePlan, errors.Error) {
-	plan := plugin.PipelinePlan{}
-	err := p.invoker.Call("MakeMetricPluginPipelinePlanV200", bridge.DefaultContext, projectName, options).Get(&plan)
-	if err != nil {
-		return nil, err
-	}
-	return plan, err
+	return nil, errors.Internal.New("Remote metric plugins not supported")
 }
 
 func (p *remoteDatasourcePlugin) MakeDataSourcePipelinePlanV200(connectionId uint64, bpScopes []*plugin.BlueprintScopeV200, syncPolicy plugin.BlueprintSyncPolicy) (plugin.PipelinePlan, []plugin.Scope, errors.Error) {
+	connection := p.connectionTabler.New()
+	err := connectionHelper.FirstById(connection, connectionId)
+	if err != nil {
+		return nil, nil, err
+	}
+
 	plan := plugin.PipelinePlan{}
 	var scopes []models.PipelineScope
-	err := p.invoker.Call("make-pipeline", bridge.DefaultContext, connectionId, bpScopes).Get(&plan, &scopes)
+	err = p.invoker.Call("make-pipeline", bridge.DefaultContext, connectionId, bpScopes).Get(&plan, &scopes)
 	if err != nil {
 		return nil, nil, err
 	}
diff --git a/backend/server/services/remote/plugin/remote_scope_api.go b/backend/server/services/remote/plugin/remote_scope_api.go
new file mode 100644
index 000000000..fc428bd9a
--- /dev/null
+++ b/backend/server/services/remote/plugin/remote_scope_api.go
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package plugin
+
+import (
+	"net/http"
+	"strconv"
+
+	"github.com/apache/incubator-devlake/core/errors"
+	"github.com/apache/incubator-devlake/core/plugin"
+	"github.com/apache/incubator-devlake/server/services/remote/bridge"
+)
+
+type RemoteScopesOutput struct {
+	Children []ScopeItem `json:"children"`
+}
+
+// GetRemoteScopes asks the remote plugin for the scopes available on a connection.
+func (pa *pluginAPI) GetRemoteScopes(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	connectionId, _ := strconv.ParseUint(input.Params["connectionId"], 10, 64)
+	if connectionId == 0 {
+		return nil, errors.BadInput.New("invalid connectionId")
+	}
+
+	connection := pa.connType.New()
+	err := pa.helper.First(connection, input.Params)
+	if err != nil {
+		return nil, err
+	}
+
+	// Only :connectionId is a path parameter on the routes bound to this handler;
+	// the search term comes from the URL query string ("" when absent).
+	query := input.Query.Get("query")
+
+	var scopes []ScopeItem
+	err = pa.invoker.Call("remote-scopes", bridge.DefaultContext, connection, query).Get(&scopes)
+	if err != nil {
+		return nil, err
+	}
+
+	res := RemoteScopesOutput{
+		Children: scopes,
+	}
+
+	return &plugin.ApiResourceOutput{Body: res, Status: http.StatusOK}, nil
+}
diff --git a/backend/server/services/remote/plugin/scope_api.go b/backend/server/services/remote/plugin/scope_api.go
index 9bbdbaaf8..e793d6119 100644
--- a/backend/server/services/remote/plugin/scope_api.go
+++ b/backend/server/services/remote/plugin/scope_api.go
@@ -25,7 +25,6 @@ import (
 
 	"github.com/apache/incubator-devlake/core/dal"
 	"github.com/apache/incubator-devlake/core/errors"
-	"github.com/apache/incubator-devlake/core/models"
 	"github.com/apache/incubator-devlake/core/plugin"
 	"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
 )
@@ -48,11 +47,7 @@ type request struct {
 	Data []*ScopeItem `json:"data"`
 }
 
-type ScopeAPI struct {
-	txRuleType *models.DynamicTabler
-}
-
-func (s *ScopeAPI) PutScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+func (pa *pluginAPI) PutScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
 	connectionId, _ := extractParam(input.Params)
 	if connectionId == 0 {
 		return nil, errors.BadInput.New("invalid connectionId")
@@ -87,7 +82,7 @@ func (s *ScopeAPI) PutScope(input *plugin.ApiResourceInput) (*plugin.ApiResource
 	return &plugin.ApiResourceOutput{Body: scopes.Data, Status: http.StatusOK}, nil
 }
 
-func (s *ScopeAPI) PatchScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+func (pa *pluginAPI) PatchScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
 	connectionId, scopeId := extractParam(input.Params)
 	if connectionId == 0 {
 		return nil, errors.BadInput.New("invalid connectionId")
@@ -117,7 +112,7 @@ func (s *ScopeAPI) PatchScope(input *plugin.ApiResourceInput) (*plugin.ApiResour
 	return &plugin.ApiResourceOutput{Body: scope, Status: http.StatusOK}, nil
 }
 
-func (s *ScopeAPI) ListScopes(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+func (pa *pluginAPI) ListScopes(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
 	var scopes []ScopeItem
 	connectionId, _ := extractParam(input.Params)
 
@@ -151,7 +146,7 @@ func (s *ScopeAPI) ListScopes(input *plugin.ApiResourceInput) (*plugin.ApiResour
 	if len(ruleIds) > 0 {
 		err = db.All(&txRuleId2Name,
 			dal.Select("id, name"),
-			dal.From(s.txRuleType.TableName()),
+			dal.From(pa.txRuleType.TableName()),
 			dal.Where("id IN (?)", ruleIds))
 		if err != nil {
 			return nil, err
@@ -176,7 +171,7 @@ func (s *ScopeAPI) ListScopes(input *plugin.ApiResourceInput) (*plugin.ApiResour
 	return &plugin.ApiResourceOutput{Body: apiScopes, Status: http.StatusOK}, nil
 }
 
-func (s *ScopeAPI) GetScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+func (pa *pluginAPI) GetScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
 	var scope ScopeItem
 	connectionId, scopeId := extractParam(input.Params)
 	if connectionId == 0 {
@@ -194,7 +189,7 @@ func (s *ScopeAPI) GetScope(input *plugin.ApiResourceInput) (*plugin.ApiResource
 
 	var ruleName string
 	if scope.TransformationRuleId > 0 {
-		err = db.First(&ruleName, dal.Select("name"), dal.From(s.txRuleType.TableName()), dal.Where("id = ?", scope.TransformationRuleId))
+		err = db.First(&ruleName, dal.Select("name"), dal.From(pa.txRuleType.TableName()), dal.Where("id = ?", scope.TransformationRuleId))
 		if err != nil {
 			return nil, err
 		}
diff --git a/backend/server/services/remote/plugin/transformation_rule_api.go b/backend/server/services/remote/plugin/transformation_rule_api.go
index 8a97bfc16..faf2d44c0 100644
--- a/backend/server/services/remote/plugin/transformation_rule_api.go
+++ b/backend/server/services/remote/plugin/transformation_rule_api.go
@@ -23,17 +23,12 @@ import (
 
 	"github.com/apache/incubator-devlake/core/dal"
 	"github.com/apache/incubator-devlake/core/errors"
-	"github.com/apache/incubator-devlake/core/models"
 	"github.com/apache/incubator-devlake/core/plugin"
 	"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
 )
 
-type TransformationRuleAPI struct {
-	txRuleType *models.DynamicTabler
-}
-
-func (t *TransformationRuleAPI) PostTransformationRules(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
-	txRule := t.txRuleType.New()
+func (pa *pluginAPI) PostTransformationRules(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	txRule := pa.txRuleType.New()
 	err := api.Decode(input.Body, txRule, vld)
 	if err != nil {
 		return nil, errors.BadInput.Wrap(err, "error in decoding transformation rule")
@@ -46,13 +41,13 @@ func (t *TransformationRuleAPI) PostTransformationRules(input *plugin.ApiResourc
 	return &plugin.ApiResourceOutput{Body: txRule.Unwrap(), Status: http.StatusOK}, nil
 }
 
-func (t *TransformationRuleAPI) PatchTransformationRule(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+func (pa *pluginAPI) PatchTransformationRule(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
 	id, err := strconv.ParseUint(input.Params["id"], 10, 64)
 	if err != nil {
 		return nil, errors.Default.Wrap(err, "id should be an integer")
 	}
 
-	txRule := t.txRuleType.New()
+	txRule := pa.txRuleType.New()
 	db := basicRes.GetDal()
 	err = api.CallDB(db.First, txRule, dal.Where("id = ?", id))
 	if err != nil {
@@ -67,8 +62,8 @@ func (t *TransformationRuleAPI) PatchTransformationRule(input *plugin.ApiResourc
 	return &plugin.ApiResourceOutput{Body: txRule.Unwrap(), Status: http.StatusOK}, nil
 }
 
-func (t *TransformationRuleAPI) GetTransformationRule(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
-	txRule := t.txRuleType.New()
+func (pa *pluginAPI) GetTransformationRule(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	txRule := pa.txRuleType.New()
 	db := basicRes.GetDal()
 	err := api.CallDB(db.First, txRule, dal.Where("id = ?", input.Params))
 	if err != nil {
@@ -78,8 +73,8 @@ func (t *TransformationRuleAPI) GetTransformationRule(input *plugin.ApiResourceI
 	return &plugin.ApiResourceOutput{Body: txRule.Unwrap()}, nil
 }
 
-func (t *TransformationRuleAPI) ListTransformationRules(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
-	txRules := t.txRuleType.NewSlice()
+func (pa *pluginAPI) ListTransformationRules(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	txRules := pa.txRuleType.NewSlice()
 	limit, offset := api.GetLimitOffset(input.Query, "pageSize", "page")
 	if limit > 100 {
 		return nil, errors.BadInput.New("pageSize cannot exceed 100")
diff --git a/backend/test/remote/fakeplugin/fakeplugin/main.py b/backend/test/remote/fakeplugin/fakeplugin/main.py
index 51c13bb9b..969c70d7a 100644
--- a/backend/test/remote/fakeplugin/fakeplugin/main.py
+++ b/backend/test/remote/fakeplugin/fakeplugin/main.py
@@ -19,7 +19,7 @@ from typing import Optional
 
 from sqlmodel import Field
 
-from pydevlake import Plugin, Connection, TransformationRule, Stream, ToolModel
+from pydevlake import Plugin, Connection, TransformationRule, Stream, ToolModel, RemoteScope
 from pydevlake.domain_layer.devops import CICDScope, CICDPipeline
 
 
@@ -74,7 +74,7 @@ class FakeStream(Stream):
                 return CICDPipeline.Status.DONE
             case _:
                 return CICDPipeline.Status.IN_PROGRESS
-            
+
     def convert_result(self, state: FakePipeline.State):
         match state:
             case FakePipeline.State.SUCCESS:
@@ -82,7 +82,7 @@ class FakeStream(Stream):
             case FakePipeline.State.FAILURE:
                 return CICDPipeline.Status.FAILURE
             case _:
-                return None            
+                return None
 
     def duration(self, pipeline: FakePipeline):
         if pipeline.finished_at:
@@ -111,10 +111,16 @@ class FakePlugin(Plugin):
             url=f"http://fake.org/api/project/{scope_name}"
         )
 
+    def remote_scopes(self, connection: FakeConnection, query: str = ''):
+        yield RemoteScope(
+            id='test',
+            name='Not a real scope'
+        )
+
     def test_connection(self, connection: FakeConnection):
         if connection.token != VALID_TOKEN:
             raise Exception("Invalid token")
-        
+
     @property
     def streams(self):
         return [