You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by he...@apache.org on 2023/02/28 17:14:35 UTC

[incubator-devlake] branch main updated: Blueprint v200 py plugins (#4521)

This is an automated email from the ASF dual-hosted git repository.

hez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new ce1031a72 Blueprint v200 py plugins (#4521)
ce1031a72 is described below

commit ce1031a726fe061b0986989cb5af25df45b81dcd
Author: Camille Teruel <ca...@gmail.com>
AuthorDate: Tue Feb 28 18:14:28 2023 +0100

    Blueprint v200 py plugins (#4521)
    
    * fix: Fix passing options to python plugin
    
    * fix: Stream must declare its domain types
    
    A stream must declare its domain types ("CICD", "TICKET", "CROSS", etc), not the domain model it produces.
    
    * fix: Add missing table name for CICDScope
    
    * feat: Supertype & optional encryption for dynamic loaded models
    
    * refactor: Use dynamic tabler for scope
    
    * fix: Fix CreateScope endpoint for batch PUT
    
    * style: Remove leftover code
    
    * refactor: Distinguish between tool and domain level scopes
    
    * fix: Do not generate fields that hide baseType fields
    
    * fix: Make remoteDatasourcePlugin castable to DataSourcePluginBlueprintV200
    
    A remoteDatasourcePlugin struct as a PluginMeta must be castable to DataSourcePluginBlueprintV200.
    
    * fix: Make Plugin an abstract class
    
    Make Plugin inherit ABC to ensure all abstract methods are implemented by subclasses.
    
    * feat: Make transformation rule optional
    
    Not all plugins need transformation rules, so we make it optional.
    
    * refactor: Connection and TransformationRule as Model instead of Message
    
    Even if connections and transformation rules are sent from go to python,
    they are also models backed by a tool table.
    
    * refactor: Remove connection_id from Context
    
    Now that we receive a Connection with its id, we don't need separate connection_id in Context.
    
    * refactor: Introduce message for dynamic models
    
    * fix: Make some model fields optional
    
    * fix: Rename CicdScope to match go side
    
    * test: Add test for blueprint V2
    
    * refactor: Use tool/domain scopes as input/output of make-pipeline command
    
    Use the genuine tool scopes as input and genuine domain scopes as output to the make-pipeline command.
    For tool scope inputs, python plugins are oblivious of the BlueprintScope concept, the go side taking care
    of fetching and passing the real tool scopes.
    For the domain scope outputs, the plugin has to send back each domain scope as a JSON object together with a type name to allow reconstruction on the go side.
    Because of limited go reflective abilities, the reconstruction function hardcodes the existing domain scope types.
    
    * feat: Support remote scope groups
    
    Python plugins now support a two level scope hierarchy: scopes and scope groups.
    When the group_id query parameter is absent, the default implementation calls
    `remote_scope_groups` that is redefined in each plugin. If the group_id is given,
    the default implementation calls `remote_scopes`, which returns the scopes that are in the given group.
    
    ---------
    
    Co-authored-by: Camille Teruel <ca...@meri.co>
    Co-authored-by: Keon Amini <ke...@merico.dev>
---
 backend/core/models/dynamic_tabler.go              |  22 ++++
 backend/python/pydevlake/README.md                 |  16 +--
 backend/python/pydevlake/pydevlake/__init__.py     |   6 +-
 backend/python/pydevlake/pydevlake/context.py      |   4 +-
 backend/python/pydevlake/pydevlake/docgen.py       |  10 +-
 .../pydevlake/pydevlake/domain_layer/devops.py     |  18 +--
 backend/python/pydevlake/pydevlake/ipc.py          |  21 +--
 backend/python/pydevlake/pydevlake/logger.py       |   7 -
 backend/python/pydevlake/pydevlake/message.py      |  55 ++++----
 backend/python/pydevlake/pydevlake/model.py        |  82 ++++++++----
 backend/python/pydevlake/pydevlake/plugin.py       | 107 +++++++++------
 backend/python/pydevlake/pydevlake/stream.py       |  18 ++-
 backend/python/pydevlake/pydevlake/subtasks.py     |  10 +-
 backend/python/pydevlake/test/stream_test.py       |   9 +-
 .../server/services/remote/models/conversion.go    |  51 ++++++--
 .../remote/models/dynamic_domain_scopes.go         |  66 ++++++++++
 backend/server/services/remote/models/models.go    |  62 +++++++--
 .../server/services/remote/models/plugin_models.go |  41 ------
 .../server/services/remote/plugin/default_api.go   |  22 +++-
 backend/server/services/remote/plugin/init.go      |   4 +-
 .../services/remote/plugin/plugin_extensions.go    |  36 ++++--
 .../server/services/remote/plugin/plugin_impl.go   |  52 +++++---
 .../services/remote/plugin/remote_scope_api.go     |  35 +++--
 backend/server/services/remote/plugin/scope_api.go | 144 ++++++++++-----------
 backend/test/helper/api.go                         |   5 +-
 backend/test/remote/fakeplugin/fakeplugin/main.py  |  46 +++++--
 backend/test/remote/remote_test.go                 | 125 +++++++++++++-----
 27 files changed, 696 insertions(+), 378 deletions(-)

diff --git a/backend/core/models/dynamic_tabler.go b/backend/core/models/dynamic_tabler.go
index b7f92a7e7..ab58b77ef 100644
--- a/backend/core/models/dynamic_tabler.go
+++ b/backend/core/models/dynamic_tabler.go
@@ -18,6 +18,8 @@ limitations under the License.
 package models
 
 import (
+	"encoding/json"
+	"github.com/apache/incubator-devlake/core/errors"
 	"reflect"
 
 	"github.com/apache/incubator-devlake/core/dal"
@@ -56,6 +58,26 @@ func (d *DynamicTabler) NewSlice() *DynamicTabler {
 	}
 }
 
+func (d *DynamicTabler) From(src any) errors.Error {
+	b, err := json.Marshal(src)
+	if err != nil {
+		return errors.Convert(err)
+	}
+	return errors.Convert(json.Unmarshal(b, d.wrapped))
+}
+
+func (d *DynamicTabler) To(target any) errors.Error {
+	b, err := json.Marshal(d.wrapped)
+	if err != nil {
+		return errors.Convert(err)
+	}
+	return errors.Convert(json.Unmarshal(b, target))
+}
+
+func (d *DynamicTabler) Set(x any) {
+	d.wrapped = x
+}
+
 func (d *DynamicTabler) Unwrap() any {
 	return d.wrapped
 }
diff --git a/backend/python/pydevlake/README.md b/backend/python/pydevlake/README.md
index fbfc976d2..c30a5bf8d 100644
--- a/backend/python/pydevlake/README.md
+++ b/backend/python/pydevlake/README.md
@@ -98,7 +98,7 @@ You can use `SQLModel` features like [declaring relationships with other models]
 Create a new file for your first stream in a `streams` directory.
 
 ```python
-from pydevlake import Stream
+from pydevlake import Stream, DomainType
 from pydevlake.domain_layer.crossdomain import User as DomainUser
 
 from myplugin.models import User as ToolUser
@@ -106,7 +106,7 @@ from myplugin.models import User as ToolUser
 
 class Users(Stream):
     tool_model = ToolUser
-    domain_model = DomainUser
+    domain_types = [DomainType.CROSS]
 
     def collect(self, state, context) -> Iterable[Tuple[object, dict]]:
         pass
@@ -118,7 +118,8 @@ class Users(Stream):
 This stream will collect raw user data, e.g. as parsed JSON objects, extract this raw data as your
 tool-specific user model, then convert them into domain-layer user models.
 
-It is possible to have a stream that produce several domain models of different types from a single tool model. In that case, declare the list of possible domain model types in a class attribute `domain_models` (plural) instead of a single domain model type in the `domain_model` (singular) class attribute.
+The `tool_model` class attribute declares the tool model class that is extracted by this stream.
+The `domain_types` class attribute is a list of domain types this stream is about.
 
 The `collect` method takes a `state` dictionary and a context object and yields tuples of raw data and new state.
 The last state that the plugin yielded for a given connection will be reused during the next collection.
@@ -198,7 +199,7 @@ class Users(Stream):
 #### Request and response hook
 
 For each request sent and response received by your API wrapper,
-you can register hooks. Hooks allows you to implement 
+you can register hooks. Hooks allows you to implement
 authentication, pagination, and generic API error handling.
 
 For example, lets assume that we are dealing with an API that
@@ -278,7 +279,7 @@ class UserComments(Substream):
     ...
     def collect(self, state: dict, context, user: User):
         """
-        This method will be called for each user collected from parent stream Users. 
+        This method will be called for each user collected from parent stream Users.
         """
         for json in MyPluginAPI(context.connection.token).user_comments(user.id):
             yield json, state
@@ -298,7 +299,6 @@ poetry run myplugin/main.py --help
 For testing, the interesting commands are `collect`/`extract`/`convert`.
 Each takes a context and a stream name.
 The context is a JSON object that must at least contain:
-- a `connection_id`
 - a `db_url`, e.g. you can use `"sqlite+pysqlite:///:memory:"` for an in-memory DB
 - a `connection` object containing the same attributes than your plugin connection type
 
@@ -307,7 +307,7 @@ redirect to stdout when testing your plugin.
 
 ```
 console
-CTX='{"connection_id": "1", "db_url":"sqlite+pysqlite:///:memory:", "connection": {...your connection attrs here...}}'
+CTX='{"db_url":"sqlite+pysqlite:///:memory:", "connection": {...your connection attrs here...}}'
 poetry run myplugin/main.py $CTX users 3>&1
 ```
 
@@ -326,7 +326,7 @@ curl -X 'POST' \
 
 You should get the created connection with his id (which is 1 for the first created connection) in the response.
 
-Now that a connection for your plugin exists in DevLake database, we can try to run your plugin using `backend/server/services/remote/run/run.go` script: 
+Now that a connection for your plugin exists in DevLake database, we can try to run your plugin using `backend/server/services/remote/run/run.go` script:
 
 ```console
 cd backend
diff --git a/backend/python/pydevlake/pydevlake/__init__.py b/backend/python/pydevlake/pydevlake/__init__.py
index 6d513946d..1cc120d27 100644
--- a/backend/python/pydevlake/pydevlake/__init__.py
+++ b/backend/python/pydevlake/pydevlake/__init__.py
@@ -14,9 +14,9 @@
 # limitations under the License.
 
 
-from .model import ToolModel
+from .model import ToolModel, ToolScope, DomainScope, Connection, TransformationRule
 from .logger import logger
-from .message import Connection, TransformationRule, RemoteScope
+from .message import RemoteScopeGroup
 from .plugin import Plugin
-from .stream import Stream, Substream
+from .stream import DomainType, Stream, Substream
 from .context import Context
diff --git a/backend/python/pydevlake/pydevlake/context.py b/backend/python/pydevlake/pydevlake/context.py
index 85eb9fb7c..043bb0636 100644
--- a/backend/python/pydevlake/pydevlake/context.py
+++ b/backend/python/pydevlake/pydevlake/context.py
@@ -17,20 +17,18 @@
 from urllib.parse import urlparse, parse_qsl
 from sqlmodel import SQLModel, create_engine
 
-from pydevlake.message import Connection, TransformationRule
+from pydevlake.model import Connection, TransformationRule
 
 
 class Context:
     def __init__(self,
                  db_url: str,
                  scope_id: str,
-                 connection_id: int,
                  connection: Connection,
                  transformation_rule: TransformationRule = None,
                  options: dict = None):
         self.db_url = db_url
         self.scope_id = scope_id
-        self.connection_id = connection_id
         self.connection = connection
         self.transformation_rule = transformation_rule
         self.options = options or {}
diff --git a/backend/python/pydevlake/pydevlake/docgen.py b/backend/python/pydevlake/pydevlake/docgen.py
index 9d91197cd..7bc0a5c79 100644
--- a/backend/python/pydevlake/pydevlake/docgen.py
+++ b/backend/python/pydevlake/pydevlake/docgen.py
@@ -19,21 +19,21 @@ from pathlib import Path
 from string import Template
 import json
 
-from pydevlake.message import Connection, TransformationRule
+from pydevlake.model import Connection, TransformationRule
 
 
 # TODO: Move swagger documentation generation to GO side along with API implementation
 TEMPLATE_PATH = str(Path(__file__).parent / 'doc.template.json')
 
-def generate_doc(plugin_name: str, 
-                 connection_type: Type[Connection], 
+def generate_doc(plugin_name: str,
+                 connection_type: Type[Connection],
                  transformation_rule_type: Type[TransformationRule]):
     with open(TEMPLATE_PATH, 'r') as f:
         doc_template = Template(f.read())
         connection_schema = connection_type.schema_json()
-        transformation_rule_schema = transformation_rule_type.schema_json()
+        transformation_rule_schema = transformation_rule_type.schema_json() if transformation_rule_type else {}
         doc = doc_template.substitute(
-            plugin_name=plugin_name, 
+            plugin_name=plugin_name,
             connection_schema=connection_schema,
             transformation_rule_schema=transformation_rule_schema)
         return json.loads(doc)
diff --git a/backend/python/pydevlake/pydevlake/domain_layer/devops.py b/backend/python/pydevlake/pydevlake/domain_layer/devops.py
index c15dd5c5d..4a5fffbdf 100644
--- a/backend/python/pydevlake/pydevlake/domain_layer/devops.py
+++ b/backend/python/pydevlake/pydevlake/domain_layer/devops.py
@@ -20,15 +20,17 @@ from enum import Enum
 
 from sqlmodel import Field, Relationship
 
-from pydevlake.model import DomainModel, NoPKModel
+from pydevlake.model import DomainModel, DomainScope, NoPKModel
 
 
-class CICDScope(DomainModel):
-	name: str
-	description: str
-	url: str
-	createdDate: datetime
-	updatedDate: datetime
+class CicdScope(DomainScope, table=True):
+    __tablename__ = 'cicd_scopes'
+
+    name: str
+    description: Optional[str]
+    url: Optional[str]
+    createdDate: Optional[datetime]
+    updatedDate: Optional[datetime]
 
 
 class CICDPipeline(DomainModel, table=True):
@@ -48,7 +50,7 @@ class CICDPipeline(DomainModel, table=True):
     class Type(Enum):
         CI = "CI"
         CD = "CD"
-        
+
     name: str
     status: Status
     created_date: datetime
diff --git a/backend/python/pydevlake/pydevlake/ipc.py b/backend/python/pydevlake/pydevlake/ipc.py
index b1284007a..aa2811481 100644
--- a/backend/python/pydevlake/pydevlake/ipc.py
+++ b/backend/python/pydevlake/pydevlake/ipc.py
@@ -16,7 +16,7 @@
 
 import os
 from functools import wraps
-from typing import Generator, TextIO
+from typing import Generator, TextIO, Optional
 
 from pydevlake.context import Context
 from pydevlake.message import Message
@@ -30,7 +30,7 @@ def plugin_method(func):
     def send_output(send_ch: TextIO, obj: object):
         if not isinstance(obj, Message):
             return
-        send_ch.write(obj.json())
+        send_ch.write(obj.json(exclude_unset=True))
         send_ch.write('\n')
         send_ch.flush()
 
@@ -71,8 +71,9 @@ class PluginCommands:
         self._plugin.test_connection(connection)
 
     @plugin_method
-    def make_pipeline(self, ctx: dict, scopes: list[dict]):
-        yield from self._plugin.make_pipeline(self._mk_context(ctx), scopes)
+    def make_pipeline(self, scopes: list[dict]):
+        s = [self._plugin.tool_scope_type(**data) for data in scopes]
+        return self._plugin.make_pipeline(s)
 
     @plugin_method
     def run_migrations(self, force: bool):
@@ -83,9 +84,9 @@ class PluginCommands:
         return self._plugin.plugin_info()
 
     @plugin_method
-    def remote_scopes(self, connection: dict, query: str = ''):
+    def remote_scopes(self, connection: dict, group_id: Optional[str]):
         c = self._plugin.connection_type(**connection)
-        self.plugin.remote_scopes(c, query)
+        return self._plugin.remote_scopes(c, group_id)
 
     def startup(self, endpoint: str):
         self._plugin.startup(endpoint)
@@ -93,8 +94,10 @@ class PluginCommands:
     def _mk_context(self, data: dict):
         db_url = data['db_url']
         scope_id = data['scope_id']
-        connection_id = data['connection_id']
         connection = self._plugin.connection_type(**data['connection'])
-        transformation_rule = self._plugin.transformation_rule_type(**data['transformation_rule'])
+        if self._plugin.transformation_rule_type:
+            transformation_rule = self._plugin.transformation_rule_type(**data['transformation_rule'])
+        else:
+            transformation_rule = None
         options = data.get('options', {})
-        return Context(db_url, scope_id, connection_id, connection, transformation_rule, options)
+        return Context(db_url, scope_id, connection, transformation_rule, options)
diff --git a/backend/python/pydevlake/pydevlake/logger.py b/backend/python/pydevlake/pydevlake/logger.py
index 8f297ab8e..6122c285a 100644
--- a/backend/python/pydevlake/pydevlake/logger.py
+++ b/backend/python/pydevlake/pydevlake/logger.py
@@ -32,10 +32,3 @@ logging.basicConfig(
 )
 
 logger = logging.getLogger()
-
-
-if __name__ == "__main__":
-    logger.info('hey')
-    logger.debug('wut?')
-    logger.error('oops')
-    logger.critical('boom')
diff --git a/backend/python/pydevlake/pydevlake/message.py b/backend/python/pydevlake/pydevlake/message.py
index 9b2ba7456..43eef850f 100644
--- a/backend/python/pydevlake/pydevlake/message.py
+++ b/backend/python/pydevlake/pydevlake/message.py
@@ -14,7 +14,11 @@
 # limitations under the License.
 
 
-from pydantic import BaseModel
+from typing import Optional
+
+from pydantic import BaseModel, Field
+
+from pydevlake.model import ToolScope
 
 
 class Message(BaseModel):
@@ -31,11 +35,24 @@ class SubtaskMeta(BaseModel):
     arguments: list[str] = None
 
 
+class DynamicModelInfo(Message):
+    json_schema: dict
+    table_name: str
+
+    @staticmethod
+    def from_model(model_class):
+        return DynamicModelInfo(
+            json_schema=model_class.schema(),
+            table_name=model_class.__tablename__
+        )
+
+
 class PluginInfo(Message):
     name: str
     description: str
-    connection_schema: dict
-    transformation_rule_schema: dict
+    connection_model_info: DynamicModelInfo
+    transformation_rule_model_info: Optional[DynamicModelInfo]
+    scope_model_info: DynamicModelInfo
     plugin_path: str
     subtask_metas: list[SubtaskMeta]
     extension: str = "datasource"
@@ -59,14 +76,6 @@ class RemoteProgress(Message):
     total: int = 0
 
 
-class Connection(Message):
-    pass
-
-
-class TransformationRule(Message):
-    pass
-
-
 class PipelineTask(Message):
     plugin: str
     # Do not snake_case this attribute,
@@ -76,25 +85,25 @@ class PipelineTask(Message):
     options: dict[str, object]
 
 
-class PipelineStage(Message):
-    tasks: list[PipelineTask]
+class DynamicDomainScope(Message):
+	type_name: str
+	data: dict
 
 
-class PipelinePlan(Message):
-    stages: list[PipelineStage]
+class PipelineData(Message):
+    plan: list[list[PipelineTask]]
+    scopes: list[DynamicDomainScope]
 
 
-class PipelineScope(Message):
+class RemoteScopeTreeNode(Message):
     id: str
     name: str
-    table_name: str
 
 
-class BlueprintScope(Message):
-    id: str
-    name: str
+class RemoteScopeGroup(RemoteScopeTreeNode):
+    type: str = Field("group", const=True)
 
 
-class RemoteScope(Message):
-    id: str
-    name: str
+class RemoteScope(RemoteScopeTreeNode):
+    type: str = Field("scope", const=True)
+    scope: ToolScope
diff --git a/backend/python/pydevlake/pydevlake/model.py b/backend/python/pydevlake/pydevlake/model.py
index 3c8705010..c5e865065 100644
--- a/backend/python/pydevlake/pydevlake/model.py
+++ b/backend/python/pydevlake/pydevlake/model.py
@@ -28,17 +28,29 @@ import inflect
 inflect_engine = inflect.engine()
 
 
-def get_plugin_name(cls):
-    """
-    Get the plugin name from a class by looking into
-    the file path of its module.
-    """
-    module = getmodule(cls)
-    path_segments = module.__file__.split(os.sep)
-    # Finds the name of the first enclosing folder
-    # that is not a python module 
-    depth = len(module.__name__.split('.')) + 1
-    return path_segments[-depth]
+class Model(SQLModel):
+    id: int = Field(primary_key=True)
+    created_at: Optional[datetime] = Field(
+        sa_column=Column(DateTime(), default=func.now())
+    )
+    updated_at: Optional[datetime] = Field(
+        sa_column=Column(DateTime(), default=func.now(), onupdate=func.now())
+    )
+
+class ToolTable(Model):
+    @declared_attr
+    def __tablename__(cls) -> str:
+        plugin_name = _get_plugin_name(cls)
+        plural_entity = inflect_engine.plural_noun(cls.__name__.lower())
+        return f'_tool_{plugin_name}_{plural_entity}'
+
+
+class Connection(ToolTable):
+    name: str
+
+
+class TransformationRule(ToolTable):
+    name: str
 
 
 class RawModel(SQLModel):
@@ -53,8 +65,8 @@ class RawModel(SQLModel):
 class RawDataOrigin(SQLModel):
     # SQLModel doesn't like attributes starting with _
     # so we change the names of the columns.
-    raw_data_params: str = Field(sa_column_kwargs={'name':'_raw_data_params'})
-    raw_data_table: str = Field(sa_column_kwargs={'name':'_raw_data_table'})
+    raw_data_params: Optional[str] = Field(sa_column_kwargs={'name':'_raw_data_params'})
+    raw_data_table: Optional[str] = Field(sa_column_kwargs={'name':'_raw_data_table'})
     raw_data_id: Optional[str] = Field(sa_column_kwargs={'name':'_raw_data_id'})
     raw_data_remark: Optional[str] = Field(sa_column_kwargs={'name':'_raw_data_remark'})
 
@@ -64,25 +76,34 @@ class RawDataOrigin(SQLModel):
         self.raw_data_table = raw.__tablename__
 
 
-class ToolModel(RawDataOrigin):
+class NoPKModel(RawDataOrigin):
+    created_at: Optional[datetime] = Field(
+        sa_column=Column(DateTime(), default=func.now())
+    )
+    updated_at: Optional[datetime] = Field(
+        sa_column=Column(DateTime(), default=func.now(), onupdate=func.now())
+    )
+
+
+class ToolModel(ToolTable, NoPKModel):
     @declared_attr
     def __tablename__(cls) -> str:
-        plugin_name = get_plugin_name(cls)
+        plugin_name = _get_plugin_name(cls)
         plural_entity = inflect_engine.plural_noun(cls.__name__.lower())
         return f'_tool_{plugin_name}_{plural_entity}'
 
 
-class NoPKModel(SQLModel):
-    created_at: datetime = Field(
-        sa_column=Column(DateTime(), default=func.now())
-    )
-    updated_at: datetime = Field(
-        sa_column=Column(DateTime(), default=func.now(), onupdate=func.now())
-    )
+class DomainModel(NoPKModel):
+    id: str = Field(primary_key=True)
 
 
-class DomainModel(NoPKModel):
+class ToolScope(ToolModel):
     id: str = Field(primary_key=True)
+    name: str
+
+
+class DomainScope(DomainModel):
+    pass
 
 
 def generate_domain_id(tool_model: ToolModel, connection_id: str):
@@ -91,10 +112,23 @@ def generate_domain_id(tool_model: ToolModel, connection_id: str):
     from the tool entity it originates from.
     """
     model_type = type(tool_model)
-    segments = [get_plugin_name(model_type), model_type.__name__, str(connection_id)]
+    segments = [_get_plugin_name(model_type), model_type.__name__, str(connection_id)]
     mapper = inspect(model_type)
     for primary_key_column in mapper.primary_key:
         prop = mapper.get_property_by_column(primary_key_column)
         attr_val = getattr(tool_model, prop.key)
         segments.append(str(attr_val))
     return ':'.join(segments)
+
+
+def _get_plugin_name(cls):
+    """
+    Get the plugin name from a class by looking into
+    the file path of its module.
+    """
+    module = getmodule(cls)
+    path_segments = module.__file__.split(os.sep)
+    # Finds the name of the first enclosing folder
+    # that is not a python module
+    depth = len(module.__name__.split('.')) + 1
+    return path_segments[-depth]
diff --git a/backend/python/pydevlake/pydevlake/plugin.py b/backend/python/pydevlake/pydevlake/plugin.py
index fbad2b38d..9b21f6e6a 100644
--- a/backend/python/pydevlake/pydevlake/plugin.py
+++ b/backend/python/pydevlake/pydevlake/plugin.py
@@ -14,9 +14,9 @@
 # limitations under the License.
 
 
-from typing import Type, Union, Iterable
+from typing import Type, Union, Iterable, Optional
 import sys
-from abc import abstractmethod
+from abc import ABC, abstractmethod
 import requests
 
 import fire
@@ -27,10 +27,10 @@ from pydevlake.docgen import generate_doc
 from pydevlake.ipc import PluginCommands
 from pydevlake.context import Context
 from pydevlake.stream import Stream
-from pydevlake.model import DomainModel
+from pydevlake.model import ToolScope, DomainScope, Connection, TransformationRule
 
 
-class Plugin:
+class Plugin(ABC):
     def __init__(self):
         self._streams = dict()
         for stream in self.streams:
@@ -51,15 +51,20 @@ class Plugin:
 
     @property
     @abstractmethod
-    def connection_type(self) -> Type[msg.Connection]:
+    def connection_type(self) -> Type[Connection]:
         pass
 
     @property
-    def transformation_rule_type(self) -> Type[msg.TransformationRule]:
-        return msg.TransformationRule
+    @abstractmethod
+    def tool_scope_type(self) -> Type[ToolScope]:
+        pass
+
+    @property
+    def transformation_rule_type(self) -> Type[TransformationRule]:
+        return None
 
     @abstractmethod
-    def test_connection(self, connection: msg.Connection):
+    def test_connection(self, connection: Connection):
         """
         Test if the the connection with the datasource can be established with the given connection.
         Must raise an exception if the connection can't be established.
@@ -71,11 +76,15 @@ class Plugin:
         return [subtask for stream in self._streams.values() for subtask in stream.subtasks]
 
     @abstractmethod
-    def get_scopes(self, scope_name: str, connection: msg.Connection) -> Iterable[DomainModel]:
+    def domain_scopes(self, tool_scope: ToolScope) -> Iterable[DomainScope]:
+        pass
+
+    @abstractmethod
+    def remote_scopes(self, connection: Connection, group_id: str) -> list[ToolScope]:
         pass
 
     @abstractmethod
-    def remote_scopes(self, connection: msg.Connection, query: str = ''):
+    def remote_scope_groups(self, connection: Connection) -> list[msg.RemoteScopeGroup]:
         pass
 
     @property
@@ -95,39 +104,53 @@ class Plugin:
         # TODO: Create tables
         pass
 
-    def make_pipeline(self, ctx: Context, scopes: list[msg.BlueprintScope]):
+    def make_remote_scopes(self, connection: Connection, group_id: Optional[str]) -> list[msg.RemoteScopeTreeNode]:
+        if group_id:
+            return [
+                msg.RemoteScope(
+                    id=tool_scope.id,
+                    name=tool_scope.name,
+                    scope=tool_scope
+                )
+                for tool_scope
+                in self.remote_scopes(connection, group_id)
+            ]
+        else:
+            return self.remote_scope_groups(connection)
+
+    def make_pipeline(self, tool_scopes: list[ToolScope]):
         """
         Make a simple pipeline using the scopes declared by the plugin.
         """
-        stages = [
-            msg.PipelineStage(
-                tasks=[
-                    msg.PipelineTask(
-                        self.name,
-                        skipOnFail=False,
-                        subtasks=[t.name for t in self.subtasks],
-                        options={
-                            "scopeId": scope.id,
-                            "scopeName": scope.name}
-                    )
-                ]
+        plan = self.make_pipeline_plan(tool_scopes)
+        domain_scopes = [
+            msg.DynamicDomainScope(
+                type_name=type(scope).__name__,
+                data=scope.dict(exclude_unset=True)
             )
-            for scope in scopes
+            for tool_scope in tool_scopes
+            for scope in self.domain_scopes(tool_scope)
         ]
+        return msg.PipelineData(
+            plan=plan,
+            scopes=domain_scopes
+        )
 
-        plan = msg.PipelinePlan(stages=stages)
-        yield plan
-
-        scopes = [
-            msg.PipelineScope(
-                id=':'.join([self.name, type(scope).__name__, ctx.connection_id, bp_scope.id]),
-                name=bp_scope.name,
-                table_name=scope.__tablename__
+    def make_pipeline_plan(self, scopes: list[ToolScope]) -> list[list[msg.PipelineTask]]:
+        return [self.make_pipeline_stage(scope) for scope in scopes]
+
+    def make_pipeline_stage(self, scope: ToolScope) -> list[msg.PipelineTask]:
+        return [
+            msg.PipelineTask(
+                plugin=self.name,
+                skipOnFail=False,
+                subtasks=[t.name for t in self.subtasks],
+                options={
+                    "scopeId": scope.id,
+                    "scopeName": scope.name
+                }
             )
-            for bp_scope in scopes
-            for scope in self.get_scopes(bp_scope.name, ctx.connection)
         ]
-        yield scopes
 
     def get_stream(self, stream_name: str):
         stream = self._streams.get(stream_name)
@@ -148,7 +171,7 @@ class Plugin:
         if resp.status_code != 200:
             raise Exception(f"unexpected http status code {resp.status_code}: {resp.content}")
 
-    def plugin_info(self):
+    def plugin_info(self) -> msg.PluginInfo:
         subtask_metas = [
             msg.SubtaskMeta(
                 name=subtask.name,
@@ -157,18 +180,24 @@ class Plugin:
                 required=True,
                 enabled_by_default=True,
                 description=subtask.description,
-                domain_types=[dm.__name__ for dm in subtask.stream.domain_models]
+                domain_types=[dm.value for dm in subtask.stream.domain_types]
             )
             for subtask in self.subtasks
         ]
 
+        if self.transformation_rule_type:
+            tx_rule_model_info = msg.DynamicModelInfo.from_model(self.transformation_rule_type)
+        else:
+            tx_rule_model_info = None
+
         return msg.PluginInfo(
             name=self.name,
             description=self.description,
             plugin_path=self._plugin_path(),
             extension="datasource",
-            connection_schema=self.connection_type.schema(),
-            transformation_rule_schema=self.transformation_rule_type.schema(),
+            connection_model_info=msg.DynamicModelInfo.from_model(self.connection_type),
+            transformation_rule_model_info=tx_rule_model_info,
+            scope_model_info=msg.DynamicModelInfo.from_model(self.tool_scope_type),
             subtask_metas=subtask_metas
         )
 
diff --git a/backend/python/pydevlake/pydevlake/stream.py b/backend/python/pydevlake/pydevlake/stream.py
index 7422bd0de..4072777cc 100644
--- a/backend/python/pydevlake/pydevlake/stream.py
+++ b/backend/python/pydevlake/pydevlake/stream.py
@@ -16,11 +16,21 @@
 
 from typing import Iterable, Type
 from abc import abstractmethod
+from enum import Enum
 
 from pydevlake.subtasks import Collector, Extractor, Convertor, SubstreamCollector
 from pydevlake.model import RawModel, ToolModel, DomainModel
 
 
+class DomainType(Enum):
+    CODE = "CODE"
+    TICKET = "TICKET"
+    CODE_REVIEW = "CODEREVIEW"
+    CROSS = "CROSS"
+    CICD = "CICD"
+    CODE_QUALITY = "CODEQUALITY"
+
+
 class Stream:
     def __init__(self, plugin_name: str):
         self.plugin_name = plugin_name
@@ -46,14 +56,10 @@ class Stream:
         pass
 
     @property
-    def domain_model(self) -> Type[DomainModel]:
+    @abstractmethod
+    def domain_types(self) -> list[DomainType]:
         pass
 
-    @property
-    def domain_models(self) -> Type[DomainModel]:
-        assert self.domain_model, "Streams must declare their domain_model or domain_models"
-        return [self.domain_model]
-
     def raw_model(self, session) -> Type[RawModel]:
         if self._raw_model is not None:
             return self._raw_model
diff --git a/backend/python/pydevlake/pydevlake/subtasks.py b/backend/python/pydevlake/pydevlake/subtasks.py
index 343c29ac5..865b930ab 100644
--- a/backend/python/pydevlake/pydevlake/subtasks.py
+++ b/backend/python/pydevlake/pydevlake/subtasks.py
@@ -47,9 +47,9 @@ class Subtask:
 
     def run(self, ctx: Context, sync_point_interval=100):
         with Session(ctx.engine) as session:
-            subtask_run = self._start_subtask(session, ctx.connection_id)
+            subtask_run = self._start_subtask(session, ctx.connection.id)
             if ctx.incremental:
-                state = self._get_last_state(session, ctx.connection_id)
+                state = self._get_last_state(session, ctx.connection.id)
             else:
                 self.delete(session, ctx)
                 state = dict()
@@ -151,7 +151,7 @@ class Collector(Subtask):
 
     def _params(self, ctx: Context) -> str:
         return json.dumps({
-            "connection_id": ctx.connection_id,
+            "connection_id": ctx.connection.id,
             "scope_id": ctx.scope_id
         })
 
@@ -199,9 +199,9 @@ class Convertor(Subtask):
         res = self.stream.convert(tool_model)
         if isinstance(res, Generator):
             for each in self.stream.convert(tool_model):
-                self._save(tool_model, each, session, ctx.connection_id)
+                self._save(tool_model, each, session, ctx.connection.id)
         else:
-            self._save(tool_model, res, session, ctx.connection_id)
+            self._save(tool_model, res, session, ctx.connection.id)
 
     def _save(self, tool_model: ToolModel, domain_model: DomainModel, session: Session, connection_id: int):
         if not isinstance(domain_model, DomainModel):
diff --git a/backend/python/pydevlake/test/stream_test.py b/backend/python/pydevlake/test/stream_test.py
index 234c7e79a..3aac3c724 100644
--- a/backend/python/pydevlake/test/stream_test.py
+++ b/backend/python/pydevlake/test/stream_test.py
@@ -19,7 +19,7 @@ import json
 import pytest
 from sqlmodel import Session, Field
 
-from pydevlake import Stream, Connection, Context
+from pydevlake import Stream, Connection, Context, DomainType
 from pydevlake.model import ToolModel, DomainModel
 
 
@@ -34,7 +34,7 @@ class DummyDomainModel(DomainModel, table=True):
 
 class DummyStream(Stream):
     tool_model=DummyToolModel
-    domain_model=DummyDomainModel
+    domain_types=[DomainType.CROSS]
 
     def collect(self, state, context):
         for i, each in enumerate(context.connection.raw_data):
@@ -68,7 +68,7 @@ def raw_data():
 
 @pytest.fixture
 def connection(raw_data):
-    return DummyConnection(raw_data=raw_data)
+    return DummyConnection(id=11, raw_data=raw_data)
 
 
 @pytest.fixture
@@ -76,7 +76,6 @@ def ctx(connection):
     return Context(
         db_url="sqlite+pysqlite:///:memory:",
         scope_id="1",
-        connection_id=11,
         connection=connection,
         options={}
     )
@@ -124,7 +123,7 @@ def test_convert_data(stream, raw_data, ctx):
                     id=each["i"],
                     name=each["n"],
                     raw_data_table="_raw_dummy_model",
-                    raw_data_params=json.dumps({"connection_id": ctx.connection_id, "scope_id": ctx.scope_id})
+                    raw_data_params=json.dumps({"connection_id": ctx.connection.id, "scope_id": ctx.scope_id})
                 )
             )
         session.commit()
diff --git a/backend/server/services/remote/models/conversion.go b/backend/server/services/remote/models/conversion.go
index 5b07efdae..6d77bf37c 100644
--- a/backend/server/services/remote/models/conversion.go
+++ b/backend/server/services/remote/models/conversion.go
@@ -24,18 +24,17 @@ import (
 
 	"github.com/apache/incubator-devlake/core/errors"
 	"github.com/apache/incubator-devlake/core/models"
-	"github.com/apache/incubator-devlake/core/models/common"
 )
 
-func LoadTableModel(tableName string, schema map[string]any) (*models.DynamicTabler, errors.Error) {
-	structType, err := GenerateStructType(schema, reflect.TypeOf(common.Model{}))
+func LoadTableModel(tableName string, schema map[string]any, encrypt bool, parentModel any) (*models.DynamicTabler, errors.Error) {
+	structType, err := GenerateStructType(schema, encrypt, reflect.TypeOf(parentModel))
 	if err != nil {
 		return nil, err
 	}
 	return models.NewDynamicTabler(tableName, structType), nil
 }
 
-func GenerateStructType(schema map[string]any, baseType reflect.Type) (reflect.Type, errors.Error) {
+func GenerateStructType(schema map[string]any, encrypt bool, baseType reflect.Type) (reflect.Type, errors.Error) {
 	var structFields []reflect.StructField
 	propsRaw, ok := schema["properties"]
 	if !ok {
@@ -55,8 +54,11 @@ func GenerateStructType(schema map[string]any, baseType reflect.Type) (reflect.T
 		structFields = append(structFields, anonymousField)
 	}
 	for k, v := range props {
+		if isBaseTypeField(k, baseType) {
+			continue
+		}
 		spec := v.(map[string]any)
-		field, err := generateStructField(k, spec)
+		field, err := generateStructField(k, encrypt, spec)
 		if err != nil {
 			return nil, err
 		}
@@ -65,19 +67,42 @@ func GenerateStructType(schema map[string]any, baseType reflect.Type) (reflect.T
 	return reflect.StructOf(structFields), nil
 }
 
-func generateStructField(name string, schema map[string]any) (*reflect.StructField, errors.Error) {
-	tag := reflect.StructTag(fmt.Sprintf("json:\"%s\" "+
-		"gorm:\"serializer:encdec\"", //just encrypt everything for GORM operations - makes things easy
-		name))
+func isBaseTypeField(fieldName string, baseType reflect.Type) bool {
+	fieldName = canonicalFieldName(fieldName)
+	for i := 0; i < baseType.NumField(); i++ {
+		baseField := baseType.Field(i)
+		if baseField.Anonymous {
+			if isBaseTypeField(fieldName, baseField.Type) {
+				return true
+			}
+		}
+		if fieldName == canonicalFieldName(baseField.Name) {
+			return true
+		}
+	}
+	return false
+}
+
+func canonicalFieldName(fieldName string) string {
+	return strings.ToLower(strings.Replace(fieldName, "_", "", -1))
+}
+
+func generateStructField(name string, encrypt bool, schema map[string]any) (*reflect.StructField, errors.Error) {
 	goType, err := getGoType(schema)
 	if err != nil {
 		return nil, err
 	}
-	return &reflect.StructField{
+	sf := &reflect.StructField{
 		Name: strings.Title(name), //nolint:staticcheck
 		Type: goType,
-		Tag:  tag,
-	}, nil
+		Tag:  reflect.StructTag(fmt.Sprintf("json:\"%s\"", name)),
+	}
+	if encrypt {
+		sf.Tag = reflect.StructTag(fmt.Sprintf("json:\"%s\" "+
+			"gorm:\"serializer:encdec\"", //just encrypt everything for GORM operations - makes things easy
+			name))
+	}
+	return sf, nil
 }
 
 func getGoType(schema map[string]any) (reflect.Type, errors.Error) {
@@ -90,6 +115,8 @@ func getGoType(schema map[string]any) (reflect.Type, errors.Error) {
 	//TODO: support more types
 	case "integer":
 		goType = reflect.TypeOf(uint64(0))
+	case "boolean":
+		goType = reflect.TypeOf(false)
 	case "string":
 		//TODO: distinguish stypes based on string format
 		goType = reflect.TypeOf("")
diff --git a/backend/server/services/remote/models/dynamic_domain_scopes.go b/backend/server/services/remote/models/dynamic_domain_scopes.go
new file mode 100644
index 000000000..527c577c1
--- /dev/null
+++ b/backend/server/services/remote/models/dynamic_domain_scopes.go
@@ -0,0 +1,66 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package models
+
+import (
+	"fmt"
+
+	"github.com/mitchellh/mapstructure"
+
+	"github.com/apache/incubator-devlake/core/errors"
+	"github.com/apache/incubator-devlake/core/models/domainlayer/code"
+	"github.com/apache/incubator-devlake/core/models/domainlayer/codequality"
+	"github.com/apache/incubator-devlake/core/models/domainlayer/devops"
+	"github.com/apache/incubator-devlake/core/models/domainlayer/ticket"
+	"github.com/apache/incubator-devlake/core/plugin"
+)
+
+func newScopeByTypeName(typeName string) (plugin.Scope, errors.Error) {
+	switch typeName {
+	case "Repo":
+		return &code.Repo{}, nil
+	case "CqProject":
+		return &codequality.CqProject{}, nil
+	case "CicdScope":
+		return &devops.CicdScope{}, nil
+	case "Board":
+		return &ticket.Board{}, nil
+	default:
+		return nil, errors.BadInput.New(fmt.Sprintf("Unknown scope type %s", typeName))
+	}
+}
+
+func (d DynamicDomainScope) Load() (plugin.Scope, errors.Error) {
+	scope, type_err := newScopeByTypeName(d.TypeName)
+	if type_err != nil {
+		return nil, type_err
+	}
+	config := &mapstructure.DecoderConfig{
+		TagName: "json",
+		Result:  scope,
+	}
+	decoder, err := mapstructure.NewDecoder(config)
+	if err != nil {
+		return nil, errors.Convert(err)
+	}
+	err = decoder.Decode(d.Data)
+	if err != nil {
+		return nil, errors.Convert(err)
+	}
+	return scope, nil
+}
diff --git a/backend/server/services/remote/models/models.go b/backend/server/services/remote/models/models.go
index c82783f32..7dbcfb659 100644
--- a/backend/server/services/remote/models/models.go
+++ b/backend/server/services/remote/models/models.go
@@ -17,6 +17,15 @@ limitations under the License.
 
 package models
 
+import (
+	"time"
+
+	"github.com/apache/incubator-devlake/core/errors"
+	"github.com/apache/incubator-devlake/core/models"
+	"github.com/apache/incubator-devlake/core/models/common"
+	"github.com/apache/incubator-devlake/core/plugin"
+)
+
 const (
 	PythonPoetryCmd PluginType      = "python-poetry"
 	PythonCmd       PluginType      = "python"
@@ -31,15 +40,38 @@ type (
 )
 
 type PluginInfo struct {
-	Type                     PluginType      `json:"type" validate:"required"`
-	Name                     string          `json:"name" validate:"required"`
-	Extension                PluginExtension `json:"extension"`
-	ConnectionSchema         map[string]any  `json:"connection_schema" validate:"required"`
-	TransformationRuleSchema map[string]any  `json:"transformation_rule_schema" validate:"required"`
-	Description              string          `json:"description"`
-	PluginPath               string          `json:"plugin_path" validate:"required"`
-	ApiEndpoints             []Endpoint      `json:"api_endpoints" validate:"dive"`
-	SubtaskMetas             []SubtaskMeta   `json:"subtask_metas" validate:"dive"`
+	Type                        PluginType        `json:"type" validate:"required"`
+	Name                        string            `json:"name" validate:"required"`
+	Extension                   PluginExtension   `json:"extension"`
+	ConnectionModelInfo         *DynamicModelInfo `json:"connection_model_info" validate:"required"`
+	TransformationRuleModelInfo *DynamicModelInfo `json:"transformation_rule_model_info"`
+	ScopeModelInfo              *DynamicModelInfo `json:"scope_model_info" validate:"dive"`
+	Description                 string            `json:"description"`
+	PluginPath                  string            `json:"plugin_path" validate:"required"`
+	SubtaskMetas                []SubtaskMeta     `json:"subtask_metas" validate:"dive"`
+}
+
+type DynamicModelInfo struct {
+	JsonSchema map[string]any `json:"json_schema" validate:"required"`
+	TableName  string         `json:"table_name" validate:"required"`
+}
+
+func (d DynamicModelInfo) LoadDynamicTabler(encrypt bool, parentModel any) (*models.DynamicTabler, errors.Error) {
+	return LoadTableModel(d.TableName, d.JsonSchema, encrypt, parentModel)
+}
+
+type ScopeModel struct {
+	common.NoPKModel
+	Id                   string `gorm:"primarykey;type:varchar(255)" json:"id"`
+	ConnectionId         uint64 `gorm:"primaryKey" json:"connection_id"`
+	TransformationRuleId uint64 `json:"transformation_rule_id"`
+}
+
+type TransformationModel struct {
+	Id        uint64    `gorm:"primaryKey" json:"id"`
+	CreatedAt time.Time `json:"createdAt"`
+	UpdatedAt time.Time `json:"updatedAt"`
+	Name      string
 }
 
 type SubtaskMeta struct {
@@ -52,8 +84,12 @@ type SubtaskMeta struct {
 	DomainTypes      []string `json:"domain_types" validate:"required"`
 }
 
-type Endpoint struct {
-	Resource string `json:"resource" validate:"required"`
-	Handler  string `json:"handler" validate:"required"`
-	Method   string `json:"method" validate:"required"`
+type DynamicDomainScope struct {
+	TypeName string                 `json:"type_name"`
+	Data     map[string]interface{} `json:"data"`
+}
+
+type PipelineData struct {
+	Plan   plugin.PipelinePlan  `json:"plan"`
+	Scopes []DynamicDomainScope `json:"scopes"`
 }
diff --git a/backend/server/services/remote/models/plugin_models.go b/backend/server/services/remote/models/plugin_models.go
deleted file mode 100644
index 4284112d6..000000000
--- a/backend/server/services/remote/models/plugin_models.go
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package models
-
-type (
-	PipelineScope struct {
-		ScopeId   string
-		ScopeName string
-		TableName string
-	}
-	WrappedPipelineScope struct {
-		Scope PipelineScope
-	}
-)
-
-func (p *WrappedPipelineScope) ScopeId() string {
-	return p.Scope.ScopeId
-}
-
-func (p *WrappedPipelineScope) ScopeName() string {
-	return p.Scope.ScopeName
-}
-
-func (p *WrappedPipelineScope) TableName() string {
-	return p.Scope.TableName
-}
diff --git a/backend/server/services/remote/plugin/default_api.go b/backend/server/services/remote/plugin/default_api.go
index 59bd80801..6245fa5a7 100644
--- a/backend/server/services/remote/plugin/default_api.go
+++ b/backend/server/services/remote/plugin/default_api.go
@@ -28,6 +28,7 @@ type pluginAPI struct {
 	invoker    bridge.Invoker
 	connType   *models.DynamicTabler
 	txRuleType *models.DynamicTabler
+	scopeType  *models.DynamicTabler
 	helper     *api.ConnectionApiHelper
 }
 
@@ -35,15 +36,17 @@ func GetDefaultAPI(
 	invoker bridge.Invoker,
 	connType *models.DynamicTabler,
 	txRuleType *models.DynamicTabler,
+	scopeType *models.DynamicTabler,
 	helper *api.ConnectionApiHelper) map[string]map[string]plugin.ApiResourceHandler {
 	papi := &pluginAPI{
 		invoker:    invoker,
 		connType:   connType,
 		txRuleType: txRuleType,
+		scopeType:  scopeType,
 		helper:     helper,
 	}
 
-	return map[string]map[string]plugin.ApiResourceHandler{
+	resources := map[string]map[string]plugin.ApiResourceHandler{
 		"test": {
 			"POST": papi.TestConnection,
 		},
@@ -60,7 +63,7 @@ func GetDefaultAPI(
 			"PUT": papi.PutScope,
 			"GET": papi.ListScopes,
 		},
-		"connections/:connectionId/scopes/*scopeId": {
+		"connections/:connectionId/scopes/:scopeId": {
 			"GET":   papi.GetScope,
 			"PATCH": papi.PatchScope,
 		},
@@ -68,15 +71,20 @@ func GetDefaultAPI(
 			"GET": papi.GetRemoteScopes,
 		},
 		"connections/:connectionId/search-remote-scopes": {
-			"GET": papi.GetRemoteScopes,
+			"GET": papi.SearchRemoteScopes,
 		},
-		"transformation_rules": {
+	}
+
+	if txRuleType != nil {
+		resources["transformation_rules"] = map[string]plugin.ApiResourceHandler{
 			"POST": papi.PostTransformationRules,
 			"GET":  papi.ListTransformationRules,
-		},
-		"transformation_rules/:id": {
+		}
+		resources["transformation_rules/:id"] = map[string]plugin.ApiResourceHandler{
 			"GET":   papi.GetTransformationRule,
 			"PATCH": papi.PatchTransformationRule,
-		},
+		}
 	}
+
+	return resources
 }
diff --git a/backend/server/services/remote/plugin/init.go b/backend/server/services/remote/plugin/init.go
index 676fa14d9..cd75b2740 100644
--- a/backend/server/services/remote/plugin/init.go
+++ b/backend/server/services/remote/plugin/init.go
@@ -53,9 +53,9 @@ func NewRemotePlugin(info *models.PluginInfo) (models.RemotePlugin, errors.Error
 	case models.None:
 		return plugin, nil
 	case models.Metric:
-		return remoteMetricPlugin{plugin}, nil
+		return &remoteMetricPlugin{plugin}, nil
 	case models.Datasource:
-		return remoteDatasourcePlugin{plugin}, nil
+		return &remoteDatasourcePlugin{plugin}, nil
 	default:
 		return nil, errors.BadInput.New("unsupported plugin extension")
 	}
diff --git a/backend/server/services/remote/plugin/plugin_extensions.go b/backend/server/services/remote/plugin/plugin_extensions.go
index e81abeed3..61b1e436e 100644
--- a/backend/server/services/remote/plugin/plugin_extensions.go
+++ b/backend/server/services/remote/plugin/plugin_extensions.go
@@ -20,8 +20,10 @@ package plugin
 import (
 	"encoding/json"
 
+	"github.com/apache/incubator-devlake/core/dal"
 	"github.com/apache/incubator-devlake/core/errors"
 	"github.com/apache/incubator-devlake/core/plugin"
+	"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
 	"github.com/apache/incubator-devlake/server/services/remote/bridge"
 	"github.com/apache/incubator-devlake/server/services/remote/models"
 )
@@ -35,28 +37,44 @@ type (
 	}
 )
 
-func (p *remoteMetricPlugin) MakeMetricPluginPipelinePlanV200(projectName string, options json.RawMessage) (plugin.PipelinePlan, errors.Error) {
+func (p remoteMetricPlugin) MakeMetricPluginPipelinePlanV200(projectName string, options json.RawMessage) (plugin.PipelinePlan, errors.Error) {
 	return nil, errors.Internal.New("Remote metric plugins not supported")
 }
 
-func (p *remoteDatasourcePlugin) MakeDataSourcePipelinePlanV200(connectionId uint64, bpScopes []*plugin.BlueprintScopeV200, syncPolicy plugin.BlueprintSyncPolicy) (plugin.PipelinePlan, []plugin.Scope, errors.Error) {
+func (p remoteDatasourcePlugin) MakeDataSourcePipelinePlanV200(connectionId uint64, bpScopes []*plugin.BlueprintScopeV200, syncPolicy plugin.BlueprintSyncPolicy) (plugin.PipelinePlan, []plugin.Scope, errors.Error) {
 	connection := p.connectionTabler.New()
 	err := connectionHelper.FirstById(connection, connectionId)
 	if err != nil {
 		return nil, nil, err
 	}
 
-	plan := plugin.PipelinePlan{}
-	var scopes []models.PipelineScope
-	err = p.invoker.Call("make-pipeline", bridge.DefaultContext, connectionId, bpScopes).Get(&plan, &scopes)
+	db := basicRes.GetDal()
+	var toolScopes = make([]interface{}, len(bpScopes))
+	for i, bpScope := range bpScopes {
+		toolScope := p.scopeTabler.New()
+		err = api.CallDB(db.First, toolScope, dal.Where("id = ?", bpScope.Id))
+		if err != nil {
+			return nil, nil, errors.NotFound.New("record not found")
+		}
+		toolScopes[i] = toolScope.Unwrap()
+	}
+
+	plan_data := models.PipelineData{}
+	err = p.invoker.Call("make-pipeline", bridge.DefaultContext, toolScopes).Get(&plan_data)
 	if err != nil {
 		return nil, nil, err
 	}
-	var castedScopes []plugin.Scope
-	for _, scope := range scopes {
-		castedScopes = append(castedScopes, &models.WrappedPipelineScope{Scope: scope})
+
+	var scopes = make([]plugin.Scope, len(plan_data.Scopes))
+	for i, dynamicScope := range plan_data.Scopes {
+		scope, err := dynamicScope.Load()
+		if err != nil {
+			return nil, nil, err
+		}
+		scopes[i] = scope
 	}
-	return plan, castedScopes, nil
+
+	return plan_data.Plan, scopes, nil
 }
 
 var _ models.RemotePlugin = (*remoteMetricPlugin)(nil)
diff --git a/backend/server/services/remote/plugin/plugin_impl.go b/backend/server/services/remote/plugin/plugin_impl.go
index 8b34b60d7..80df3650c 100644
--- a/backend/server/services/remote/plugin/plugin_impl.go
+++ b/backend/server/services/remote/plugin/plugin_impl.go
@@ -18,11 +18,10 @@ limitations under the License.
 package plugin
 
 import (
-	"fmt"
-
 	"github.com/apache/incubator-devlake/core/dal"
 	"github.com/apache/incubator-devlake/core/errors"
 	coreModels "github.com/apache/incubator-devlake/core/models"
+	"github.com/apache/incubator-devlake/core/models/common"
 	"github.com/apache/incubator-devlake/core/plugin"
 	"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
 	"github.com/apache/incubator-devlake/server/services/remote/bridge"
@@ -37,13 +36,13 @@ type (
 		description              string
 		invoker                  bridge.Invoker
 		connectionTabler         *coreModels.DynamicTabler
+		scopeTabler              *coreModels.DynamicTabler
 		transformationRuleTabler *coreModels.DynamicTabler
 		resources                map[string]map[string]plugin.ApiResourceHandler
 	}
 	RemotePluginTaskData struct {
 		DbUrl              string                 `json:"db_url"`
 		ScopeId            string                 `json:"scope_id"`
-		ConnectionId       uint64                 `json:"connection_id"`
 		Connection         interface{}            `json:"connection"`
 		TransformationRule interface{}            `json:"transformation_rule"`
 		Options            map[string]interface{} `json:"options"`
@@ -51,26 +50,31 @@ type (
 )
 
 func newPlugin(info *models.PluginInfo, invoker bridge.Invoker) (*remotePluginImpl, errors.Error) {
-	connectionTableName := fmt.Sprintf("_tool_%s_connections", info.Name)
-	connectionTabler, err := models.LoadTableModel(connectionTableName, info.ConnectionSchema)
+	connectionTabler, err := info.ConnectionModelInfo.LoadDynamicTabler(true, common.Model{})
 	if err != nil {
 		return nil, err
 	}
 
-	txRuleTableName := fmt.Sprintf("_tool_%s_transformation_rules", info.Name)
-	txRuleTabler, err := models.LoadTableModel(txRuleTableName, info.TransformationRuleSchema)
+	var txRuleTabler *coreModels.DynamicTabler
+	if info.TransformationRuleModelInfo != nil {
+		txRuleTabler, err = info.TransformationRuleModelInfo.LoadDynamicTabler(false, models.TransformationModel{})
+		if err != nil {
+			return nil, err
+		}
+	}
+	scopeTabler, err := info.ScopeModelInfo.LoadDynamicTabler(false, models.ScopeModel{})
 	if err != nil {
 		return nil, err
 	}
-
 	p := remotePluginImpl{
 		name:                     info.Name,
 		invoker:                  invoker,
 		pluginPath:               info.PluginPath,
 		description:              info.Description,
 		connectionTabler:         connectionTabler,
+		scopeTabler:              scopeTabler,
 		transformationRuleTabler: txRuleTabler,
-		resources:                GetDefaultAPI(invoker, connectionTabler, txRuleTabler, connectionHelper),
+		resources:                GetDefaultAPI(invoker, connectionTabler, txRuleTabler, scopeTabler, connectionHelper),
 	}
 	remoteBridge := bridge.NewBridge(invoker)
 	for _, subtask := range info.SubtaskMetas {
@@ -94,38 +98,43 @@ func (p *remotePluginImpl) PrepareTaskData(taskCtx plugin.TaskContext, options m
 	dbUrl := taskCtx.GetConfig("db_url")
 	connectionId := uint64(options["connectionId"].(float64))
 
-	connectionHelper := api.NewConnectionHelper(
+	helper := api.NewConnectionHelper(
 		taskCtx,
 		nil,
 	)
 
-	connection := p.connectionTabler.New()
-	err := connectionHelper.FirstById(connection, connectionId)
+	wrappedConnection := p.connectionTabler.New()
+	err := helper.FirstById(wrappedConnection, connectionId)
 	if err != nil {
 		return nil, errors.Convert(err)
 	}
+	connection := wrappedConnection.Unwrap()
 
 	scopeId, ok := options["scopeId"].(string)
 	if !ok {
 		return nil, errors.BadInput.New("missing scopeId")
 	}
 
-	txRule := p.transformationRuleTabler.New()
+	var txRule interface{}
 	txRuleId, ok := options["transformation_rule_id"].(uint64)
 	if ok {
+		wrappedTxRule := p.transformationRuleTabler.New()
 		db := taskCtx.GetDal()
-		err = db.First(&txRule, dal.Where("id = ?", txRuleId))
+		err = db.First(&wrappedTxRule, dal.Where("id = ?", txRuleId))
 		if err != nil {
 			return nil, errors.BadInput.New("invalid transformation rule id")
 		}
+		txRule = wrappedTxRule.Unwrap()
+	} else {
+		txRule = nil
 	}
 
 	return RemotePluginTaskData{
 		DbUrl:              dbUrl,
 		ScopeId:            scopeId,
-		ConnectionId:       connectionId,
-		Connection:         connection.Unwrap(),
+		Connection:         connection,
 		TransformationRule: txRule,
+		Options:            options,
 	}, nil
 }
 
@@ -150,15 +159,18 @@ func (p *remotePluginImpl) RunMigrations(forceMigrate bool) errors.Error {
 	if err != nil {
 		return err
 	}
-
-	err = api.CallDB(basicRes.GetDal().AutoMigrate, p.transformationRuleTabler.New())
+	if p.transformationRuleTabler != nil {
+		err = api.CallDB(basicRes.GetDal().AutoMigrate, p.transformationRuleTabler.New())
+		if err != nil {
+			return err
+		}
+	}
+	err = api.CallDB(basicRes.GetDal().AutoMigrate, p.scopeTabler.New())
 	if err != nil {
 		return err
 	}
-
 	err = p.invoker.Call("run-migrations", bridge.DefaultContext, forceMigrate).Get()
 	return err
 }
 
 var _ models.RemotePlugin = (*remotePluginImpl)(nil)
-var _ plugin.Scope = (*models.WrappedPipelineScope)(nil)
diff --git a/backend/server/services/remote/plugin/remote_scope_api.go b/backend/server/services/remote/plugin/remote_scope_api.go
index fc428bd9a..e0fb2d183 100644
--- a/backend/server/services/remote/plugin/remote_scope_api.go
+++ b/backend/server/services/remote/plugin/remote_scope_api.go
@@ -26,8 +26,22 @@ import (
 	"github.com/apache/incubator-devlake/server/services/remote/bridge"
 )
 
+type ScopeItem struct {
+	ScopeId              string `json:"scopeId"`
+	ScopeName            string `json:"scopeName"`
+	ConnectionId         uint64 `json:"connectionId"`
+	TransformationRuleId uint64 `json:"transformationRuleId,omitempty"`
+}
+
 type RemoteScopesOutput struct {
-	Children []ScopeItem `json:"children"`
+	Children []RemoteScopesTreeNode `json:"children"`
+}
+
+type RemoteScopesTreeNode struct {
+	Type string      `json:"type"`
+	Id   string      `json:"id"`
+	Name string      `json:"name"`
+	Data interface{} `json:"data"`
 }
 
 func (pa *pluginAPI) GetRemoteScopes(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
@@ -42,20 +56,19 @@ func (pa *pluginAPI) GetRemoteScopes(input *plugin.ApiResourceInput) (*plugin.Ap
 		return nil, err
 	}
 
-	query, ok := input.Params["query"]
-	if !ok {
-		query = ""
-	}
-
-	var scopes []ScopeItem
-	err = pa.invoker.Call("remote-scopes", bridge.DefaultContext, connection, query).Get(&scopes)
+	var remoteScopes []RemoteScopesTreeNode
+	err = pa.invoker.Call("remote-scopes", bridge.DefaultContext, connection.Unwrap()).Get(&remoteScopes)
 	if err != nil {
 		return nil, err
 	}
 
-	res := RemoteScopesOutput{
-		Children: scopes,
+	output := RemoteScopesOutput{
+		Children: remoteScopes,
 	}
 
-	return &plugin.ApiResourceOutput{Body: res, Status: http.StatusOK}, nil
+	return &plugin.ApiResourceOutput{Body: output, Status: http.StatusOK}, nil
+}
+
+func (pa *pluginAPI) SearchRemoteScopes(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	return &plugin.ApiResourceOutput{Status: http.StatusNotImplemented}, nil
 }
diff --git a/backend/server/services/remote/plugin/scope_api.go b/backend/server/services/remote/plugin/scope_api.go
index e793d6119..d27e02bd6 100644
--- a/backend/server/services/remote/plugin/scope_api.go
+++ b/backend/server/services/remote/plugin/scope_api.go
@@ -21,6 +21,8 @@ import (
 	"net/http"
 	"strconv"
 
+	"github.com/apache/incubator-devlake/server/services/remote/models"
+
 	"github.com/mitchellh/mapstructure"
 
 	"github.com/apache/incubator-devlake/core/dal"
@@ -29,22 +31,14 @@ import (
 	"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
 )
 
-type ScopeItem struct {
-	ScopeId              string `json:"scopeId"`
-	ScopeName            string `json:"scopeName"`
-	ConnectionId         uint64 `json:"connectionId"`
-	TransformationRuleId uint64 `json:"transformationRuleId,omitempty"`
-}
-
 // DTO that includes the transformation rule name
 type apiScopeResponse struct {
-	Scope                  ScopeItem
+	Scope                  any
 	TransformationRuleName string `json:"transformationRuleId,omitempty"`
 }
 
-// Why a batch PUT?
 type request struct {
-	Data []*ScopeItem `json:"data"`
+	Data []map[string]any `json:"data"`
 }
 
 func (pa *pluginAPI) PutScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
@@ -52,34 +46,38 @@ func (pa *pluginAPI) PutScope(input *plugin.ApiResourceInput) (*plugin.ApiResour
 	if connectionId == 0 {
 		return nil, errors.BadInput.New("invalid connectionId")
 	}
-
 	var scopes request
 	err := errors.Convert(mapstructure.Decode(input.Body, &scopes))
 	if err != nil {
 		return nil, errors.BadInput.Wrap(err, "decoding scope error")
 	}
-
 	keeper := make(map[string]struct{})
-	for _, scope := range scopes.Data {
-		if _, ok := keeper[scope.ScopeId]; ok {
+	var createdScopes []any
+	for _, scopeRaw := range scopes.Data {
+		err = verifyScope(scopeRaw)
+		if err != nil {
+			return nil, err
+		}
+		scopeId := scopeRaw["id"].(string)
+		if _, ok := keeper[scopeId]; ok {
 			return nil, errors.BadInput.New("duplicated item")
 		} else {
-			keeper[scope.ScopeId] = struct{}{}
+			keeper[scopeId] = struct{}{}
 		}
-		scope.ConnectionId = connectionId
-
-		err = verifyScope(scope)
+		scope := pa.scopeType.New()
+		err = scope.From(&scopeRaw)
 		if err != nil {
 			return nil, err
 		}
+		// Scopes are saved one at a time: batching would need reflection over the dynamic scope type (not implemented yet).
+		err = api.CallDB(basicRes.GetDal().CreateOrUpdate, scope)
+		if err != nil {
+			return nil, errors.Default.Wrap(err, "error on saving scope")
+		}
+		createdScopes = append(createdScopes, scope.Unwrap())
 	}
 
-	err = basicRes.GetDal().CreateOrUpdate(scopes.Data)
-	if err != nil {
-		return nil, errors.Default.Wrap(err, "error on saving scope")
-	}
-
-	return &plugin.ApiResourceOutput{Body: scopes.Data, Status: http.StatusOK}, nil
+	return &plugin.ApiResourceOutput{Body: createdScopes, Status: http.StatusOK}, nil
 }
 
 func (pa *pluginAPI) PatchScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
@@ -87,115 +85,113 @@ func (pa *pluginAPI) PatchScope(input *plugin.ApiResourceInput) (*plugin.ApiReso
 	if connectionId == 0 {
 		return nil, errors.BadInput.New("invalid connectionId")
 	}
-
 	db := basicRes.GetDal()
-	scope := ScopeItem{}
-	err := db.First(&scope, dal.Where("connection_id = ? AND scope_id = ?", connectionId, scopeId))
+	scope := pa.scopeType.New()
+	err := api.CallDB(db.First, scope, dal.Where("connection_id = ? AND id = ?", connectionId, scopeId))
 	if err != nil {
 		return nil, errors.Default.Wrap(err, "scope not found")
 	}
-
-	err = api.DecodeMapStruct(input.Body, &scope)
+	err = verifyScope(input.Body)
 	if err != nil {
-		return nil, errors.Default.Wrap(err, "patch scope error")
+		return nil, err
 	}
-
-	err = verifyScope(&scope)
+	err = scope.From(&input.Body)
 	if err != nil {
-		return nil, err
+		return nil, errors.Default.Wrap(err, "patch scope error")
 	}
-
-	err = db.Update(&scope)
+	err = api.CallDB(db.Update, scope)
 	if err != nil {
 		return nil, errors.Default.Wrap(err, "error on saving scope")
 	}
-	return &plugin.ApiResourceOutput{Body: scope, Status: http.StatusOK}, nil
+	return &plugin.ApiResourceOutput{Body: scope.Unwrap(), Status: http.StatusOK}, nil
 }
 
 func (pa *pluginAPI) ListScopes(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
-	var scopes []ScopeItem
 	connectionId, _ := extractParam(input.Params)
-
 	if connectionId == 0 {
 		return nil, errors.BadInput.New("invalid connectionId")
 	}
-
 	limit, offset := api.GetLimitOffset(input.Query, "pageSize", "page")
 
 	if limit > 100 {
 		return nil, errors.BadInput.New("Page limit cannot exceed 100")
 	}
-
 	db := basicRes.GetDal()
-	err := db.All(&scopes, dal.Where("connection_id = ?", connectionId), dal.Limit(limit), dal.Offset(offset))
+	scopes := pa.scopeType.NewSlice()
+	err := api.CallDB(db.All, scopes, dal.Where("connection_id = ?", connectionId), dal.Limit(limit), dal.Offset(offset))
+	if err != nil {
+		return nil, err
+	}
+	var scopeMap []map[string]any
+	err = scopes.To(&scopeMap)
 	if err != nil {
 		return nil, err
 	}
-
 	var ruleIds []uint64
-	for _, scope := range scopes {
-		if scope.TransformationRuleId > 0 {
-			ruleIds = append(ruleIds, scope.TransformationRuleId)
+	for _, scopeModel := range scopeMap {
+		if tid := uint64(scopeModel["transformation_rule_id"].(float64)); tid > 0 {
+			ruleIds = append(ruleIds, tid)
 		}
 	}
-
-	var txRuleId2Name []struct {
-		id   uint64
-		name string
-	}
+	rules := pa.txRuleType.NewSlice()
 	if len(ruleIds) > 0 {
-		err = db.All(&txRuleId2Name,
-			dal.Select("id, name"),
-			dal.From(pa.txRuleType.TableName()),
+		err = api.CallDB(db.All, rules, dal.Select("id, name"),
 			dal.Where("id IN (?)", ruleIds))
 		if err != nil {
 			return nil, err
 		}
 	}
-
+	var transformationModels []models.TransformationModel
+	err = rules.To(&transformationModels)
+	if err != nil {
+		return nil, err
+	}
 	names := make(map[uint64]string)
-	for _, r := range txRuleId2Name {
-		names[r.id] = r.name
+	for _, t := range transformationModels {
+		names[t.Id] = t.Name
 	}
-
 	var apiScopes []apiScopeResponse
-	for _, scope := range scopes {
-		txRuleName := names[scope.TransformationRuleId]
-		scopeRes := apiScopeResponse{
-			Scope:                  scope,
-			TransformationRuleName: txRuleName,
+	for _, scope := range scopeMap {
+		txRuleName, ok := names[uint64(scope["transformation_rule_id"].(float64))]
+		if ok {
+			scopeRes := apiScopeResponse{
+				Scope:                  scope,
+				TransformationRuleName: txRuleName,
+			}
+			apiScopes = append(apiScopes, scopeRes)
 		}
-		apiScopes = append(apiScopes, scopeRes)
 	}
 
 	return &plugin.ApiResourceOutput{Body: apiScopes, Status: http.StatusOK}, nil
 }
 
 func (pa *pluginAPI) GetScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
-	var scope ScopeItem
 	connectionId, scopeId := extractParam(input.Params)
 	if connectionId == 0 {
 		return nil, errors.BadInput.New("invalid path params")
 	}
-
+	rawScope := pa.scopeType.New()
 	db := basicRes.GetDal()
-	err := db.First(&scope, dal.Where("connection_id = ? AND scope_id = ?", connectionId, scopeId))
+	err := api.CallDB(db.First, rawScope, dal.Where("connection_id = ? AND id = ?", connectionId, scopeId))
 	if db.IsErrorNotFound(err) {
 		return nil, errors.NotFound.New("record not found")
 	}
 	if err != nil {
 		return nil, err
 	}
-
-	var ruleName string
+	var scope models.ScopeModel
+	err = rawScope.To(&scope)
+	if err != nil {
+		return nil, err
+	}
+	var rule models.TransformationModel
 	if scope.TransformationRuleId > 0 {
-		err = db.First(&ruleName, dal.Select("name"), dal.From(pa.txRuleType.TableName()), dal.Where("id = ?", scope.TransformationRuleId))
+		err = api.CallDB(db.First, &rule, dal.From(pa.txRuleType.TableName()), dal.Where("id = ?", scope.TransformationRuleId))
 		if err != nil {
 			return nil, err
 		}
 	}
-
-	return &plugin.ApiResourceOutput{Body: apiScopeResponse{scope, ruleName}, Status: http.StatusOK}, nil
+	return &plugin.ApiResourceOutput{Body: apiScopeResponse{rawScope.Unwrap(), rule.Name}, Status: http.StatusOK}, nil
 }
 
 func extractParam(params map[string]string) (uint64, string) {
@@ -204,12 +200,12 @@ func extractParam(params map[string]string) (uint64, string) {
 	return connectionId, scopeId
 }
 
-func verifyScope(scope *ScopeItem) errors.Error {
-	if scope.ConnectionId == 0 {
+func verifyScope(scope map[string]any) errors.Error {
+	if scope["connection_id"].(float64) == 0 {
 		return errors.BadInput.New("invalid connectionId")
 	}
 
-	if scope.ScopeId == "" {
+	if scope["id"] == "" {
 		return errors.BadInput.New("invalid scope ID")
 	}
 
diff --git a/backend/test/helper/api.go b/backend/test/helper/api.go
index a3da2ec69..c751f51f6 100644
--- a/backend/test/helper/api.go
+++ b/backend/test/helper/api.go
@@ -134,10 +134,13 @@ func (d *DevlakeClient) ListProjects() apiProject.PaginatedProjects {
 }
 
 func (d *DevlakeClient) CreateScope(pluginName string, connectionId uint64, scope any) any {
+	request := map[string]any{
+		"Data": []any{scope},
+	}
 	return sendHttpRequest[any](d.testCtx, d.timeout, debugInfo{
 		print:      true,
 		inlineJson: false,
-	}, http.MethodPut, fmt.Sprintf("%s/plugins/%s/connections/%d/scopes", d.Endpoint, pluginName, connectionId), scope)
+	}, http.MethodPut, fmt.Sprintf("%s/plugins/%s/connections/%d/scopes", d.Endpoint, pluginName, connectionId), request)
 }
 
 func (d *DevlakeClient) UpdateScope(pluginName string, connectionId uint64, scopeId string, scope any) any {
diff --git a/backend/test/remote/fakeplugin/fakeplugin/main.py b/backend/test/remote/fakeplugin/fakeplugin/main.py
index 969c70d7a..4a1fbb485 100644
--- a/backend/test/remote/fakeplugin/fakeplugin/main.py
+++ b/backend/test/remote/fakeplugin/fakeplugin/main.py
@@ -19,8 +19,8 @@ from typing import Optional
 
 from sqlmodel import Field
 
-from pydevlake import Plugin, Connection, TransformationRule, Stream, ToolModel, RemoteScope
-from pydevlake.domain_layer.devops import CICDScope, CICDPipeline
+from pydevlake import Plugin, Connection, TransformationRule, Stream, ToolModel, ToolScope, RemoteScopeGroup, DomainType
+from pydevlake.domain_layer.devops import CicdScope, CICDPipeline
 
 
 VALID_TOKEN = "this_is_a_valid_token"
@@ -43,7 +43,7 @@ class FakePipeline(ToolModel, table=True):
 
 class FakeStream(Stream):
     tool_model = FakePipeline
-    domain_model = CICDPipeline
+    domain_types = [DomainType.CICD]
 
     fake_pipelines = [
         FakePipeline(id=1, project=VALID_PROJECT, state=FakePipeline.State.SUCCESS, started_at=datetime(2023, 1, 10, 11, 0, 0), finished_at=datetime(2023, 1, 10, 11, 3, 0)),
@@ -94,6 +94,10 @@ class FakeConnection(Connection):
     token: str
 
 
+class FakeProject(ToolScope):
+    pass
+
+
 class FakeTransformationRule(TransformationRule):
     tx1: str
 
@@ -103,19 +107,35 @@ class FakePlugin(Plugin):
     def connection_type(self):
         return FakeConnection
 
-    def get_scopes(self, scope_name: str, connection: FakeConnection):
-        assert connection
-        yield CICDScope(
+    @property
+    def tool_scope_type(self):
+        return FakeProject
+
+    def domain_scopes(self, project: FakeProject):
+        yield CicdScope(
             id=1,
-            name=scope_name,
-            url=f"http://fake.org/api/project/{scope_name}"
+            name=project.name,
+            url=f"http://fake.org/api/project/{project.name}"
         )
 
-    def remote_scopes(self, connection: FakeConnection, query: str = ''):
-        yield RemoteScope(
-            id='test',
-            name='Not a real scope'
-        )
+    def remote_scopes(self, connection: FakeConnection, group_id: str):
+        if group_id == 'group1':
+            return [
+                FakeProject(
+                    id='p1',
+                    name='Project 1'
+                )
+            ]
+        else:
+            return []
+
+    def remote_scope_groups(self, connection: FakeConnection):
+        return [
+            RemoteScopeGroup(
+                id='group1',
+                name='Group 1'
+            )
+        ]
 
     def test_connection(self, connection: FakeConnection):
         if connection.token != VALID_TOKEN:
diff --git a/backend/test/remote/remote_test.go b/backend/test/remote/remote_test.go
index 4ecbc8768..95e3d8e60 100644
--- a/backend/test/remote/remote_test.go
+++ b/backend/test/remote/remote_test.go
@@ -19,20 +19,27 @@ package test
 
 import (
 	"fmt"
-	"github.com/apache/incubator-devlake/core/models"
-	"github.com/apache/incubator-devlake/core/plugin"
-	"github.com/apache/incubator-devlake/core/utils"
-	"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
-	"github.com/apache/incubator-devlake/test/helper"
-	"github.com/stretchr/testify/require"
 	"os"
 	"os/exec"
 	"path/filepath"
 	"testing"
 	"time"
+
+	"github.com/apache/incubator-devlake/core/models"
+	"github.com/apache/incubator-devlake/core/plugin"
+	"github.com/apache/incubator-devlake/core/utils"
+	"github.com/apache/incubator-devlake/test/helper"
+	"github.com/stretchr/testify/require"
 )
 
 const PLUGIN_NAME = "fake"
+const TOKEN = "this_is_a_valid_token"
+
+type FakePluginConnection struct {
+	Id    uint64 `json:"id"`
+	Name  string `json:"name"`
+	Token string `json:"token"`
+}
 
 func setupEnv() {
 	fmt.Println("Setup test env")
@@ -75,39 +82,97 @@ func connectLocalServer(t *testing.T) *helper.DevlakeClient {
 	return client
 }
 
-func TestRunPipeline(t *testing.T) {
+func CreateTestConnection(client *helper.DevlakeClient) *helper.Connection {
+	connection := client.CreateConnection(PLUGIN_NAME,
+		FakePluginConnection{
+			Name:  "Test connection",
+			Token: TOKEN,
+		},
+	)
+
+	client.SetTimeout(1)
+	return connection
+}
+
+func TestCreateConnection(t *testing.T) {
 	setupEnv()
 	buildPython(t)
 	client := connectLocalServer(t)
-	fmt.Println("Create new connection")
-	conn := client.CreateConnection(PLUGIN_NAME,
-		api.AccessToken{
-			Token: "this_is_a_valid_token",
-		},
-	)
-	client.SetTimeout(0)
+
+	CreateTestConnection(client)
+
 	conns := client.ListConnections(PLUGIN_NAME)
 	require.Equal(t, 1, len(conns))
-	require.Equal(t, "this_is_a_valid_token", conns[0].Token)
-	fmt.Println("Run pipeline")
-	t.Run("run_pipeline", func(t *testing.T) {
-		pipeline := client.RunPipeline(models.NewPipeline{
-			Name: "remote_test",
-			Plan: []plugin.PipelineStage{
+	require.Equal(t, TOKEN, conns[0].Token)
+}
+
+func TestRunPipeline(t *testing.T) {
+	setupEnv()
+	buildPython(t)
+	client := connectLocalServer(t)
+	conn := CreateTestConnection(client)
+
+	pipeline := client.RunPipeline(models.NewPipeline{
+		Name: "remote_test",
+		Plan: []plugin.PipelineStage{
+			{
 				{
+					Plugin:   PLUGIN_NAME,
+					Subtasks: nil,
+					Options: map[string]interface{}{
+						"connectionId": conn.ID,
+						"scopeId":      "org/project",
+					},
+				},
+			},
+		},
+	})
+
+	require.Equal(t, models.TASK_COMPLETED, pipeline.Status)
+	require.Equal(t, 1, pipeline.FinishedTasks)
+	require.Equal(t, "", pipeline.ErrorName)
+}
+
+func TestBlueprintV200(t *testing.T) {
+	setupEnv()
+	buildPython(t)
+	client := connectLocalServer(t)
+	connection := CreateTestConnection(client)
+	projectName := "Test project"
+
+	client.CreateProject(&helper.ProjectConfig{
+		ProjectName: projectName,
+	})
+
+	client.CreateScope("fake", connection.ID, map[string]interface{}{
+		"id":            "12345",
+		"connection_id": connection.ID,
+		"name":          "fake project",
+	})
+
+	blueprint := client.CreateBasicBlueprintV2(
+		"Test blueprint",
+		&helper.BlueprintV2Config{
+			Connection: &plugin.BlueprintConnectionV200{
+				Plugin:       "fake",
+				ConnectionId: connection.ID,
+				Scopes: []*plugin.BlueprintScopeV200{
 					{
-						Plugin:   PLUGIN_NAME,
-						Subtasks: nil,
-						Options: map[string]interface{}{
-							"connectionId": conn.ID,
-							"scopeId":      1,
+						Id:   "12345",
+						Name: "Test scope",
+						Entities: []string{
+							plugin.DOMAIN_TYPE_CROSS,
 						},
 					},
 				},
 			},
-		})
-		require.Equal(t, models.TASK_COMPLETED, pipeline.Status)
-		require.Equal(t, 1, pipeline.FinishedTasks)
-		require.Equal(t, "", pipeline.ErrorName)
-	})
+			SkipOnFail:  true,
+			ProjectName: projectName,
+		},
+	)
+
+	project := client.GetProject(projectName)
+	require.Equal(t, blueprint.Name, project.Blueprint.Name)
+
+	client.TriggerBlueprint(blueprint.ID)
 }