You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by he...@apache.org on 2023/02/17 23:11:18 UTC

[incubator-devlake] branch main updated: feat(pyplugins): tx rule scope ap is py (#4438)

This is an automated email from the ASF dual-hosted git repository.

hez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d573063a feat(pyplugins): tx rule scope ap is py (#4438)
0d573063a is described below

commit 0d573063acfb1d3fe7649a2ff375badcec2a0dd5
Author: Camille Teruel <ca...@gmail.com>
AuthorDate: Sat Feb 18 00:11:14 2023 +0100

    feat(pyplugins): tx rule scope ap is py (#4438)
    
    * docs: Document hook execution order
    
    * fix: Add missing initialization of remote plugins
    
    * fix: Fix swagger body parameter for test connection endpoint
    
    * feat: Implement txRule and scope plugin APIs
    
    Provide a generic implementation for the transformation rule and scope APIs.
    
    * feat: Pass scopeId and transformation rule to python plugin
    
    ---------
    
    Co-authored-by: Camille Teruel <ca...@meri.co>
---
 backend/core/runner/loader.go                      |  14 +-
 backend/python/pydevlake/README.md                 |   3 +
 backend/python/pydevlake/pydevlake/__init__.py     |   2 +-
 backend/python/pydevlake/pydevlake/context.py      |  14 +-
 .../python/pydevlake/pydevlake/doc.template.json   | 281 ++++++++++++++++++++-
 backend/python/pydevlake/pydevlake/docgen.py       |  12 +-
 backend/python/pydevlake/pydevlake/ipc.py          |   4 +-
 backend/python/pydevlake/pydevlake/message.py      |   5 +
 backend/python/pydevlake/pydevlake/plugin.py       |   7 +-
 backend/python/pydevlake/pydevlake/subtasks.py     |   2 +-
 backend/python/pydevlake/test/remote_test.go       |   3 +-
 backend/python/pydevlake/test/stream_test.py       |   5 +-
 backend/server/services/remote/models/models.go    |  17 +-
 .../server/services/remote/plugin/default_api.go   |  30 ++-
 backend/server/services/remote/plugin/init.go      |   3 +-
 .../server/services/remote/plugin/plugin_impl.go   |  77 ++++--
 backend/server/services/remote/plugin/scope_api.go | 222 ++++++++++++++++
 .../remote/plugin/transformation_rule_api.go       |  94 +++++++
 backend/test/remote/fakeplugin/fakeplugin/main.py  |   6 +-
 19 files changed, 747 insertions(+), 54 deletions(-)

diff --git a/backend/core/runner/loader.go b/backend/core/runner/loader.go
index 171e4c4f0..5c1a7c7a4 100644
--- a/backend/core/runner/loader.go
+++ b/backend/core/runner/loader.go
@@ -19,17 +19,25 @@ package runner
 
 import (
 	"fmt"
-	"github.com/apache/incubator-devlake/core/context"
-	"github.com/apache/incubator-devlake/core/errors"
-	"github.com/apache/incubator-devlake/core/plugin"
 	"io/fs"
 	"path/filepath"
 	goplugin "plugin"
+	"strconv"
 	"strings"
+
+	"github.com/apache/incubator-devlake/core/context"
+	"github.com/apache/incubator-devlake/core/errors"
+	"github.com/apache/incubator-devlake/core/plugin"
+	"github.com/apache/incubator-devlake/server/services/remote"
 )
 
 // LoadPlugins load plugins from local directory
 func LoadPlugins(basicRes context.BasicRes) errors.Error {
+	remote_plugins_enabled, err := strconv.ParseBool(basicRes.GetConfig("ENABLE_REMOTE_PLUGINS"))
+	if err == nil && remote_plugins_enabled {
+		remote.Init(basicRes)
+	}
+
 	pluginsDir := basicRes.GetConfig("PLUGIN_DIR")
 	walkErr := filepath.WalkDir(pluginsDir, func(path string, d fs.DirEntry, err error) error {
 		if err != nil {
diff --git a/backend/python/pydevlake/README.md b/backend/python/pydevlake/README.md
index ef675d307..fbfc976d2 100644
--- a/backend/python/pydevlake/README.md
+++ b/backend/python/pydevlake/README.md
@@ -223,6 +223,9 @@ class MyAPI(API):
 
 Here the method `authenticate` is a hook that is run on each request.
 Similarly you can declare response hooks with `@response_hook`.
+Multiple hooks are executed in the order of their declaration.
+The `API` base class declares some hooks that are executed first.
+
 
 #### Pagination
 
diff --git a/backend/python/pydevlake/pydevlake/__init__.py b/backend/python/pydevlake/pydevlake/__init__.py
index edabf45f0..d45d1ba7d 100644
--- a/backend/python/pydevlake/pydevlake/__init__.py
+++ b/backend/python/pydevlake/pydevlake/__init__.py
@@ -16,7 +16,7 @@
 
 from .model import ToolModel
 from .logger import logger
-from .message import Connection
+from .message import Connection, TransformationRule
 from .plugin import Plugin
 from .stream import Stream, Substream
 from .context import Context
diff --git a/backend/python/pydevlake/pydevlake/context.py b/backend/python/pydevlake/pydevlake/context.py
index a5c97b5ff..85eb9fb7c 100644
--- a/backend/python/pydevlake/pydevlake/context.py
+++ b/backend/python/pydevlake/pydevlake/context.py
@@ -17,15 +17,23 @@
 from urllib.parse import urlparse, parse_qsl
 from sqlmodel import SQLModel, create_engine
 
-from pydevlake.message import Connection
+from pydevlake.message import Connection, TransformationRule
 
 
 class Context:
-    def __init__(self, db_url: str, connection_id: int, connection: Connection, options: dict):
+    def __init__(self,
+                 db_url: str,
+                 scope_id: str,
+                 connection_id: int,
+                 connection: Connection,
+                 transformation_rule: TransformationRule = None,
+                 options: dict = None):
         self.db_url = db_url
+        self.scope_id = scope_id
         self.connection_id = connection_id
         self.connection = connection
-        self.options = options
+        self.transformation_rule = transformation_rule
+        self.options = options or {}
         self._engine = None
 
     @property
diff --git a/backend/python/pydevlake/pydevlake/doc.template.json b/backend/python/pydevlake/pydevlake/doc.template.json
index b152a5cf6..a565187cd 100644
--- a/backend/python/pydevlake/pydevlake/doc.template.json
+++ b/backend/python/pydevlake/pydevlake/doc.template.json
@@ -118,13 +118,16 @@
         "/plugins/$plugin_name/test": {
             "post": {
                 "description": "Test if a connection is valid",
-                "body": {
-                    "application/json": {
+                "parameters": [
+                    {
+                        "name": "connection",
+                        "required": true,
+                        "in": "body",
                         "schema": {
                             "$$ref": "#/components/schemas/connection"
                         }
                     }
-                }
+                ]
             },
             "response": {
                 "200": {
@@ -134,11 +137,245 @@
                     "description": "The connection is not valid"
                 }
             }
+        },
+        "/plugins/$plugin_name/connections/{connectionId}/scopes/{scopeId}": {
+            "get": {
+                "description": "Get a scope",
+                "parameters": [
+                    {
+                        "$$ref": "#/components/parameters/connectionId"
+                    },
+                    {
+                        "$$ref": "#/components/parameters/scopeId"
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "content": {
+                            "application/json": {
+                                "schema": {
+                                    "$$ref": "#/components/schemas/scope"
+                                }
+                            }
+                        }
+                    }
+                }
+            },
+            "patch": {
+                "description": "Update a scope",
+                "parameters": [
+                    {
+                        "$$ref": "#/components/parameters/connectionId"
+                    },
+                    {
+                        "$$ref": "#/components/parameters/scopeId"
+                    },
+                    {
+                        "name": "scope",
+                        "required": true,
+                        "in": "body",
+                        "schema": {
+                            "$$ref": "#/components/schemas/scope"
+                        }
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "content": {
+                            "application/json": {
+                                "schema": {
+                                    "$$ref": "#/components/schemas/scope"
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        },
+        "/plugins/$plugin_name/connections/{connectionId}/scopes": {
+            "get": {
+                "description": "Get all scopes",
+                "parameters": [
+                    {
+                        "$$ref": "#/components/parameters/pageSize"
+                    },
+                    {
+                        "$$ref": "#/components/parameters/page"
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "content": {
+                            "application/json": {
+                                "schema": {
+                                    "type": "array",
+                                    "items": {
+                                        "$$ref": "#/components/schemas/scope"
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            },
+            "put": {
+                "description": "Create a list of scopes",
+                "parameters": [
+                    {
+                        "name": "scopes",
+                        "required": true,
+                        "in": "body",
+                        "schema": {
+                            "type": "array",
+                            "items": {
+                                "$$ref": "#/components/schemas/scope"
+                            }
+                        }
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "content": {
+                            "application/json": {
+                                "schema": {
+                                    "type": "array",
+                                    "items": {
+                                        "$$ref": "#/components/schemas/scope"
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        },
+        "/plugins/$plugin_name/transformation_rules": {
+            "get": {
+                "description": "Get all transformation rules",
+                "parameters": [
+                    {
+                        "$$ref": "#/components/parameters/pageSize"
+                    },
+                    {
+                        "$$ref": "#/components/parameters/page"
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "content": {
+                            "application/json": {
+                                "schema": {
+                                    "type": "array",
+                                    "items": {
+                                        "$$ref": "#/components/schemas/transformationRule"
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            },
+            "post": {
+                "description": "Create a transformation rule",
+                "parameters": [
+                    {
+                        "name": "rules",
+                        "required": true,
+                        "in": "body",
+                        "schema": {
+                            "$$ref": "#/components/schemas/transformationRule"
+                        }
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "content": {
+                            "application/json": {
+                                "schema": {
+                                    "$$ref": "#/components/schemas/transformationRule"
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        },
+        "/plugins/$plugin_name/transformation_rules/{ruleId}": {
+            "get": {
+                "description": "Get a transformation rule",
+                "parameters": [
+                    {
+                        "$$ref": "#/components/parameters/ruleId"
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "content": {
+                            "application/json": {
+                                "schema": {
+                                    "$$ref": "#/components/schemas/transformationRule"
+                                }
+                            }
+                        }
+                    }
+                }
+            },
+            "patch": {
+                "description": "Update a transformation rule",
+                "parameters": [
+                    {
+                        "$$ref": "#/components/parameters/ruleId"
+                    },
+                    {
+                        "name": "rule",
+                        "required": true,
+                        "in": "body",
+                        "schema": {
+                            "$$ref": "#/components/schemas/transformationRule"
+                        }
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "content": {
+                            "application/json": {
+                                "schema": {
+                                    "$$ref": "#/components/schemas/transformationRule"
+                                }
+                            }
+                        }
+                    }
+                }
+            }
         }
     },
     "components": {
         "schemas": {
-            "connection": $connection_schema
+            "connection": $connection_schema,
+            "transformationRule": $transformation_rule_schema,
+            "scope": {
+                "title": "Scope",
+                "type": "object",
+                "properties": {
+                    "scopeId": {
+                        "title": "scope id",
+                        "type": "string"
+                    },
+                    "scopeName": {
+                        "title": "scope name",
+                        "type": "string"
+                    },
+                    "connectionId": {
+                        "title": "connection id",
+                        "type": "integer"
+                    },
+                    "transformationRuleId": {
+                        "title": "Transformation rule id",
+                        "type": "integer"
+                    }
+                },
+                "required": ["scopeId", "scopeName", "connectionId", "transformationRuleId"]
+            }
         },
         "parameters": {
             "connectionId": {
@@ -149,6 +386,42 @@
                 "schema": {
                     "type": "int"
                 }
+            },
+            "scopeId": {
+                "name": "scopeId",
+                "description": "Id of the scope",
+                "in": "path",
+                "required": true,
+                "schema": {
+                    "type": "string"
+                }
+            },
+            "ruleId": {
+                "name": "ruleId",
+                "description": "Id of the transformation rule",
+                "in": "path",
+                "required": true,
+                "schema": {
+                    "type": "integer"
+                }
+            },
+            "pageSize": {
+                "name": "pageSize",
+                "description": "Number of items per page",
+                "in": "query",
+                "required": false,
+                "schema": {
+                    "type": "integer"
+                }
+            },
+            "page": {
+                "name": "page",
+                "description": "Page number",
+                "in": "query",
+                "required": false,
+                "schema": {
+                    "type": "integer"
+                }
             }
         }
     }
diff --git a/backend/python/pydevlake/pydevlake/docgen.py b/backend/python/pydevlake/pydevlake/docgen.py
index acce0a39f..9d91197cd 100644
--- a/backend/python/pydevlake/pydevlake/docgen.py
+++ b/backend/python/pydevlake/pydevlake/docgen.py
@@ -19,15 +19,21 @@ from pathlib import Path
 from string import Template
 import json
 
-from pydevlake.message import Connection
+from pydevlake.message import Connection, TransformationRule
 
 
 # TODO: Move swagger documentation generation to GO side along with API implementation
 TEMPLATE_PATH = str(Path(__file__).parent / 'doc.template.json')
 
-def generate_doc(plugin_name: str, connection_type: Type[Connection]):
+def generate_doc(plugin_name: str, 
+                 connection_type: Type[Connection], 
+                 transformation_rule_type: Type[TransformationRule]):
     with open(TEMPLATE_PATH, 'r') as f:
         doc_template = Template(f.read())
         connection_schema = connection_type.schema_json()
-        doc = doc_template.substitute(plugin_name=plugin_name, connection_schema=connection_schema)
+        transformation_rule_schema = transformation_rule_type.schema_json()
+        doc = doc_template.substitute(
+            plugin_name=plugin_name, 
+            connection_schema=connection_schema,
+            transformation_rule_schema=transformation_rule_schema)
         return json.loads(doc)
diff --git a/backend/python/pydevlake/pydevlake/ipc.py b/backend/python/pydevlake/pydevlake/ipc.py
index a457b9794..0b8d664e3 100644
--- a/backend/python/pydevlake/pydevlake/ipc.py
+++ b/backend/python/pydevlake/pydevlake/ipc.py
@@ -87,7 +87,9 @@ class PluginCommands:
 
     def _mk_context(self, data: dict):
         db_url = data['db_url']
+        scope_id = data['scope_id']
         connection_id = data['connection_id']
         connection = self._plugin.connection_type(**data['connection'])
+        transformation_rule = self._plugin.transformation_rule_type(**data['transformation_rule']) if data.get('transformation_rule') is not None else None  # rule may be absent or null; Context accepts None
         options = data.get('options', {})
-        return Context(db_url, connection_id, connection, options)
+        return Context(db_url, scope_id, connection_id, connection, transformation_rule, options)
diff --git a/backend/python/pydevlake/pydevlake/message.py b/backend/python/pydevlake/pydevlake/message.py
index de09167c4..aad4690d0 100644
--- a/backend/python/pydevlake/pydevlake/message.py
+++ b/backend/python/pydevlake/pydevlake/message.py
@@ -35,6 +35,7 @@ class PluginInfo(Message):
     name: str
     description: str
     connection_schema: dict
+    transformation_rule_schema: dict
     plugin_path: str
     subtask_metas: list[SubtaskMeta]
     extension: str = "datasource"
@@ -62,6 +63,10 @@ class Connection(Message):
     pass
 
 
+class TransformationRule(Message):
+    pass
+
+
 class PipelineTask(Message):
     plugin: str
     # Do not snake_case this attribute,
diff --git a/backend/python/pydevlake/pydevlake/plugin.py b/backend/python/pydevlake/pydevlake/plugin.py
index 0e3c0a166..3f81016d8 100644
--- a/backend/python/pydevlake/pydevlake/plugin.py
+++ b/backend/python/pydevlake/pydevlake/plugin.py
@@ -54,6 +54,10 @@ class Plugin:
     def connection_type(self) -> Type[msg.Connection]:
         pass
 
+    @property
+    def transformation_rule_type(self) -> Type[msg.TransformationRule]:
+        return msg.TransformationRule
+
     @abstractmethod
     def test_connection(self, connection: msg.Connection):
         """
@@ -133,7 +137,7 @@ class Plugin:
             swagger=msg.SwaggerDoc(
                 name=self.name,
                 resource=self.name,
-                spec=generate_doc(self.name, self.connection_type)
+                spec=generate_doc(self.name, self.connection_type, self.transformation_rule_type)
             )
         )
         resp = requests.post(f"{endpoint}/plugins/register", data=details.json())
@@ -160,6 +164,7 @@ class Plugin:
             plugin_path=self._plugin_path(),
             extension="datasource",
             connection_schema=self.connection_type.schema(),
+            transformation_rule_schema=self.transformation_rule_type.schema(),
             subtask_metas=subtask_metas
         )
 
diff --git a/backend/python/pydevlake/pydevlake/subtasks.py b/backend/python/pydevlake/pydevlake/subtasks.py
index 98591d331..343c29ac5 100644
--- a/backend/python/pydevlake/pydevlake/subtasks.py
+++ b/backend/python/pydevlake/pydevlake/subtasks.py
@@ -152,7 +152,7 @@ class Collector(Subtask):
     def _params(self, ctx: Context) -> str:
         return json.dumps({
             "connection_id": ctx.connection_id,
-            "scope_id": ctx.options['scopeId']
+            "scope_id": ctx.scope_id
         })
 
     def delete(self, session, ctx):
diff --git a/backend/python/pydevlake/test/remote_test.go b/backend/python/pydevlake/test/remote_test.go
index 9aaeec877..3c11ba971 100644
--- a/backend/python/pydevlake/test/remote_test.go
+++ b/backend/python/pydevlake/test/remote_test.go
@@ -64,10 +64,9 @@ func TestRunSubTask(t *testing.T) {
 	dataflowTester := e2ehelper.NewDataFlowTester(t, "circleci", remotePlugin)
 	subtask := remotePlugin.SubTaskMetas()[0]
 	options := make(map[string]interface{})
-	options["project_slug"] = "gh/circleci/bond"
-	options["scopeId"] = "1"
 	taskData := plg.RemotePluginTaskData{
 		DbUrl:      bridge.DefaultContext.GetConfig("db_url"),
+		ScopeId:    "gh/circleci/bond",
 		Connection: CircleCIConnection{ID: 1},
 		Options:    options,
 	}
diff --git a/backend/python/pydevlake/test/stream_test.py b/backend/python/pydevlake/test/stream_test.py
index db24465a2..234c7e79a 100644
--- a/backend/python/pydevlake/test/stream_test.py
+++ b/backend/python/pydevlake/test/stream_test.py
@@ -75,9 +75,10 @@ def connection(raw_data):
 def ctx(connection):
     return Context(
         db_url="sqlite+pysqlite:///:memory:",
+        scope_id="1",
         connection_id=11,
         connection=connection,
-        options={"scopeId": 1, "scopeName": "foo"}
+        options={}
     )
 
 
@@ -123,7 +124,7 @@ def test_convert_data(stream, raw_data, ctx):
                     id=each["i"],
                     name=each["n"],
                     raw_data_table="_raw_dummy_model",
-                    raw_data_params=json.dumps({"connection_id": ctx.connection_id, "scope_id": ctx.options.scopeId})
+                    raw_data_params=json.dumps({"connection_id": ctx.connection_id, "scope_id": ctx.scope_id})
                 )
             )
         session.commit()
diff --git a/backend/server/services/remote/models/models.go b/backend/server/services/remote/models/models.go
index d19a26fe7..c82783f32 100644
--- a/backend/server/services/remote/models/models.go
+++ b/backend/server/services/remote/models/models.go
@@ -31,14 +31,15 @@ type (
 )
 
 type PluginInfo struct {
-	Type             PluginType      `json:"type" validate:"required"`
-	Name             string          `json:"name" validate:"required"`
-	Extension        PluginExtension `json:"extension"`
-	ConnectionSchema map[string]any  `json:"connection_schema" validate:"required"`
-	Description      string          `json:"description"`
-	PluginPath       string          `json:"plugin_path" validate:"required"`
-	ApiEndpoints     []Endpoint      `json:"api_endpoints" validate:"dive"`
-	SubtaskMetas     []SubtaskMeta   `json:"subtask_metas" validate:"dive"`
+	Type                     PluginType      `json:"type" validate:"required"`
+	Name                     string          `json:"name" validate:"required"`
+	Extension                PluginExtension `json:"extension"`
+	ConnectionSchema         map[string]any  `json:"connection_schema" validate:"required"`
+	TransformationRuleSchema map[string]any  `json:"transformation_rule_schema" validate:"required"`
+	Description              string          `json:"description"`
+	PluginPath               string          `json:"plugin_path" validate:"required"`
+	ApiEndpoints             []Endpoint      `json:"api_endpoints" validate:"dive"`
+	SubtaskMetas             []SubtaskMeta   `json:"subtask_metas" validate:"dive"`
 }
 
 type SubtaskMeta struct {
diff --git a/backend/server/services/remote/plugin/default_api.go b/backend/server/services/remote/plugin/default_api.go
index a2d500b28..b1fa1879c 100644
--- a/backend/server/services/remote/plugin/default_api.go
+++ b/backend/server/services/remote/plugin/default_api.go
@@ -24,12 +24,24 @@ import (
 	"github.com/apache/incubator-devlake/server/services/remote/bridge"
 )
 
-func GetDefaultAPI(invoker bridge.Invoker, connType *models.DynamicTabler, helper *api.ConnectionApiHelper) map[string]map[string]plugin.ApiResourceHandler {
+func GetDefaultAPI(
+	invoker bridge.Invoker,
+	connType *models.DynamicTabler,
+	txRuleType *models.DynamicTabler,
+	helper *api.ConnectionApiHelper) map[string]map[string]plugin.ApiResourceHandler {
 	connectionApi := &ConnectionAPI{
 		invoker:  invoker,
 		connType: connType,
 		helper:   helper,
 	}
+
+	scopeApi := &ScopeAPI{
+		txRuleType: txRuleType,
+	}
+	txruleApi := &TransformationRuleAPI{
+		txRuleType: txRuleType,
+	}
+
 	return map[string]map[string]plugin.ApiResourceHandler{
 		"test": {
 			"POST": connectionApi.TestConnection,
@@ -43,5 +55,21 @@ func GetDefaultAPI(invoker bridge.Invoker, connType *models.DynamicTabler, helpe
 			"PATCH":  connectionApi.PatchConnection,
 			"DELETE": connectionApi.DeleteConnection,
 		},
+		"connections/:connectionId/scopes": {
+			"PUT": scopeApi.PutScope,
+			"GET": scopeApi.ListScopes,
+		},
+		"connections/:connectionId/scopes/*scopeId": {
+			"GET":   scopeApi.GetScope,
+			"PATCH": scopeApi.PatchScope,
+		},
+		"transformation_rules": {
+			"POST": txruleApi.PostTransformationRules,
+			"GET":  txruleApi.ListTransformationRules,
+		},
+		"transformation_rules/:id": {
+			"GET":   txruleApi.GetTransformationRule,
+			"PATCH": txruleApi.PatchTransformationRule,
+		},
 	}
 }
diff --git a/backend/server/services/remote/plugin/init.go b/backend/server/services/remote/plugin/init.go
index 1ddd03001..676fa14d9 100644
--- a/backend/server/services/remote/plugin/init.go
+++ b/backend/server/services/remote/plugin/init.go
@@ -29,10 +29,11 @@ import (
 var (
 	connectionHelper *api.ConnectionApiHelper
 	basicRes         context.BasicRes
+	vld              *validator.Validate
 )
 
 func Init(br context.BasicRes) {
-	vld := validator.New()
+	vld = validator.New()
 	basicRes = br
 	connectionHelper = api.NewConnectionHelper(
 		br,
diff --git a/backend/server/services/remote/plugin/plugin_impl.go b/backend/server/services/remote/plugin/plugin_impl.go
index 7e611b283..8b34b60d7 100644
--- a/backend/server/services/remote/plugin/plugin_impl.go
+++ b/backend/server/services/remote/plugin/plugin_impl.go
@@ -20,6 +20,7 @@ package plugin
 import (
 	"fmt"
 
+	"github.com/apache/incubator-devlake/core/dal"
 	"github.com/apache/incubator-devlake/core/errors"
 	coreModels "github.com/apache/incubator-devlake/core/models"
 	"github.com/apache/incubator-devlake/core/plugin"
@@ -30,35 +31,46 @@ import (
 
 type (
 	remotePluginImpl struct {
-		name             string
-		subtaskMetas     []plugin.SubTaskMeta
-		pluginPath       string
-		description      string
-		invoker          bridge.Invoker
-		connectionTabler *coreModels.DynamicTabler
-		resources        map[string]map[string]plugin.ApiResourceHandler
+		name                     string
+		subtaskMetas             []plugin.SubTaskMeta
+		pluginPath               string
+		description              string
+		invoker                  bridge.Invoker
+		connectionTabler         *coreModels.DynamicTabler
+		transformationRuleTabler *coreModels.DynamicTabler
+		resources                map[string]map[string]plugin.ApiResourceHandler
 	}
 	RemotePluginTaskData struct {
-		DbUrl        string                 `json:"db_url"`
-		ConnectionId uint64                 `json:"connection_id"`
-		Connection   interface{}            `json:"connection"`
-		Options      map[string]interface{} `json:"options"`
+		DbUrl              string                 `json:"db_url"`
+		ScopeId            string                 `json:"scope_id"`
+		ConnectionId       uint64                 `json:"connection_id"`
+		Connection         interface{}            `json:"connection"`
+		TransformationRule interface{}            `json:"transformation_rule"`
+		Options            map[string]interface{} `json:"options"`
 	}
 )
 
 func newPlugin(info *models.PluginInfo, invoker bridge.Invoker) (*remotePluginImpl, errors.Error) {
 	connectionTableName := fmt.Sprintf("_tool_%s_connections", info.Name)
-	dynamicTabler, err := models.LoadTableModel(connectionTableName, info.ConnectionSchema)
+	connectionTabler, err := models.LoadTableModel(connectionTableName, info.ConnectionSchema)
 	if err != nil {
 		return nil, err
 	}
+
+	txRuleTableName := fmt.Sprintf("_tool_%s_transformation_rules", info.Name)
+	txRuleTabler, err := models.LoadTableModel(txRuleTableName, info.TransformationRuleSchema)
+	if err != nil {
+		return nil, err
+	}
+
 	p := remotePluginImpl{
-		name:             info.Name,
-		invoker:          invoker,
-		pluginPath:       info.PluginPath,
-		description:      info.Description,
-		connectionTabler: dynamicTabler,
-		resources:        GetDefaultAPI(invoker, dynamicTabler, connectionHelper),
+		name:                     info.Name,
+		invoker:                  invoker,
+		pluginPath:               info.PluginPath,
+		description:              info.Description,
+		connectionTabler:         connectionTabler,
+		transformationRuleTabler: txRuleTabler,
+		resources:                GetDefaultAPI(invoker, connectionTabler, txRuleTabler, connectionHelper),
 	}
 	remoteBridge := bridge.NewBridge(invoker)
 	for _, subtask := range info.SubtaskMetas {
@@ -93,11 +105,27 @@ func (p *remotePluginImpl) PrepareTaskData(taskCtx plugin.TaskContext, options m
 		return nil, errors.Convert(err)
 	}
 
+	scopeId, ok := options["scopeId"].(string)
+	if !ok {
+		return nil, errors.BadInput.New("missing scopeId")
+	}
+
+	txRule := p.transformationRuleTabler.New()
+	txRuleId, ok := options["transformation_rule_id"].(uint64)
+	if ok {
+		db := taskCtx.GetDal()
+		err = db.First(&txRule, dal.Where("id = ?", txRuleId))
+		if err != nil {
+			return nil, errors.BadInput.New("invalid transformation rule id")
+		}
+	}
+
 	return RemotePluginTaskData{
-		DbUrl:        dbUrl,
-		ConnectionId: connectionId,
-		Connection:   connection.Unwrap(),
-		Options:      options,
+		DbUrl:              dbUrl,
+		ScopeId:            scopeId,
+		ConnectionId:       connectionId,
+		Connection:         connection.Unwrap(),
+		TransformationRule: txRule,
 	}, nil
 }
 
@@ -123,6 +151,11 @@ func (p *remotePluginImpl) RunMigrations(forceMigrate bool) errors.Error {
 		return err
 	}
 
+	err = api.CallDB(basicRes.GetDal().AutoMigrate, p.transformationRuleTabler.New())
+	if err != nil {
+		return err
+	}
+
 	err = p.invoker.Call("run-migrations", bridge.DefaultContext, forceMigrate).Get()
 	return err
 }
diff --git a/backend/server/services/remote/plugin/scope_api.go b/backend/server/services/remote/plugin/scope_api.go
new file mode 100644
index 000000000..9bbdbaaf8
--- /dev/null
+++ b/backend/server/services/remote/plugin/scope_api.go
@@ -0,0 +1,222 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package plugin
+
+import (
+	"net/http"
+	"strconv"
+
+	"github.com/mitchellh/mapstructure"
+
+	"github.com/apache/incubator-devlake/core/dal"
+	"github.com/apache/incubator-devlake/core/errors"
+	"github.com/apache/incubator-devlake/core/models"
+	"github.com/apache/incubator-devlake/core/plugin"
+	"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+)
+
+type ScopeItem struct { // a scope record handled by the scope endpoints in this file
+	ScopeId              string `json:"scopeId"`                        // identifier of the scope; verifyScope rejects an empty value
+	ScopeName            string `json:"scopeName"`                      // name of the scope
+	ConnectionId         uint64 `json:"connectionId"`                   // connection the scope belongs to; verifyScope rejects 0
+	TransformationRuleId uint64 `json:"transformationRuleId,omitempty"` // referenced rule; the handlers here only resolve a rule name when this is > 0
+}
+
+// apiScopeResponse is the DTO returned by the scope read endpoints: the stored
+// scope together with the name of the transformation rule it references (empty
+// when the scope has no rule). The name is serialized under its own
+// "transformationRuleName" key; it used to reuse "transformationRuleId", which
+// labelled a name as an id.
+type apiScopeResponse struct {
+	Scope                  ScopeItem
+	TransformationRuleName string `json:"transformationRuleName,omitempty"`
+}
+
+// request is the body decoded by PutScope: the scopes to save, listed under "data". Open question kept from the author: why a batch PUT?
+type request struct {
+	Data []*ScopeItem `json:"data"`
+}
+
+type ScopeAPI struct { // receiver of the scope endpoints (PutScope, PatchScope, ListScopes, GetScope)
+	txRuleType *models.DynamicTabler // its TableName() is the table queried to resolve transformation rule names
+}
+
+// PutScope creates or updates, in one batch, the scopes listed under "data" in
+// the request body for the connection named by the "connectionId" path param.
+//
+// Every item is stamped with the connection id taken from the path (a
+// connection id given in the body is overwritten) and validated with
+// verifyScope. The request is rejected as bad input when the connection id is
+// missing, non-numeric or zero, when the body cannot be decoded, when an item
+// is null, or when a scope id appears more than once. On success the saved
+// items are returned with status 200.
+func (s *ScopeAPI) PutScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	connectionId, _ := extractParam(input.Params)
+	if connectionId == 0 {
+		return nil, errors.BadInput.New("invalid connectionId")
+	}
+
+	var scopes request
+	err := errors.Convert(mapstructure.Decode(input.Body, &scopes))
+	if err != nil {
+		return nil, errors.BadInput.Wrap(err, "decoding scope error")
+	}
+
+	seen := make(map[string]struct{}, len(scopes.Data))
+	for _, scope := range scopes.Data {
+		// A null entry in "data" can decode to a nil pointer; reject it here
+		// instead of dereferencing it below.
+		if scope == nil {
+			return nil, errors.BadInput.New("invalid scope item")
+		}
+		if _, ok := seen[scope.ScopeId]; ok {
+			return nil, errors.BadInput.New("duplicated item")
+		}
+		seen[scope.ScopeId] = struct{}{}
+		scope.ConnectionId = connectionId
+
+		err = verifyScope(scope)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	err = basicRes.GetDal().CreateOrUpdate(scopes.Data)
+	if err != nil {
+		return nil, errors.Default.Wrap(err, "error on saving scope")
+	}
+
+	return &plugin.ApiResourceOutput{Body: scopes.Data, Status: http.StatusOK}, nil
+}
+
+// PatchScope applies the fields given in the request body to the scope
+// identified by the "connectionId" and "scopeId" path params, saves it, and
+// returns the updated scope with status 200.
+//
+// A missing, non-numeric or zero connection id and an undecodable body are
+// reported as bad input; a scope that does not exist is reported as not found,
+// matching GetScope. The identifying fields always come from the path, so the
+// body cannot move the record to another connection or scope id.
+func (s *ScopeAPI) PatchScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	connectionId, scopeId := extractParam(input.Params)
+	if connectionId == 0 {
+		return nil, errors.BadInput.New("invalid connectionId")
+	}
+
+	db := basicRes.GetDal()
+	scope := ScopeItem{}
+	err := db.First(&scope, dal.Where("connection_id = ? AND scope_id = ?", connectionId, scopeId))
+	if db.IsErrorNotFound(err) {
+		return nil, errors.NotFound.New("scope not found")
+	}
+	if err != nil {
+		return nil, errors.Default.Wrap(err, "error on loading scope")
+	}
+
+	err = api.DecodeMapStruct(input.Body, &scope)
+	if err != nil {
+		return nil, errors.BadInput.Wrap(err, "patch scope error")
+	}
+	// The path identifies the record; restore its keys in case the body
+	// carried different ones.
+	scope.ConnectionId = connectionId
+	scope.ScopeId = scopeId
+
+	err = verifyScope(&scope)
+	if err != nil {
+		return nil, err
+	}
+
+	err = db.Update(&scope)
+	if err != nil {
+		return nil, errors.Default.Wrap(err, "error on saving scope")
+	}
+	return &plugin.ApiResourceOutput{Body: scope, Status: http.StatusOK}, nil
+}
+
+// ListScopes returns one page of the scopes stored for the connection named by
+// the "connectionId" path param, each paired with the name of its
+// transformation rule (empty when the scope has none).
+//
+// Paging is read from the "pageSize" and "page" query params. A page size
+// above 100, or a missing, non-numeric or zero connection id, is rejected as
+// bad input.
+func (s *ScopeAPI) ListScopes(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	var scopes []ScopeItem
+	connectionId, _ := extractParam(input.Params)
+
+	if connectionId == 0 {
+		return nil, errors.BadInput.New("invalid connectionId")
+	}
+
+	limit, offset := api.GetLimitOffset(input.Query, "pageSize", "page")
+
+	if limit > 100 {
+		return nil, errors.BadInput.New("Page limit cannot exceed 100")
+	}
+
+	db := basicRes.GetDal()
+	err := db.All(&scopes, dal.Where("connection_id = ?", connectionId), dal.Limit(limit), dal.Offset(offset))
+	if err != nil {
+		return nil, err
+	}
+
+	// Collect the rule ids referenced by this page so their names can be
+	// fetched with a single query.
+	var ruleIds []uint64
+	for _, scope := range scopes {
+		if scope.TransformationRuleId > 0 {
+			ruleIds = append(ruleIds, scope.TransformationRuleId)
+		}
+	}
+
+	// The fields must be exported: rows are filled in through reflection, which
+	// cannot set unexported fields, so with lowercase names no rule name was
+	// ever read back.
+	var txRuleId2Name []struct {
+		Id   uint64
+		Name string
+	}
+	if len(ruleIds) > 0 {
+		err = db.All(&txRuleId2Name,
+			dal.Select("id, name"),
+			dal.From(s.txRuleType.TableName()),
+			dal.Where("id IN (?)", ruleIds))
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	names := make(map[uint64]string, len(txRuleId2Name))
+	for _, r := range txRuleId2Name {
+		names[r.Id] = r.Name
+	}
+
+	var apiScopes []apiScopeResponse
+	for _, scope := range scopes {
+		// A scope without a rule (or whose rule is gone) gets an empty name.
+		txRuleName := names[scope.TransformationRuleId]
+		scopeRes := apiScopeResponse{
+			Scope:                  scope,
+			TransformationRuleName: txRuleName,
+		}
+		apiScopes = append(apiScopes, scopeRes)
+	}
+
+	return &plugin.ApiResourceOutput{Body: apiScopes, Status: http.StatusOK}, nil
+}
+
+// GetScope returns the scope identified by the "connectionId" and "scopeId"
+// path params, together with the name of its transformation rule (empty when
+// the scope has none), with status 200.
+//
+// A missing, non-numeric or zero connection id is rejected as bad input and an
+// unknown scope is reported as not found. Any other database error, including
+// a failure to load the referenced rule, is returned unchanged.
+func (s *ScopeAPI) GetScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	var scope ScopeItem
+	connectionId, scopeId := extractParam(input.Params)
+	if connectionId == 0 {
+		return nil, errors.BadInput.New("invalid path params")
+	}
+
+	db := basicRes.GetDal()
+	err := db.First(&scope, dal.Where("connection_id = ? AND scope_id = ?", connectionId, scopeId))
+	if db.IsErrorNotFound(err) {
+		return nil, errors.NotFound.New("record not found")
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	var ruleName string
+	if scope.TransformationRuleId > 0 {
+		// Read the row into a small struct with exported fields rather than a
+		// bare string, so the query has named columns (and an id) to work with.
+		var rule struct {
+			Id   uint64
+			Name string
+		}
+		err = db.First(&rule, dal.Select("id, name"), dal.From(s.txRuleType.TableName()), dal.Where("id = ?", scope.TransformationRuleId))
+		if err != nil {
+			return nil, err
+		}
+		ruleName = rule.Name
+	}
+
+	return &plugin.ApiResourceOutput{Body: apiScopeResponse{Scope: scope, TransformationRuleName: ruleName}, Status: http.StatusOK}, nil
+}
+
+// extractParam reads the "connectionId" and "scopeId" entries of params. The
+// connection id is parsed as a base-10 unsigned integer and the parse error is
+// discarded, so a missing or non-numeric value comes back as 0; the scope id
+// is returned as given.
+func extractParam(params map[string]string) (uint64, string) {
+	id, _ := strconv.ParseUint(params["connectionId"], 10, 64)
+	return id, params["scopeId"]
+}
+
+// verifyScope checks that scope carries both of its identifying fields. It
+// returns a bad-input error for a zero connection id, then for an empty scope
+// id, and nil when both are set.
+func verifyScope(scope *ScopeItem) errors.Error {
+	switch {
+	case scope.ConnectionId == 0:
+		return errors.BadInput.New("invalid connectionId")
+	case scope.ScopeId == "":
+		return errors.BadInput.New("invalid scope ID")
+	default:
+		return nil
+	}
+}
diff --git a/backend/server/services/remote/plugin/transformation_rule_api.go b/backend/server/services/remote/plugin/transformation_rule_api.go
new file mode 100644
index 000000000..8a97bfc16
--- /dev/null
+++ b/backend/server/services/remote/plugin/transformation_rule_api.go
@@ -0,0 +1,94 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package plugin
+
+import (
+	"net/http"
+	"strconv"
+
+	"github.com/apache/incubator-devlake/core/dal"
+	"github.com/apache/incubator-devlake/core/errors"
+	"github.com/apache/incubator-devlake/core/models"
+	"github.com/apache/incubator-devlake/core/plugin"
+	"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+)
+
+type TransformationRuleAPI struct { // receiver of the transformation rule endpoints in this file
+	txRuleType *models.DynamicTabler // its New()/NewSlice() supply the rule values the handlers decode into and load from the database
+}
+
+// PostTransformationRules decodes the request body into a new transformation
+// rule, inserts it, and returns the stored rule with status 200. A body that
+// fails decoding is reported as bad input; an insert error is returned as is.
+func (t *TransformationRuleAPI) PostTransformationRules(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	rule := t.txRuleType.New()
+	if decodeErr := api.Decode(input.Body, rule, vld); decodeErr != nil {
+		return nil, errors.BadInput.Wrap(decodeErr, "error in decoding transformation rule")
+	}
+
+	if createErr := api.CallDB(basicRes.GetDal().Create, rule); createErr != nil {
+		return nil, createErr
+	}
+
+	created := &plugin.ApiResourceOutput{Body: rule.Unwrap(), Status: http.StatusOK}
+	return created, nil
+}
+
+// PatchTransformationRule loads the transformation rule named by the "id" path
+// param, applies the fields given in the request body, saves the result, and
+// returns the updated rule with status 200.
+//
+// A non-numeric id and a body that fails decoding are reported as bad input.
+// Failing to load or to save the rule is reported as a generic error.
+func (t *TransformationRuleAPI) PatchTransformationRule(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	id, err := strconv.ParseUint(input.Params["id"], 10, 64)
+	if err != nil {
+		return nil, errors.BadInput.Wrap(err, "id should be an integer")
+	}
+
+	txRule := t.txRuleType.New()
+	db := basicRes.GetDal()
+	err = api.CallDB(db.First, txRule, dal.Where("id = ?", id))
+	if err != nil {
+		return nil, errors.Default.Wrap(err, "no transformation rule with given id")
+	}
+
+	err = api.Decode(input.Body, txRule, vld)
+	if err != nil {
+		return nil, errors.BadInput.Wrap(err, "decoding error")
+	}
+
+	// Write the patched rule back; without this the change only ever existed in
+	// the response body.
+	err = api.CallDB(db.Update, txRule)
+	if err != nil {
+		return nil, errors.Default.Wrap(err, "error on saving transformation rule")
+	}
+
+	return &plugin.ApiResourceOutput{Body: txRule.Unwrap(), Status: http.StatusOK}, nil
+}
+
+// GetTransformationRule returns the transformation rule named by the "id" path
+// param. A non-numeric id is reported as bad input; failing to load the rule
+// is reported as a generic error.
+func (t *TransformationRuleAPI) GetTransformationRule(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	// Query by the parsed "id" param, as PatchTransformationRule does; the
+	// whole params map used to be passed as the query argument.
+	id, err := strconv.ParseUint(input.Params["id"], 10, 64)
+	if err != nil {
+		return nil, errors.BadInput.Wrap(err, "id should be an integer")
+	}
+
+	txRule := t.txRuleType.New()
+	db := basicRes.GetDal()
+	err = api.CallDB(db.First, txRule, dal.Where("id = ?", id))
+	if err != nil {
+		return nil, errors.Default.Wrap(err, "no transformation rule with given id")
+	}
+
+	return &plugin.ApiResourceOutput{Body: txRule.Unwrap()}, nil
+}
+
+// ListTransformationRules returns one page of transformation rules. Paging is
+// taken from the "pageSize" and "page" query params; a page size above 100 is
+// rejected as bad input and a query error is returned as is.
+func (t *TransformationRuleAPI) ListTransformationRules(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	rules := t.txRuleType.NewSlice()
+
+	pageSize, skip := api.GetLimitOffset(input.Query, "pageSize", "page")
+	if pageSize > 100 {
+		return nil, errors.BadInput.New("pageSize cannot exceed 100")
+	}
+
+	if queryErr := api.CallDB(basicRes.GetDal().All, rules, dal.Limit(pageSize), dal.Offset(skip)); queryErr != nil {
+		return nil, queryErr
+	}
+
+	page := &plugin.ApiResourceOutput{Body: rules.Unwrap()}
+	return page, nil
+}
diff --git a/backend/test/remote/fakeplugin/fakeplugin/main.py b/backend/test/remote/fakeplugin/fakeplugin/main.py
index 0c7c2db8e..51c13bb9b 100644
--- a/backend/test/remote/fakeplugin/fakeplugin/main.py
+++ b/backend/test/remote/fakeplugin/fakeplugin/main.py
@@ -19,7 +19,7 @@ from typing import Optional
 
 from sqlmodel import Field
 
-from pydevlake import Plugin, Connection, Stream, ToolModel
+from pydevlake import Plugin, Connection, TransformationRule, Stream, ToolModel
 from pydevlake.domain_layer.devops import CICDScope, CICDPipeline
 
 
@@ -94,6 +94,10 @@ class FakeConnection(Connection):
     token: str
 
 
+class FakeTransformationRule(TransformationRule):  # transformation rule type defined for the fake test plugin
+    tx1: str  # the only field this rule declares
+
+
 class FakePlugin(Plugin):
     @property
     def connection_type(self):