You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by he...@apache.org on 2023/02/21 19:06:30 UTC
[incubator-devlake] branch main updated: Blueprint v200 python plugins (#4480)
This is an automated email from the ASF dual-hosted git repository.
hez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 967823f8e Blueprint v200 python plugins (#4480)
967823f8e is described below
commit 967823f8e0363f5a6297ad076ce177109b899c5b
Author: Camille Teruel <ca...@gmail.com>
AuthorDate: Tue Feb 21 20:06:24 2023 +0100
Blueprint v200 python plugins (#4480)
* feat: Generic handler for 429 responses
* refactor: Extract code from PagedResponse.__iter__
Extract `items` and `next_page_request` properties from `__iter__` to let
clients iterate over pages manually.
* refactor: Use same struct for all plugin API areas
* feat: Add remote plugins endpoint for python plugins
* fix: Pass connection to make-pipeline plugin command
---------
Co-authored-by: Camille Teruel <ca...@meri.co>
---
backend/python/pydevlake/pydevlake/__init__.py | 2 +-
backend/python/pydevlake/pydevlake/api.py | 70 ++++++++++++++++------
backend/python/pydevlake/pydevlake/ipc.py | 5 ++
backend/python/pydevlake/pydevlake/message.py | 5 ++
backend/python/pydevlake/pydevlake/plugin.py | 6 +-
.../services/remote/plugin/connection_api.go | 47 +++++++--------
.../server/services/remote/plugin/default_api.go | 55 +++++++++--------
.../services/remote/plugin/plugin_extensions.go | 15 ++---
.../services/remote/plugin/remote_scope_api.go | 61 +++++++++++++++++++
backend/server/services/remote/plugin/scope_api.go | 17 ++----
.../remote/plugin/transformation_rule_api.go | 21 +++----
backend/test/remote/fakeplugin/fakeplugin/main.py | 14 +++--
12 files changed, 211 insertions(+), 107 deletions(-)
diff --git a/backend/python/pydevlake/pydevlake/__init__.py b/backend/python/pydevlake/pydevlake/__init__.py
index d45d1ba7d..6d513946d 100644
--- a/backend/python/pydevlake/pydevlake/__init__.py
+++ b/backend/python/pydevlake/pydevlake/__init__.py
@@ -16,7 +16,7 @@
from .model import ToolModel
from .logger import logger
-from .message import Connection, TransformationRule
+from .message import Connection, TransformationRule, RemoteScope
from .plugin import Plugin
from .stream import Stream, Substream
from .context import Context
diff --git a/backend/python/pydevlake/pydevlake/api.py b/backend/python/pydevlake/pydevlake/api.py
index cc2a930cd..bc29fc474 100644
--- a/backend/python/pydevlake/pydevlake/api.py
+++ b/backend/python/pydevlake/pydevlake/api.py
@@ -17,10 +17,14 @@
from __future__ import annotations
from typing import Optional, Union
+from urllib.parse import urljoin
+from http import HTTPStatus
import json
+import time
import requests as req
-from urllib.parse import urljoin
+
+from pydevlake import logger
RouteArgs = Union[list[str], dict[str, str]]
@@ -75,7 +79,7 @@ ABORT = object()
class APIBase:
"""
The base class for defining APIs.
- It implements a hook system to preprocess requests before sending them and postprocess response
+ It implements a hook system to preprocess requests before sending them and postprocess response
before returning them.
Hooks are declared by decorating methods with `@request_hook` and `@response_hook`.
Hooks are executed in the order they are declared.
@@ -121,7 +125,7 @@ class APIBase:
if isinstance(result, type(target)):
target = result
return target
-
+
def get(self, path, **query_args):
req = Request(urljoin(self.base_url, path), query_args)
return self.send(req)
@@ -135,7 +139,7 @@ class APIBase:
if not hasattr(self, '_response_hooks'):
self._response_hooks = [h for h in self._iter_members() if isinstance(h, ResponseHook)]
return self._response_hooks
-
+
def _iter_members(self):
for c in reversed(type(self).__mro__):
for m in c.__dict__.values():
@@ -199,7 +203,7 @@ class Paginator:
def set_next_page_param(self, request, next_page_id: int | str):
"""
Modify the request to set the parameter for fetching next page,
- e.g. set the `page` query parameter.
+ e.g. set the `page` query parameter.
"""
pass
@@ -214,23 +218,36 @@ class PagedResponse(Response):
self.paginator = paginator
self.api = api
+ @property
+ def items(self):
+ return self.paginator.get_items(self.response)
+
+ @property
+ def next_page_request(self):
+ next_page_id = self.paginator.get_next_page_id(self.response)
+ if not next_page_id:
+ # No next page
+ return None
+
+ next_request = self.response.request.copy()
+ self.paginator.set_next_page_param(next_request, next_page_id)
+ return next_request
+
def __iter__(self):
- current = self.response
+ """
+ Iterate over the items of this page and of all following pages.
+ """
+ current = self
while True:
- items = self.paginator.get_items(current)
- for item in items:
- yield item
+ yield from current.items
- next_page_id = self.paginator.get_next_page_id(current)
- if not next_page_id:
+ next_page_request = current.next_page_request
+ if not next_page_request:
# No next page
return
- # Fetch next page
- next_request = current.request.copy()
- self.paginator.set_next_page_param(next_request, next_page_id)
- current = self.api.send(next_request)
+ current = current.api.send(current.next_page_request)
def __getattr__(self, attr_name):
# Delegate everything to Response
@@ -251,12 +268,12 @@ class TokenPaginator(Paginator):
def set_next_page_param(self, request, next_page_id):
request.query_args[self.next_page_token_param] = next_page_id
-
+
class APIException(Exception):
def __init__(self, response):
self.response = response
-
+
def __str__(self):
return f'APIException: {self.response}'
@@ -267,7 +284,7 @@ class API(APIBase):
- pagination: define the `paginator` property in subclasses
# TODO:
- - Error handling response hook: retries,
+ - Error handling response hook: retries,
- Rate limitation
"""
@property
@@ -277,9 +294,24 @@ class API(APIBase):
"""
return None
+ @response_hook
+ def pause_if_too_many_requests(self, response: Response):
+ """
+ Pause execution if a response has a 429 status (TOO_MANY_REQUESTS)
+ for the number of seconds indicated in the 'Retry-After' header,
+ or 60 seconds if this header is missing.
+ Retry the failed request afterwards.
+ """
+ if response.status == HTTPStatus.TOO_MANY_REQUESTS:
+ retry_after = response.headers.get('Retry-After', 60)
+ logger.warning(f'Got TOO_MANY_REQUESTS response, sleep {int(retry_after)} seconds')
+ time.sleep(retry_after)
+ return self.send(response.request)
+ return response
+
@response_hook
def handle_error(self, response):
- if response.status != 200:
+ if response.status != HTTPStatus.OK:
raise APIException(response)
@response_hook
diff --git a/backend/python/pydevlake/pydevlake/ipc.py b/backend/python/pydevlake/pydevlake/ipc.py
index 0b8d664e3..b1284007a 100644
--- a/backend/python/pydevlake/pydevlake/ipc.py
+++ b/backend/python/pydevlake/pydevlake/ipc.py
@@ -82,6 +82,11 @@ class PluginCommands:
def plugin_info(self):
return self._plugin.plugin_info()
+ @plugin_method
+ def remote_scopes(self, connection: dict, query: str = ''):
+ c = self._plugin.connection_type(**connection)
+ self.plugin.remote_scopes(c, query)
+
def startup(self, endpoint: str):
self._plugin.startup(endpoint)
diff --git a/backend/python/pydevlake/pydevlake/message.py b/backend/python/pydevlake/pydevlake/message.py
index aad4690d0..9b2ba7456 100644
--- a/backend/python/pydevlake/pydevlake/message.py
+++ b/backend/python/pydevlake/pydevlake/message.py
@@ -93,3 +93,8 @@ class PipelineScope(Message):
class BlueprintScope(Message):
id: str
name: str
+
+
+class RemoteScope(Message):
+ id: str
+ name: str
diff --git a/backend/python/pydevlake/pydevlake/plugin.py b/backend/python/pydevlake/pydevlake/plugin.py
index 3f81016d8..fbad2b38d 100644
--- a/backend/python/pydevlake/pydevlake/plugin.py
+++ b/backend/python/pydevlake/pydevlake/plugin.py
@@ -74,6 +74,10 @@ class Plugin:
def get_scopes(self, scope_name: str, connection: msg.Connection) -> Iterable[DomainModel]:
pass
+ @abstractmethod
+ def remote_scopes(self, connection: msg.Connection, query: str = ''):
+ pass
+
@property
def streams(self) -> list[Union[Stream, Type[Stream]]]:
pass
@@ -114,7 +118,7 @@ class Plugin:
plan = msg.PipelinePlan(stages=stages)
yield plan
- scopes = [
+ scopes = [
msg.PipelineScope(
id=':'.join([self.name, type(scope).__name__, ctx.connection_id, bp_scope.id]),
name=bp_scope.name,
diff --git a/backend/server/services/remote/plugin/connection_api.go b/backend/server/services/remote/plugin/connection_api.go
index 94b09fbee..7be545cba 100644
--- a/backend/server/services/remote/plugin/connection_api.go
+++ b/backend/server/services/remote/plugin/connection_api.go
@@ -18,31 +18,24 @@ limitations under the License.
package plugin
import (
+ "net/http"
+
"github.com/apache/incubator-devlake/core/errors"
- "github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/core/plugin"
- "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/server/services/remote/bridge"
- "net/http"
)
-type ConnectionAPI struct {
- invoker bridge.Invoker
- connType *models.DynamicTabler
- helper *api.ConnectionApiHelper
-}
-
-func (c *ConnectionAPI) TestConnection(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
- err := c.invoker.Call("test-connection", bridge.DefaultContext, input.Body).Get()
+func (pa *pluginAPI) TestConnection(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+ err := pa.invoker.Call("test-connection", bridge.DefaultContext, input.Body).Get()
if err != nil {
return nil, err
}
return nil, nil
}
-func (c *ConnectionAPI) PostConnections(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
- connection := c.connType.New()
- err := c.helper.Create(connection, input)
+func (pa *pluginAPI) PostConnections(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+ connection := pa.connType.New()
+ err := pa.helper.Create(connection, input)
if err != nil {
return nil, err
}
@@ -50,9 +43,9 @@ func (c *ConnectionAPI) PostConnections(input *plugin.ApiResourceInput) (*plugin
return &plugin.ApiResourceOutput{Body: conn, Status: http.StatusOK}, nil
}
-func (c *ConnectionAPI) ListConnections(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
- connections := c.connType.NewSlice()
- err := c.helper.List(connections)
+func (pa *pluginAPI) ListConnections(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+ connections := pa.connType.NewSlice()
+ err := pa.helper.List(connections)
if err != nil {
return nil, err
}
@@ -60,9 +53,9 @@ func (c *ConnectionAPI) ListConnections(input *plugin.ApiResourceInput) (*plugin
return &plugin.ApiResourceOutput{Body: conns}, nil
}
-func (c *ConnectionAPI) GetConnection(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
- connection := c.connType.New()
- err := c.helper.First(connection, input.Params)
+func (pa *pluginAPI) GetConnection(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+ connection := pa.connType.New()
+ err := pa.helper.First(connection, input.Params)
if err != nil {
return nil, err
}
@@ -70,9 +63,9 @@ func (c *ConnectionAPI) GetConnection(input *plugin.ApiResourceInput) (*plugin.A
return &plugin.ApiResourceOutput{Body: conn}, nil
}
-func (c *ConnectionAPI) PatchConnection(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
- connection := c.connType.New()
- err := c.helper.Patch(connection, input)
+func (pa *pluginAPI) PatchConnection(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+ connection := pa.connType.New()
+ err := pa.helper.Patch(connection, input)
if err != nil {
return nil, err
}
@@ -80,13 +73,13 @@ func (c *ConnectionAPI) PatchConnection(input *plugin.ApiResourceInput) (*plugin
return &plugin.ApiResourceOutput{Body: conn, Status: http.StatusOK}, nil
}
-func (c *ConnectionAPI) DeleteConnection(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
- connection := c.connType.New()
- err := c.helper.First(connection, input.Params)
+func (pa *pluginAPI) DeleteConnection(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+ connection := pa.connType.New()
+ err := pa.helper.First(connection, input.Params)
if err != nil {
return nil, err
}
- err = c.helper.Delete(connection)
+ err = pa.helper.Delete(connection)
conn := connection.Unwrap()
return &plugin.ApiResourceOutput{Body: conn}, err
}
diff --git a/backend/server/services/remote/plugin/default_api.go b/backend/server/services/remote/plugin/default_api.go
index b1fa1879c..59bd80801 100644
--- a/backend/server/services/remote/plugin/default_api.go
+++ b/backend/server/services/remote/plugin/default_api.go
@@ -24,52 +24,59 @@ import (
"github.com/apache/incubator-devlake/server/services/remote/bridge"
)
+type pluginAPI struct {
+ invoker bridge.Invoker
+ connType *models.DynamicTabler
+ txRuleType *models.DynamicTabler
+ helper *api.ConnectionApiHelper
+}
+
func GetDefaultAPI(
invoker bridge.Invoker,
connType *models.DynamicTabler,
txRuleType *models.DynamicTabler,
helper *api.ConnectionApiHelper) map[string]map[string]plugin.ApiResourceHandler {
- connectionApi := &ConnectionAPI{
- invoker: invoker,
- connType: connType,
- helper: helper,
- }
-
- scopeApi := &ScopeAPI{
- txRuleType: txRuleType,
- }
- txruleApi := &TransformationRuleAPI{
+ papi := &pluginAPI{
+ invoker: invoker,
+ connType: connType,
txRuleType: txRuleType,
+ helper: helper,
}
return map[string]map[string]plugin.ApiResourceHandler{
"test": {
- "POST": connectionApi.TestConnection,
+ "POST": papi.TestConnection,
},
"connections": {
- "POST": connectionApi.PostConnections,
- "GET": connectionApi.ListConnections,
+ "POST": papi.PostConnections,
+ "GET": papi.ListConnections,
},
"connections/:connectionId": {
- "GET": connectionApi.GetConnection,
- "PATCH": connectionApi.PatchConnection,
- "DELETE": connectionApi.DeleteConnection,
+ "GET": papi.GetConnection,
+ "PATCH": papi.PatchConnection,
+ "DELETE": papi.DeleteConnection,
},
"connections/:connectionId/scopes": {
- "PUT": scopeApi.PutScope,
- "GET": scopeApi.ListScopes,
+ "PUT": papi.PutScope,
+ "GET": papi.ListScopes,
},
"connections/:connectionId/scopes/*scopeId": {
- "GET": scopeApi.GetScope,
- "PATCH": scopeApi.PatchScope,
+ "GET": papi.GetScope,
+ "PATCH": papi.PatchScope,
+ },
+ "connections/:connectionId/remote-scopes": {
+ "GET": papi.GetRemoteScopes,
+ },
+ "connections/:connectionId/search-remote-scopes": {
+ "GET": papi.GetRemoteScopes,
},
"transformation_rules": {
- "POST": txruleApi.PostTransformationRules,
- "GET": txruleApi.ListTransformationRules,
+ "POST": papi.PostTransformationRules,
+ "GET": papi.ListTransformationRules,
},
"transformation_rules/:id": {
- "GET": txruleApi.GetTransformationRule,
- "PATCH": txruleApi.PatchTransformationRule,
+ "GET": papi.GetTransformationRule,
+ "PATCH": papi.PatchTransformationRule,
},
}
}
diff --git a/backend/server/services/remote/plugin/plugin_extensions.go b/backend/server/services/remote/plugin/plugin_extensions.go
index 537e4640f..e81abeed3 100644
--- a/backend/server/services/remote/plugin/plugin_extensions.go
+++ b/backend/server/services/remote/plugin/plugin_extensions.go
@@ -36,18 +36,19 @@ type (
)
func (p *remoteMetricPlugin) MakeMetricPluginPipelinePlanV200(projectName string, options json.RawMessage) (plugin.PipelinePlan, errors.Error) {
- plan := plugin.PipelinePlan{}
- err := p.invoker.Call("MakeMetricPluginPipelinePlanV200", bridge.DefaultContext, projectName, options).Get(&plan)
- if err != nil {
- return nil, err
- }
- return plan, err
+ return nil, errors.Internal.New("Remote metric plugins not supported")
}
func (p *remoteDatasourcePlugin) MakeDataSourcePipelinePlanV200(connectionId uint64, bpScopes []*plugin.BlueprintScopeV200, syncPolicy plugin.BlueprintSyncPolicy) (plugin.PipelinePlan, []plugin.Scope, errors.Error) {
+ connection := p.connectionTabler.New()
+ err := connectionHelper.FirstById(connection, connectionId)
+ if err != nil {
+ return nil, nil, err
+ }
+
plan := plugin.PipelinePlan{}
var scopes []models.PipelineScope
- err := p.invoker.Call("make-pipeline", bridge.DefaultContext, connectionId, bpScopes).Get(&plan, &scopes)
+ err = p.invoker.Call("make-pipeline", bridge.DefaultContext, connectionId, bpScopes).Get(&plan, &scopes)
if err != nil {
return nil, nil, err
}
diff --git a/backend/server/services/remote/plugin/remote_scope_api.go b/backend/server/services/remote/plugin/remote_scope_api.go
new file mode 100644
index 000000000..fc428bd9a
--- /dev/null
+++ b/backend/server/services/remote/plugin/remote_scope_api.go
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package plugin
+
+import (
+ "net/http"
+ "strconv"
+
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/server/services/remote/bridge"
+)
+
+type RemoteScopesOutput struct {
+ Children []ScopeItem `json:"children"`
+}
+
+func (pa *pluginAPI) GetRemoteScopes(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+ connectionId, _ := strconv.ParseUint(input.Params["connectionId"], 10, 64)
+ if connectionId == 0 {
+ return nil, errors.BadInput.New("invalid connectionId")
+ }
+
+ connection := pa.connType.New()
+ err := pa.helper.First(connection, input.Params)
+ if err != nil {
+ return nil, err
+ }
+
+ query, ok := input.Params["query"]
+ if !ok {
+ query = ""
+ }
+
+ var scopes []ScopeItem
+ err = pa.invoker.Call("remote-scopes", bridge.DefaultContext, connection, query).Get(&scopes)
+ if err != nil {
+ return nil, err
+ }
+
+ res := RemoteScopesOutput{
+ Children: scopes,
+ }
+
+ return &plugin.ApiResourceOutput{Body: res, Status: http.StatusOK}, nil
+}
diff --git a/backend/server/services/remote/plugin/scope_api.go b/backend/server/services/remote/plugin/scope_api.go
index 9bbdbaaf8..e793d6119 100644
--- a/backend/server/services/remote/plugin/scope_api.go
+++ b/backend/server/services/remote/plugin/scope_api.go
@@ -25,7 +25,6 @@ import (
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
- "github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
)
@@ -48,11 +47,7 @@ type request struct {
Data []*ScopeItem `json:"data"`
}
-type ScopeAPI struct {
- txRuleType *models.DynamicTabler
-}
-
-func (s *ScopeAPI) PutScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+func (pa *pluginAPI) PutScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
connectionId, _ := extractParam(input.Params)
if connectionId == 0 {
return nil, errors.BadInput.New("invalid connectionId")
@@ -87,7 +82,7 @@ func (s *ScopeAPI) PutScope(input *plugin.ApiResourceInput) (*plugin.ApiResource
return &plugin.ApiResourceOutput{Body: scopes.Data, Status: http.StatusOK}, nil
}
-func (s *ScopeAPI) PatchScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+func (pa *pluginAPI) PatchScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
connectionId, scopeId := extractParam(input.Params)
if connectionId == 0 {
return nil, errors.BadInput.New("invalid connectionId")
@@ -117,7 +112,7 @@ func (s *ScopeAPI) PatchScope(input *plugin.ApiResourceInput) (*plugin.ApiResour
return &plugin.ApiResourceOutput{Body: scope, Status: http.StatusOK}, nil
}
-func (s *ScopeAPI) ListScopes(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+func (pa *pluginAPI) ListScopes(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
var scopes []ScopeItem
connectionId, _ := extractParam(input.Params)
@@ -151,7 +146,7 @@ func (s *ScopeAPI) ListScopes(input *plugin.ApiResourceInput) (*plugin.ApiResour
if len(ruleIds) > 0 {
err = db.All(&txRuleId2Name,
dal.Select("id, name"),
- dal.From(s.txRuleType.TableName()),
+ dal.From(pa.txRuleType.TableName()),
dal.Where("id IN (?)", ruleIds))
if err != nil {
return nil, err
@@ -176,7 +171,7 @@ func (s *ScopeAPI) ListScopes(input *plugin.ApiResourceInput) (*plugin.ApiResour
return &plugin.ApiResourceOutput{Body: apiScopes, Status: http.StatusOK}, nil
}
-func (s *ScopeAPI) GetScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+func (pa *pluginAPI) GetScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
var scope ScopeItem
connectionId, scopeId := extractParam(input.Params)
if connectionId == 0 {
@@ -194,7 +189,7 @@ func (s *ScopeAPI) GetScope(input *plugin.ApiResourceInput) (*plugin.ApiResource
var ruleName string
if scope.TransformationRuleId > 0 {
- err = db.First(&ruleName, dal.Select("name"), dal.From(s.txRuleType.TableName()), dal.Where("id = ?", scope.TransformationRuleId))
+ err = db.First(&ruleName, dal.Select("name"), dal.From(pa.txRuleType.TableName()), dal.Where("id = ?", scope.TransformationRuleId))
if err != nil {
return nil, err
}
diff --git a/backend/server/services/remote/plugin/transformation_rule_api.go b/backend/server/services/remote/plugin/transformation_rule_api.go
index 8a97bfc16..faf2d44c0 100644
--- a/backend/server/services/remote/plugin/transformation_rule_api.go
+++ b/backend/server/services/remote/plugin/transformation_rule_api.go
@@ -23,17 +23,12 @@ import (
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
- "github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
)
-type TransformationRuleAPI struct {
- txRuleType *models.DynamicTabler
-}
-
-func (t *TransformationRuleAPI) PostTransformationRules(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
- txRule := t.txRuleType.New()
+func (pa *pluginAPI) PostTransformationRules(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+ txRule := pa.txRuleType.New()
err := api.Decode(input.Body, txRule, vld)
if err != nil {
return nil, errors.BadInput.Wrap(err, "error in decoding transformation rule")
@@ -46,13 +41,13 @@ func (t *TransformationRuleAPI) PostTransformationRules(input *plugin.ApiResourc
return &plugin.ApiResourceOutput{Body: txRule.Unwrap(), Status: http.StatusOK}, nil
}
-func (t *TransformationRuleAPI) PatchTransformationRule(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+func (pa *pluginAPI) PatchTransformationRule(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
id, err := strconv.ParseUint(input.Params["id"], 10, 64)
if err != nil {
return nil, errors.Default.Wrap(err, "id should be an integer")
}
- txRule := t.txRuleType.New()
+ txRule := pa.txRuleType.New()
db := basicRes.GetDal()
err = api.CallDB(db.First, txRule, dal.Where("id = ?", id))
if err != nil {
@@ -67,8 +62,8 @@ func (t *TransformationRuleAPI) PatchTransformationRule(input *plugin.ApiResourc
return &plugin.ApiResourceOutput{Body: txRule.Unwrap(), Status: http.StatusOK}, nil
}
-func (t *TransformationRuleAPI) GetTransformationRule(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
- txRule := t.txRuleType.New()
+func (pa *pluginAPI) GetTransformationRule(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+ txRule := pa.txRuleType.New()
db := basicRes.GetDal()
err := api.CallDB(db.First, txRule, dal.Where("id = ?", input.Params))
if err != nil {
@@ -78,8 +73,8 @@ func (t *TransformationRuleAPI) GetTransformationRule(input *plugin.ApiResourceI
return &plugin.ApiResourceOutput{Body: txRule.Unwrap()}, nil
}
-func (t *TransformationRuleAPI) ListTransformationRules(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
- txRules := t.txRuleType.NewSlice()
+func (pa *pluginAPI) ListTransformationRules(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+ txRules := pa.txRuleType.NewSlice()
limit, offset := api.GetLimitOffset(input.Query, "pageSize", "page")
if limit > 100 {
return nil, errors.BadInput.New("pageSize cannot exceed 100")
diff --git a/backend/test/remote/fakeplugin/fakeplugin/main.py b/backend/test/remote/fakeplugin/fakeplugin/main.py
index 51c13bb9b..969c70d7a 100644
--- a/backend/test/remote/fakeplugin/fakeplugin/main.py
+++ b/backend/test/remote/fakeplugin/fakeplugin/main.py
@@ -19,7 +19,7 @@ from typing import Optional
from sqlmodel import Field
-from pydevlake import Plugin, Connection, TransformationRule, Stream, ToolModel
+from pydevlake import Plugin, Connection, TransformationRule, Stream, ToolModel, RemoteScope
from pydevlake.domain_layer.devops import CICDScope, CICDPipeline
@@ -74,7 +74,7 @@ class FakeStream(Stream):
return CICDPipeline.Status.DONE
case _:
return CICDPipeline.Status.IN_PROGRESS
-
+
def convert_result(self, state: FakePipeline.State):
match state:
case FakePipeline.State.SUCCESS:
@@ -82,7 +82,7 @@ class FakeStream(Stream):
case FakePipeline.State.FAILURE:
return CICDPipeline.Status.FAILURE
case _:
- return None
+ return None
def duration(self, pipeline: FakePipeline):
if pipeline.finished_at:
@@ -111,10 +111,16 @@ class FakePlugin(Plugin):
url=f"http://fake.org/api/project/{scope_name}"
)
+ def remote_scopes(self, connection: FakeConnection, query: str = ''):
+ yield RemoteScope(
+ id='test',
+ name='Not a real scope'
+ )
+
def test_connection(self, connection: FakeConnection):
if connection.token != VALID_TOKEN:
raise Exception("Invalid token")
-
+
@property
def streams(self):
return [