You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by he...@apache.org on 2023/02/17 23:11:18 UTC
[incubator-devlake] branch main updated: feat(pyplugins): tx rule scope ap is py (#4438)
This is an automated email from the ASF dual-hosted git repository.
hez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 0d573063a feat(pyplugins): tx rule scope ap is py (#4438)
0d573063a is described below
commit 0d573063acfb1d3fe7649a2ff375badcec2a0dd5
Author: Camille Teruel <ca...@gmail.com>
AuthorDate: Sat Feb 18 00:11:14 2023 +0100
feat(pyplugins): tx rule scope ap is py (#4438)
* docs: Document hook execution order
* fix: Add missing initialization of remote plugins
* fix: Fix swagger body parameter for test connection endpoint
* feat: Implement txRule and scope plugin APIs
Provide a generic implementation for the transformation rule and scope APIs.
* feat: Pass scopeId and transformation rule to python plugin
---------
Co-authored-by: Camille Teruel <ca...@meri.co>
---
backend/core/runner/loader.go | 14 +-
backend/python/pydevlake/README.md | 3 +
backend/python/pydevlake/pydevlake/__init__.py | 2 +-
backend/python/pydevlake/pydevlake/context.py | 14 +-
.../python/pydevlake/pydevlake/doc.template.json | 281 ++++++++++++++++++++-
backend/python/pydevlake/pydevlake/docgen.py | 12 +-
backend/python/pydevlake/pydevlake/ipc.py | 4 +-
backend/python/pydevlake/pydevlake/message.py | 5 +
backend/python/pydevlake/pydevlake/plugin.py | 7 +-
backend/python/pydevlake/pydevlake/subtasks.py | 2 +-
backend/python/pydevlake/test/remote_test.go | 3 +-
backend/python/pydevlake/test/stream_test.py | 5 +-
backend/server/services/remote/models/models.go | 17 +-
.../server/services/remote/plugin/default_api.go | 30 ++-
backend/server/services/remote/plugin/init.go | 3 +-
.../server/services/remote/plugin/plugin_impl.go | 77 ++++--
backend/server/services/remote/plugin/scope_api.go | 222 ++++++++++++++++
.../remote/plugin/transformation_rule_api.go | 94 +++++++
backend/test/remote/fakeplugin/fakeplugin/main.py | 6 +-
19 files changed, 747 insertions(+), 54 deletions(-)
diff --git a/backend/core/runner/loader.go b/backend/core/runner/loader.go
index 171e4c4f0..5c1a7c7a4 100644
--- a/backend/core/runner/loader.go
+++ b/backend/core/runner/loader.go
@@ -19,17 +19,25 @@ package runner
import (
"fmt"
- "github.com/apache/incubator-devlake/core/context"
- "github.com/apache/incubator-devlake/core/errors"
- "github.com/apache/incubator-devlake/core/plugin"
"io/fs"
"path/filepath"
goplugin "plugin"
+ "strconv"
"strings"
+
+ "github.com/apache/incubator-devlake/core/context"
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/server/services/remote"
)
// LoadPlugins load plugins from local directory
func LoadPlugins(basicRes context.BasicRes) errors.Error {
+ remote_plugins_enabled, err := strconv.ParseBool(basicRes.GetConfig("ENABLE_REMOTE_PLUGINS"))
+ if err == nil && remote_plugins_enabled {
+ remote.Init(basicRes)
+ }
+
pluginsDir := basicRes.GetConfig("PLUGIN_DIR")
walkErr := filepath.WalkDir(pluginsDir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
diff --git a/backend/python/pydevlake/README.md b/backend/python/pydevlake/README.md
index ef675d307..fbfc976d2 100644
--- a/backend/python/pydevlake/README.md
+++ b/backend/python/pydevlake/README.md
@@ -223,6 +223,9 @@ class MyAPI(API):
Here the method `authenticate` is a hook that is run on each request.
Similarly you can declare response hooks with `@response_hook`.
+Multiple hooks are executed in the order of their declaration.
+The `API` base class declares some hooks that are executed first.
+
#### Pagination
diff --git a/backend/python/pydevlake/pydevlake/__init__.py b/backend/python/pydevlake/pydevlake/__init__.py
index edabf45f0..d45d1ba7d 100644
--- a/backend/python/pydevlake/pydevlake/__init__.py
+++ b/backend/python/pydevlake/pydevlake/__init__.py
@@ -16,7 +16,7 @@
from .model import ToolModel
from .logger import logger
-from .message import Connection
+from .message import Connection, TransformationRule
from .plugin import Plugin
from .stream import Stream, Substream
from .context import Context
diff --git a/backend/python/pydevlake/pydevlake/context.py b/backend/python/pydevlake/pydevlake/context.py
index a5c97b5ff..85eb9fb7c 100644
--- a/backend/python/pydevlake/pydevlake/context.py
+++ b/backend/python/pydevlake/pydevlake/context.py
@@ -17,15 +17,23 @@
from urllib.parse import urlparse, parse_qsl
from sqlmodel import SQLModel, create_engine
-from pydevlake.message import Connection
+from pydevlake.message import Connection, TransformationRule
class Context:
- def __init__(self, db_url: str, connection_id: int, connection: Connection, options: dict):
+ def __init__(self,
+ db_url: str,
+ scope_id: str,
+ connection_id: int,
+ connection: Connection,
+ transformation_rule: TransformationRule = None,
+ options: dict = None):
self.db_url = db_url
+ self.scope_id = scope_id
self.connection_id = connection_id
self.connection = connection
- self.options = options
+ self.transformation_rule = transformation_rule
+ self.options = options or {}
self._engine = None
@property
diff --git a/backend/python/pydevlake/pydevlake/doc.template.json b/backend/python/pydevlake/pydevlake/doc.template.json
index b152a5cf6..a565187cd 100644
--- a/backend/python/pydevlake/pydevlake/doc.template.json
+++ b/backend/python/pydevlake/pydevlake/doc.template.json
@@ -118,13 +118,16 @@
"/plugins/$plugin_name/test": {
"post": {
"description": "Test if a connection is valid",
- "body": {
- "application/json": {
+ "parameters": [
+ {
+ "name": "connection",
+ "required": true,
+ "in": "body",
"schema": {
"$$ref": "#/components/schemas/connection"
}
}
- }
+ ]
},
"response": {
"200": {
@@ -134,11 +137,245 @@
"description": "The connection is not valid"
}
}
+ },
+ "/plugins/$plugin_name/connections/{connectionId}/scopes/{scopeId}": {
+ "get": {
+ "description": "Get a scope",
+ "parameters": [
+ {
+ "$$ref": "#/components/parameters/connectionId"
+ },
+ {
+ "$$ref": "#/components/parameters/scopeId"
+ }
+ ],
+ "responses": {
+ "200": {
+ "content": {
+ "application/json": {
+ "schema": {
+ "$$ref": "#/components/schemas/scope"
+ }
+ }
+ }
+ }
+ }
+ },
+ "patch": {
+ "description": "Update a scope",
+ "parameters": [
+ {
+ "$$ref": "#/components/parameters/connectionId"
+ },
+ {
+ "$$ref": "#/components/parameters/scopeId"
+ },
+ {
+ "name": "scope",
+ "required": true,
+ "in": "body",
+ "schema": {
+ "$$ref": "#/components/schemas/scope"
+ }
+ }
+ ],
+ "responses": {
+ "200": {
+ "content": {
+ "application/json": {
+ "schema": {
+ "$$ref": "#/components/schemas/scope"
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "/plugins/$plugin_name/connections/{connectionId}/scopes": {
+ "get": {
+ "description": "Get all scopes",
+ "parameters": [
+ {
+ "$$ref": "#/components/parameters/pageSize"
+ },
+ {
+ "$$ref": "#/components/parameters/page"
+ }
+ ],
+ "responses": {
+ "200": {
+ "content": {
+ "application/json": {
+ "schema": {
+ "type": "array",
+ "items": {
+ "$$ref": "#/components/schemas/scope"
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "put": {
+ "description": "Create a list of scopes",
+ "parameters": [
+ {
+ "name": "scopes",
+ "required": true,
+ "in": "body",
+ "schema": {
+ "type": "array",
+ "items": {
+ "$$ref": "#/components/schemas/scope"
+ }
+ }
+ }
+ ],
+ "responses": {
+ "200": {
+ "content": {
+ "application/json": {
+ "schema": {
+ "type": "array",
+ "items": {
+ "$$ref": "#/components/schemas/scope"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "/plugins/$plugin_name/transformation_rules": {
+ "get": {
+ "description": "Get all transformation rules",
+ "parameters": [
+ {
+ "$$ref": "#/components/parameters/pageSize"
+ },
+ {
+ "$$ref": "#/components/parameters/page"
+ }
+ ],
+ "responses": {
+ "200": {
+ "content": {
+ "application/json": {
+ "schema": {
+ "type": "array",
+ "items": {
+ "$$ref": "#/components/schemas/transformationRule"
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "post": {
+ "description": "Create a transformation rule",
+ "parameters": [
+ {
+ "name": "rules",
+ "required": true,
+ "in": "body",
+ "schema": {
+ "$$ref": "#/components/schemas/transformationRule"
+ }
+ }
+ ],
+ "responses": {
+ "200": {
+ "content": {
+ "application/json": {
+ "schema": {
+ "$$ref": "#/components/schemas/transformationRule"
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "/plugins/$plugin_name/transformation_rules/{ruleId}": {
+ "get": {
+ "description": "Get a transformation rule",
+ "parameters": [
+ {
+ "$$ref": "#/components/parameters/ruleId"
+ }
+ ],
+ "responses": {
+ "200": {
+ "content": {
+ "application/json": {
+ "schema": {
+ "$$ref": "#/components/schemas/transformationRule"
+ }
+ }
+ }
+ }
+ }
+ },
+ "patch": {
+ "description": "Update a transformation rule",
+ "parameters": [
+ {
+ "$$ref": "#/components/parameters/ruleId"
+ },
+ {
+ "name": "rule",
+ "required": true,
+ "in": "body",
+ "schema": {
+ "$$ref": "#/components/schemas/transformationRule"
+ }
+ }
+ ],
+ "responses": {
+ "200": {
+ "content": {
+ "application/json": {
+ "schema": {
+ "$$ref": "#/components/schemas/transformationRule"
+ }
+ }
+ }
+ }
+ }
+ }
}
},
"components": {
"schemas": {
- "connection": $connection_schema
+ "connection": $connection_schema,
+ "transformationRule": $transformation_rule_schema,
+ "scope": {
+ "title": "Scope",
+ "type": "object",
+ "properties": {
+ "scopeId": {
+ "title": "scope id",
+ "type": "string"
+ },
+ "scopeName": {
+ "title": "scope name",
+ "type": "string"
+ },
+ "connectionId": {
+ "title": "connection id",
+ "type": "integer"
+ },
+ "transformationRuleId": {
+ "title": "Transformation rule id",
+ "type": "integer"
+ }
+ },
+ "required": ["scopeId", "scopeName", "connectionId", "transformationRuleId"]
+ }
},
"parameters": {
"connectionId": {
@@ -149,6 +386,42 @@
"schema": {
"type": "int"
}
+ },
+ "scopeId": {
+ "name": "scopeId",
+ "description": "Id of the scope",
+ "in": "path",
+ "required": true,
+ "schema": {
+ "type": "string"
+ }
+ },
+ "ruleId": {
+ "name": "ruleId",
+ "description": "Id of the transformation rule",
+ "in": "path",
+ "required": true,
+ "schema": {
+ "type": "int"
+ }
+ },
+ "pageSize": {
+ "name": "pageSize",
+ "description": "Number of items per page",
+ "in": "query",
+ "required": false,
+ "schema": {
+ "type": "integer"
+ }
+ },
+ "page": {
+ "name": "page",
+ "description": "Page number",
+ "in": "query",
+ "required": false,
+ "schema": {
+ "type": "integer"
+ }
}
}
}
diff --git a/backend/python/pydevlake/pydevlake/docgen.py b/backend/python/pydevlake/pydevlake/docgen.py
index acce0a39f..9d91197cd 100644
--- a/backend/python/pydevlake/pydevlake/docgen.py
+++ b/backend/python/pydevlake/pydevlake/docgen.py
@@ -19,15 +19,21 @@ from pathlib import Path
from string import Template
import json
-from pydevlake.message import Connection
+from pydevlake.message import Connection, TransformationRule
# TODO: Move swagger documentation generation to GO side along with API implementation
TEMPLATE_PATH = str(Path(__file__).parent / 'doc.template.json')
-def generate_doc(plugin_name: str, connection_type: Type[Connection]):
+def generate_doc(plugin_name: str,
+ connection_type: Type[Connection],
+ transformation_rule_type: Type[TransformationRule]):
with open(TEMPLATE_PATH, 'r') as f:
doc_template = Template(f.read())
connection_schema = connection_type.schema_json()
- doc = doc_template.substitute(plugin_name=plugin_name, connection_schema=connection_schema)
+ transformation_rule_schema = transformation_rule_type.schema_json()
+ doc = doc_template.substitute(
+ plugin_name=plugin_name,
+ connection_schema=connection_schema,
+ transformation_rule_schema=transformation_rule_schema)
return json.loads(doc)
diff --git a/backend/python/pydevlake/pydevlake/ipc.py b/backend/python/pydevlake/pydevlake/ipc.py
index a457b9794..0b8d664e3 100644
--- a/backend/python/pydevlake/pydevlake/ipc.py
+++ b/backend/python/pydevlake/pydevlake/ipc.py
@@ -87,7 +87,9 @@ class PluginCommands:
def _mk_context(self, data: dict):
db_url = data['db_url']
+ scope_id = data['scope_id']
connection_id = data['connection_id']
connection = self._plugin.connection_type(**data['connection'])
+ transformation_rule = self._plugin.transformation_rule_type(**data['transformation_rule'])
options = data.get('options', {})
- return Context(db_url, connection_id, connection, options)
+ return Context(db_url, scope_id, connection_id, connection, transformation_rule, options)
diff --git a/backend/python/pydevlake/pydevlake/message.py b/backend/python/pydevlake/pydevlake/message.py
index de09167c4..aad4690d0 100644
--- a/backend/python/pydevlake/pydevlake/message.py
+++ b/backend/python/pydevlake/pydevlake/message.py
@@ -35,6 +35,7 @@ class PluginInfo(Message):
name: str
description: str
connection_schema: dict
+ transformation_rule_schema: dict
plugin_path: str
subtask_metas: list[SubtaskMeta]
extension: str = "datasource"
@@ -62,6 +63,10 @@ class Connection(Message):
pass
+class TransformationRule(Message):
+ pass
+
+
class PipelineTask(Message):
plugin: str
# Do not snake_case this attribute,
diff --git a/backend/python/pydevlake/pydevlake/plugin.py b/backend/python/pydevlake/pydevlake/plugin.py
index 0e3c0a166..3f81016d8 100644
--- a/backend/python/pydevlake/pydevlake/plugin.py
+++ b/backend/python/pydevlake/pydevlake/plugin.py
@@ -54,6 +54,10 @@ class Plugin:
def connection_type(self) -> Type[msg.Connection]:
pass
+ @property
+ def transformation_rule_type(self) -> Type[msg.TransformationRule]:
+ return msg.TransformationRule
+
@abstractmethod
def test_connection(self, connection: msg.Connection):
"""
@@ -133,7 +137,7 @@ class Plugin:
swagger=msg.SwaggerDoc(
name=self.name,
resource=self.name,
- spec=generate_doc(self.name, self.connection_type)
+ spec=generate_doc(self.name, self.connection_type, self.transformation_rule_type)
)
)
resp = requests.post(f"{endpoint}/plugins/register", data=details.json())
@@ -160,6 +164,7 @@ class Plugin:
plugin_path=self._plugin_path(),
extension="datasource",
connection_schema=self.connection_type.schema(),
+ transformation_rule_schema=self.transformation_rule_type.schema(),
subtask_metas=subtask_metas
)
diff --git a/backend/python/pydevlake/pydevlake/subtasks.py b/backend/python/pydevlake/pydevlake/subtasks.py
index 98591d331..343c29ac5 100644
--- a/backend/python/pydevlake/pydevlake/subtasks.py
+++ b/backend/python/pydevlake/pydevlake/subtasks.py
@@ -152,7 +152,7 @@ class Collector(Subtask):
def _params(self, ctx: Context) -> str:
return json.dumps({
"connection_id": ctx.connection_id,
- "scope_id": ctx.options['scopeId']
+ "scope_id": ctx.scope_id
})
def delete(self, session, ctx):
diff --git a/backend/python/pydevlake/test/remote_test.go b/backend/python/pydevlake/test/remote_test.go
index 9aaeec877..3c11ba971 100644
--- a/backend/python/pydevlake/test/remote_test.go
+++ b/backend/python/pydevlake/test/remote_test.go
@@ -64,10 +64,9 @@ func TestRunSubTask(t *testing.T) {
dataflowTester := e2ehelper.NewDataFlowTester(t, "circleci", remotePlugin)
subtask := remotePlugin.SubTaskMetas()[0]
options := make(map[string]interface{})
- options["project_slug"] = "gh/circleci/bond"
- options["scopeId"] = "1"
taskData := plg.RemotePluginTaskData{
DbUrl: bridge.DefaultContext.GetConfig("db_url"),
+ ScopeId: "gh/circleci/bond",
Connection: CircleCIConnection{ID: 1},
Options: options,
}
diff --git a/backend/python/pydevlake/test/stream_test.py b/backend/python/pydevlake/test/stream_test.py
index db24465a2..234c7e79a 100644
--- a/backend/python/pydevlake/test/stream_test.py
+++ b/backend/python/pydevlake/test/stream_test.py
@@ -75,9 +75,10 @@ def connection(raw_data):
def ctx(connection):
return Context(
db_url="sqlite+pysqlite:///:memory:",
+ scope_id="1",
connection_id=11,
connection=connection,
- options={"scopeId": 1, "scopeName": "foo"}
+ options={}
)
@@ -123,7 +124,7 @@ def test_convert_data(stream, raw_data, ctx):
id=each["i"],
name=each["n"],
raw_data_table="_raw_dummy_model",
- raw_data_params=json.dumps({"connection_id": ctx.connection_id, "scope_id": ctx.options.scopeId})
+ raw_data_params=json.dumps({"connection_id": ctx.connection_id, "scope_id": ctx.scope_id})
)
)
session.commit()
diff --git a/backend/server/services/remote/models/models.go b/backend/server/services/remote/models/models.go
index d19a26fe7..c82783f32 100644
--- a/backend/server/services/remote/models/models.go
+++ b/backend/server/services/remote/models/models.go
@@ -31,14 +31,15 @@ type (
)
type PluginInfo struct {
- Type PluginType `json:"type" validate:"required"`
- Name string `json:"name" validate:"required"`
- Extension PluginExtension `json:"extension"`
- ConnectionSchema map[string]any `json:"connection_schema" validate:"required"`
- Description string `json:"description"`
- PluginPath string `json:"plugin_path" validate:"required"`
- ApiEndpoints []Endpoint `json:"api_endpoints" validate:"dive"`
- SubtaskMetas []SubtaskMeta `json:"subtask_metas" validate:"dive"`
+ Type PluginType `json:"type" validate:"required"`
+ Name string `json:"name" validate:"required"`
+ Extension PluginExtension `json:"extension"`
+ ConnectionSchema map[string]any `json:"connection_schema" validate:"required"`
+ TransformationRuleSchema map[string]any `json:"transformation_rule_schema" validate:"required"`
+ Description string `json:"description"`
+ PluginPath string `json:"plugin_path" validate:"required"`
+ ApiEndpoints []Endpoint `json:"api_endpoints" validate:"dive"`
+ SubtaskMetas []SubtaskMeta `json:"subtask_metas" validate:"dive"`
}
type SubtaskMeta struct {
diff --git a/backend/server/services/remote/plugin/default_api.go b/backend/server/services/remote/plugin/default_api.go
index a2d500b28..b1fa1879c 100644
--- a/backend/server/services/remote/plugin/default_api.go
+++ b/backend/server/services/remote/plugin/default_api.go
@@ -24,12 +24,24 @@ import (
"github.com/apache/incubator-devlake/server/services/remote/bridge"
)
-func GetDefaultAPI(invoker bridge.Invoker, connType *models.DynamicTabler, helper *api.ConnectionApiHelper) map[string]map[string]plugin.ApiResourceHandler {
+func GetDefaultAPI(
+ invoker bridge.Invoker,
+ connType *models.DynamicTabler,
+ txRuleType *models.DynamicTabler,
+ helper *api.ConnectionApiHelper) map[string]map[string]plugin.ApiResourceHandler {
connectionApi := &ConnectionAPI{
invoker: invoker,
connType: connType,
helper: helper,
}
+
+ scopeApi := &ScopeAPI{
+ txRuleType: txRuleType,
+ }
+ txruleApi := &TransformationRuleAPI{
+ txRuleType: txRuleType,
+ }
+
return map[string]map[string]plugin.ApiResourceHandler{
"test": {
"POST": connectionApi.TestConnection,
@@ -43,5 +55,21 @@ func GetDefaultAPI(invoker bridge.Invoker, connType *models.DynamicTabler, helpe
"PATCH": connectionApi.PatchConnection,
"DELETE": connectionApi.DeleteConnection,
},
+ "connections/:connectionId/scopes": {
+ "PUT": scopeApi.PutScope,
+ "GET": scopeApi.ListScopes,
+ },
+ "connections/:connectionId/scopes/*scopeId": {
+ "GET": scopeApi.GetScope,
+ "PATCH": scopeApi.PatchScope,
+ },
+ "transformation_rules": {
+ "POST": txruleApi.PostTransformationRules,
+ "GET": txruleApi.ListTransformationRules,
+ },
+ "transformation_rules/:id": {
+ "GET": txruleApi.GetTransformationRule,
+ "PATCH": txruleApi.PatchTransformationRule,
+ },
}
}
diff --git a/backend/server/services/remote/plugin/init.go b/backend/server/services/remote/plugin/init.go
index 1ddd03001..676fa14d9 100644
--- a/backend/server/services/remote/plugin/init.go
+++ b/backend/server/services/remote/plugin/init.go
@@ -29,10 +29,11 @@ import (
var (
connectionHelper *api.ConnectionApiHelper
basicRes context.BasicRes
+ vld *validator.Validate
)
func Init(br context.BasicRes) {
- vld := validator.New()
+ vld = validator.New()
basicRes = br
connectionHelper = api.NewConnectionHelper(
br,
diff --git a/backend/server/services/remote/plugin/plugin_impl.go b/backend/server/services/remote/plugin/plugin_impl.go
index 7e611b283..8b34b60d7 100644
--- a/backend/server/services/remote/plugin/plugin_impl.go
+++ b/backend/server/services/remote/plugin/plugin_impl.go
@@ -20,6 +20,7 @@ package plugin
import (
"fmt"
+ "github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
coreModels "github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/core/plugin"
@@ -30,35 +31,46 @@ import (
type (
remotePluginImpl struct {
- name string
- subtaskMetas []plugin.SubTaskMeta
- pluginPath string
- description string
- invoker bridge.Invoker
- connectionTabler *coreModels.DynamicTabler
- resources map[string]map[string]plugin.ApiResourceHandler
+ name string
+ subtaskMetas []plugin.SubTaskMeta
+ pluginPath string
+ description string
+ invoker bridge.Invoker
+ connectionTabler *coreModels.DynamicTabler
+ transformationRuleTabler *coreModels.DynamicTabler
+ resources map[string]map[string]plugin.ApiResourceHandler
}
RemotePluginTaskData struct {
- DbUrl string `json:"db_url"`
- ConnectionId uint64 `json:"connection_id"`
- Connection interface{} `json:"connection"`
- Options map[string]interface{} `json:"options"`
+ DbUrl string `json:"db_url"`
+ ScopeId string `json:"scope_id"`
+ ConnectionId uint64 `json:"connection_id"`
+ Connection interface{} `json:"connection"`
+ TransformationRule interface{} `json:"transformation_rule"`
+ Options map[string]interface{} `json:"options"`
}
)
func newPlugin(info *models.PluginInfo, invoker bridge.Invoker) (*remotePluginImpl, errors.Error) {
connectionTableName := fmt.Sprintf("_tool_%s_connections", info.Name)
- dynamicTabler, err := models.LoadTableModel(connectionTableName, info.ConnectionSchema)
+ connectionTabler, err := models.LoadTableModel(connectionTableName, info.ConnectionSchema)
if err != nil {
return nil, err
}
+
+ txRuleTableName := fmt.Sprintf("_tool_%s_transformation_rules", info.Name)
+ txRuleTabler, err := models.LoadTableModel(txRuleTableName, info.TransformationRuleSchema)
+ if err != nil {
+ return nil, err
+ }
+
p := remotePluginImpl{
- name: info.Name,
- invoker: invoker,
- pluginPath: info.PluginPath,
- description: info.Description,
- connectionTabler: dynamicTabler,
- resources: GetDefaultAPI(invoker, dynamicTabler, connectionHelper),
+ name: info.Name,
+ invoker: invoker,
+ pluginPath: info.PluginPath,
+ description: info.Description,
+ connectionTabler: connectionTabler,
+ transformationRuleTabler: txRuleTabler,
+ resources: GetDefaultAPI(invoker, connectionTabler, txRuleTabler, connectionHelper),
}
remoteBridge := bridge.NewBridge(invoker)
for _, subtask := range info.SubtaskMetas {
@@ -93,11 +105,27 @@ func (p *remotePluginImpl) PrepareTaskData(taskCtx plugin.TaskContext, options m
return nil, errors.Convert(err)
}
+ scopeId, ok := options["scopeId"].(string)
+ if !ok {
+ return nil, errors.BadInput.New("missing scopeId")
+ }
+
+ txRule := p.transformationRuleTabler.New()
+ txRuleId, ok := options["transformation_rule_id"].(uint64)
+ if ok {
+ db := taskCtx.GetDal()
+ err = db.First(&txRule, dal.Where("id = ?", txRuleId))
+ if err != nil {
+ return nil, errors.BadInput.New("invalid transformation rule id")
+ }
+ }
+
return RemotePluginTaskData{
- DbUrl: dbUrl,
- ConnectionId: connectionId,
- Connection: connection.Unwrap(),
- Options: options,
+ DbUrl: dbUrl,
+ ScopeId: scopeId,
+ ConnectionId: connectionId,
+ Connection: connection.Unwrap(),
+ TransformationRule: txRule,
}, nil
}
@@ -123,6 +151,11 @@ func (p *remotePluginImpl) RunMigrations(forceMigrate bool) errors.Error {
return err
}
+ err = api.CallDB(basicRes.GetDal().AutoMigrate, p.transformationRuleTabler.New())
+ if err != nil {
+ return err
+ }
+
err = p.invoker.Call("run-migrations", bridge.DefaultContext, forceMigrate).Get()
return err
}
diff --git a/backend/server/services/remote/plugin/scope_api.go b/backend/server/services/remote/plugin/scope_api.go
new file mode 100644
index 000000000..9bbdbaaf8
--- /dev/null
+++ b/backend/server/services/remote/plugin/scope_api.go
@@ -0,0 +1,222 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package plugin
+
+import (
+ "net/http"
+ "strconv"
+
+ "github.com/mitchellh/mapstructure"
+
+ "github.com/apache/incubator-devlake/core/dal"
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/models"
+ "github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+)
+
+// ScopeItem is one scope of a remote plugin connection, as exchanged over the
+// scope API and read from / written to the database by the ScopeAPI handlers.
+type ScopeItem struct {
+ // ScopeId identifies the scope; verifyScope rejects an empty value.
+ ScopeId string `json:"scopeId"`
+ ScopeName string `json:"scopeName"`
+ // ConnectionId is overwritten with the path parameter by PutScope;
+ // verifyScope rejects 0.
+ ConnectionId uint64 `json:"connectionId"`
+ // TransformationRuleId is optional: the handlers only look up a rule name
+ // when it is greater than 0.
+ TransformationRuleId uint64 `json:"transformationRuleId,omitempty"`
+}
+
+// apiScopeResponse is the DTO returned by GetScope and ListScopes: the stored
+// scope together with the name of the transformation rule it references.
+type apiScopeResponse struct {
+	Scope ScopeItem
+	// TransformationRuleName is serialized under its own key. It used to be
+	// tagged "transformationRuleId", which published the rule *name* under the
+	// key that ScopeItem uses for the rule *id*.
+	TransformationRuleName string `json:"transformationRuleName,omitempty"`
+}
+
+// request is the body decoded by PutScope: the scopes to save, listed under
+// the "data" key.
+// NOTE(review): the original author left an open question here: "Why a batch PUT?"
+type request struct {
+ Data []*ScopeItem `json:"data"`
+}
+
+// ScopeAPI groups the scope handlers: PutScope, PatchScope, ListScopes and GetScope.
+type ScopeAPI struct {
+ // txRuleType provides the transformation rule table name that ListScopes
+ // and GetScope query to resolve rule names.
+ txRuleType *models.DynamicTabler
+}
+
+// PutScope saves, in one batch, the scopes listed under the "data" key of the
+// request body for the connection named by the "connectionId" path parameter.
+//
+// It returns a BadInput error when the connection id is missing or zero, when
+// the body cannot be decoded, when two items share the same scopeId, or when
+// an item fails verifyScope. On success the saved items are echoed back.
+func (s *ScopeAPI) PutScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	connectionId, _ := extractParam(input.Params)
+	if connectionId == 0 {
+		return nil, errors.BadInput.New("invalid connectionId")
+	}
+
+	var payload request
+	if decodeErr := errors.Convert(mapstructure.Decode(input.Body, &payload)); decodeErr != nil {
+		return nil, errors.BadInput.Wrap(decodeErr, "decoding scope error")
+	}
+
+	// seen tracks scope ids already met in this batch so duplicates are rejected.
+	seen := make(map[string]struct{})
+	for _, item := range payload.Data {
+		if _, duplicated := seen[item.ScopeId]; duplicated {
+			return nil, errors.BadInput.New("duplicated item")
+		}
+		seen[item.ScopeId] = struct{}{}
+
+		// The connection id from the path overrides whatever the body carried.
+		item.ConnectionId = connectionId
+
+		if verifyErr := verifyScope(item); verifyErr != nil {
+			return nil, verifyErr
+		}
+	}
+
+	if saveErr := basicRes.GetDal().CreateOrUpdate(payload.Data); saveErr != nil {
+		return nil, errors.Default.Wrap(saveErr, "error on saving scope")
+	}
+
+	return &plugin.ApiResourceOutput{Body: payload.Data, Status: http.StatusOK}, nil
+}
+
+// PatchScope applies the request body on top of the stored scope identified by
+// the "connectionId" and "scopeId" path parameters, saves it and returns it.
+//
+// Errors:
+//   - BadInput when the connection id is missing or zero, when the body cannot
+//     be decoded, or when the patched scope fails verifyScope;
+//   - NotFound when no scope matches the path parameters (consistent with
+//     GetScope);
+//   - Default for any other database failure.
+func (s *ScopeAPI) PatchScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	connectionId, scopeId := extractParam(input.Params)
+	if connectionId == 0 {
+		return nil, errors.BadInput.New("invalid connectionId")
+	}
+
+	db := basicRes.GetDal()
+	scope := ScopeItem{}
+	err := db.First(&scope, dal.Where("connection_id = ? AND scope_id = ?", connectionId, scopeId))
+	if db.IsErrorNotFound(err) {
+		return nil, errors.NotFound.New("scope not found")
+	}
+	if err != nil {
+		return nil, errors.Default.Wrap(err, "scope not found")
+	}
+
+	// Remember the keys of the record that was actually found, so the body
+	// cannot retarget the update to another scope or connection.
+	storedConnectionId, storedScopeId := scope.ConnectionId, scope.ScopeId
+
+	err = api.DecodeMapStruct(input.Body, &scope)
+	if err != nil {
+		// A body that cannot be decoded is a client error, not a server one.
+		return nil, errors.BadInput.Wrap(err, "patch scope error")
+	}
+	scope.ConnectionId = storedConnectionId
+	scope.ScopeId = storedScopeId
+
+	err = verifyScope(&scope)
+	if err != nil {
+		return nil, err
+	}
+
+	err = db.Update(&scope)
+	if err != nil {
+		return nil, errors.Default.Wrap(err, "error on saving scope")
+	}
+	return &plugin.ApiResourceOutput{Body: scope, Status: http.StatusOK}, nil
+}
+
+// ListScopes returns one page of the scopes stored for the connection named by
+// the "connectionId" path parameter, each paired with the name of the
+// transformation rule it references (empty when it references none).
+//
+// Pagination comes from the "pageSize" and "page" query parameters; a page
+// size above 100 is rejected with a BadInput error, as is a missing or zero
+// connection id. An empty page is returned as an empty list.
+func (s *ScopeAPI) ListScopes(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	connectionId, _ := extractParam(input.Params)
+	if connectionId == 0 {
+		return nil, errors.BadInput.New("invalid connectionId")
+	}
+
+	limit, offset := api.GetLimitOffset(input.Query, "pageSize", "page")
+	if limit > 100 {
+		return nil, errors.BadInput.New("Page limit cannot exceed 100")
+	}
+
+	db := basicRes.GetDal()
+	var scopes []ScopeItem
+	err := db.All(&scopes, dal.Where("connection_id = ?", connectionId), dal.Limit(limit), dal.Offset(offset))
+	if err != nil {
+		return nil, err
+	}
+
+	// Collect the ids of the rules referenced by this page of scopes.
+	ruleIds := make([]uint64, 0, len(scopes))
+	for _, scope := range scopes {
+		if scope.TransformationRuleId > 0 {
+			ruleIds = append(ruleIds, scope.TransformationRuleId)
+		}
+	}
+
+	// The fields must be exported: row scanning relies on reflection, which
+	// cannot set unexported fields, so lower-case fields always stayed empty.
+	var txRuleId2Name []struct {
+		Id   uint64
+		Name string
+	}
+	if len(ruleIds) > 0 {
+		err = db.All(&txRuleId2Name,
+			dal.Select("id, name"),
+			dal.From(s.txRuleType.TableName()),
+			dal.Where("id IN (?)", ruleIds))
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	names := make(map[uint64]string, len(txRuleId2Name))
+	for _, r := range txRuleId2Name {
+		names[r.Id] = r.Name
+	}
+
+	// Allocated up front (non-nil) so that an empty page encodes as a list.
+	apiScopes := make([]apiScopeResponse, 0, len(scopes))
+	for _, scope := range scopes {
+		apiScopes = append(apiScopes, apiScopeResponse{
+			Scope: scope,
+			// A scope without a rule, or with an unknown one, gets "".
+			TransformationRuleName: names[scope.TransformationRuleId],
+		})
+	}
+
+	return &plugin.ApiResourceOutput{Body: apiScopes, Status: http.StatusOK}, nil
+}
+
+// GetScope returns the scope identified by the "connectionId" and "scopeId"
+// path parameters, together with the name of its transformation rule when it
+// references one.
+//
+// It returns a BadInput error when the connection id is missing or zero and a
+// NotFound error when no scope matches; other database errors are passed
+// through unchanged.
+func (s *ScopeAPI) GetScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	connectionId, scopeId := extractParam(input.Params)
+	if connectionId == 0 {
+		return nil, errors.BadInput.New("invalid path params")
+	}
+
+	db := basicRes.GetDal()
+	var found ScopeItem
+	lookupErr := db.First(&found, dal.Where("connection_id = ? AND scope_id = ?", connectionId, scopeId))
+	switch {
+	case db.IsErrorNotFound(lookupErr):
+		return nil, errors.NotFound.New("record not found")
+	case lookupErr != nil:
+		return nil, lookupErr
+	}
+
+	var txRuleName string
+	if found.TransformationRuleId > 0 {
+		// NOTE(review): this reads a single column into a plain string through
+		// db.First - confirm the DAL supports non-struct destinations.
+		nameErr := db.First(&txRuleName, dal.Select("name"), dal.From(s.txRuleType.TableName()), dal.Where("id = ?", found.TransformationRuleId))
+		if nameErr != nil {
+			return nil, nameErr
+		}
+	}
+
+	body := apiScopeResponse{Scope: found, TransformationRuleName: txRuleName}
+	return &plugin.ApiResourceOutput{Body: body, Status: http.StatusOK}, nil
+}
+
+// extractParam reads the "connectionId" and "scopeId" path parameters.
+//
+// The connection id is returned as 0 when it is missing or not an unsigned
+// integer; the parse error is dropped on purpose because every caller treats
+// 0 as "invalid connectionId".
+//
+// One leading "/" is removed from the scope id. The scope routes are
+// registered with a catch-all segment ("*scopeId") so that ids may contain
+// slashes. NOTE(review): with gin/httprouter semantics a catch-all value keeps
+// its leading "/", which would never match a stored scope id - confirm against
+// the router in use.
+func extractParam(params map[string]string) (uint64, string) {
+	connectionId, _ := strconv.ParseUint(params["connectionId"], 10, 64)
+	scopeId := params["scopeId"]
+	if len(scopeId) > 0 && scopeId[0] == '/' {
+		scopeId = scopeId[1:]
+	}
+	return connectionId, scopeId
+}
+
+// verifyScope checks that a scope carries the two keys that identify it.
+// It returns a BadInput error when ConnectionId is 0 or ScopeId is empty
+// (the connection id is checked first), and nil otherwise.
+func verifyScope(scope *ScopeItem) errors.Error {
+	switch {
+	case scope.ConnectionId == 0:
+		return errors.BadInput.New("invalid connectionId")
+	case scope.ScopeId == "":
+		return errors.BadInput.New("invalid scope ID")
+	default:
+		return nil
+	}
+}
diff --git a/backend/server/services/remote/plugin/transformation_rule_api.go b/backend/server/services/remote/plugin/transformation_rule_api.go
new file mode 100644
index 000000000..8a97bfc16
--- /dev/null
+++ b/backend/server/services/remote/plugin/transformation_rule_api.go
@@ -0,0 +1,94 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package plugin
+
+import (
+ "net/http"
+ "strconv"
+
+ "github.com/apache/incubator-devlake/core/dal"
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/models"
+ "github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+)
+
+// TransformationRuleAPI groups the transformation rule handlers:
+// PostTransformationRules, PatchTransformationRule, GetTransformationRule and
+// ListTransformationRules.
+type TransformationRuleAPI struct {
+ // txRuleType is the dynamic model the handlers instantiate (New / NewSlice)
+ // to decode, store and load transformation rules.
+ txRuleType *models.DynamicTabler
+}
+
+// PostTransformationRules decodes the request body into a new transformation
+// rule, inserts it into the database and returns the stored rule.
+//
+// A body that fails decoding yields a BadInput error; a failed insert returns
+// the database error unchanged.
+func (t *TransformationRuleAPI) PostTransformationRules(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	rule := t.txRuleType.New()
+	if decodeErr := api.Decode(input.Body, rule, vld); decodeErr != nil {
+		return nil, errors.BadInput.Wrap(decodeErr, "error in decoding transformation rule")
+	}
+
+	db := basicRes.GetDal()
+	if createErr := api.CallDB(db.Create, rule); createErr != nil {
+		return nil, createErr
+	}
+
+	output := &plugin.ApiResourceOutput{Body: rule.Unwrap(), Status: http.StatusOK}
+	return output, nil
+}
+
+// PatchTransformationRule applies the request body on top of the stored
+// transformation rule identified by the "id" path parameter, saves the merged
+// rule and returns it.
+//
+// Errors:
+//   - BadInput when the id is not an unsigned integer or the body cannot be
+//     decoded (consistent with PostTransformationRules);
+//   - Default when the rule cannot be loaded or saved.
+func (t *TransformationRuleAPI) PatchTransformationRule(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	id, parseErr := strconv.ParseUint(input.Params["id"], 10, 64)
+	if parseErr != nil {
+		return nil, errors.BadInput.Wrap(parseErr, "id should be an integer")
+	}
+
+	txRule := t.txRuleType.New()
+	db := basicRes.GetDal()
+	if err := api.CallDB(db.First, txRule, dal.Where("id = ?", id)); err != nil {
+		return nil, errors.Default.Wrap(err, "no transformation rule with given id")
+	}
+
+	if err := api.Decode(input.Body, txRule, vld); err != nil {
+		return nil, errors.BadInput.Wrap(err, "decoding error")
+	}
+
+	// Write the merged rule back. Previously the handler returned the patched
+	// value without saving it, so the change was lost.
+	if err := api.CallDB(db.Update, txRule); err != nil {
+		return nil, errors.Default.Wrap(err, "error on saving transformation rule")
+	}
+
+	return &plugin.ApiResourceOutput{Body: txRule.Unwrap(), Status: http.StatusOK}, nil
+}
+
+// GetTransformationRule returns the transformation rule identified by the
+// "id" path parameter.
+//
+// A non-integer id yields a BadInput error; a failed lookup yields a Default
+// error wrapping the database error.
+func (t *TransformationRuleAPI) GetTransformationRule(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	// Parse the id the same way PatchTransformationRule does. The query used
+	// to receive the whole input.Params map as its argument instead of the id.
+	id, parseErr := strconv.ParseUint(input.Params["id"], 10, 64)
+	if parseErr != nil {
+		return nil, errors.BadInput.Wrap(parseErr, "id should be an integer")
+	}
+
+	txRule := t.txRuleType.New()
+	db := basicRes.GetDal()
+	err := api.CallDB(db.First, txRule, dal.Where("id = ?", id))
+	if err != nil {
+		return nil, errors.Default.Wrap(err, "no transformation rule with given id")
+	}
+
+	// Status is set explicitly, like the POST and PATCH handlers do.
+	return &plugin.ApiResourceOutput{Body: txRule.Unwrap(), Status: http.StatusOK}, nil
+}
+
+// ListTransformationRules returns one page of the stored transformation rules.
+//
+// Pagination comes from the "pageSize" and "page" query parameters; a page
+// size above 100 is rejected with a BadInput error. Database errors are
+// returned unchanged.
+func (t *TransformationRuleAPI) ListTransformationRules(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	rules := t.txRuleType.NewSlice()
+
+	pageSize, skip := api.GetLimitOffset(input.Query, "pageSize", "page")
+	if pageSize > 100 {
+		return nil, errors.BadInput.New("pageSize cannot exceed 100")
+	}
+
+	if loadErr := api.CallDB(basicRes.GetDal().All, rules, dal.Limit(pageSize), dal.Offset(skip)); loadErr != nil {
+		return nil, loadErr
+	}
+
+	output := &plugin.ApiResourceOutput{Body: rules.Unwrap()}
+	return output, nil
+}
diff --git a/backend/test/remote/fakeplugin/fakeplugin/main.py b/backend/test/remote/fakeplugin/fakeplugin/main.py
index 0c7c2db8e..51c13bb9b 100644
--- a/backend/test/remote/fakeplugin/fakeplugin/main.py
+++ b/backend/test/remote/fakeplugin/fakeplugin/main.py
@@ -19,7 +19,7 @@ from typing import Optional
from sqlmodel import Field
-from pydevlake import Plugin, Connection, Stream, ToolModel
+from pydevlake import Plugin, Connection, TransformationRule, Stream, ToolModel
from pydevlake.domain_layer.devops import CICDScope, CICDPipeline
@@ -94,6 +94,10 @@ class FakeConnection(Connection):
token: str
+class FakeTransformationRule(TransformationRule):
+ tx1: str
+
+
class FakePlugin(Plugin):
@property
def connection_type(self):