You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by he...@apache.org on 2023/05/31 17:45:49 UTC

[incubator-devlake] branch main updated: 5250 define migration script on python side (#5286)

This is an automated email from the ASF dual-hosted git repository.

hez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 4092092bb 5250 define migration script on python side (#5286)
4092092bb is described below

commit 4092092bba27ff3c258bdadd4177904f4701300f
Author: Camille Teruel <ca...@gmail.com>
AuthorDate: Wed May 31 19:45:44 2023 +0200

    5250 define migration script on python side (#5286)
    
    * feat: Improve dynamic struct generation from JSON schema
    
    * Support "date-time" string format: time.Time (or *time.Time if not required)
    * Support "number" property type: float64
    * Generate gorm `primaryKey` tag for Field(primary_key=True, ...)
    * Generate gorm `type` tag for string fields: `varchar(n)` if n <= 255, `text` otherwise
    * Generate gorm `serializer:encdec` tag for string format "password": Use SecretStr on python side
    * Generate `validate:"required"` for required properties
    
    * feat: AutoMigrate all tool models
    
    * feat: MigrationScripts for python plugins
    
    Support definition of migration scripts on python side.
    
    ---------
    
    Co-authored-by: Camille Teruel <ca...@meri.co>
    Co-authored-by: Hezheng Yin <he...@merico.dev>
---
 backend/core/utils/json.go                         |  74 ++++++++++
 backend/core/utils/json_test.go                    | 111 +++++++++++++++
 backend/python/README.md                           |  41 ++++--
 .../python/plugins/azuredevops/azuredevops/api.py  |   2 +-
 .../python/plugins/azuredevops/azuredevops/main.py |   2 +-
 .../plugins/azuredevops/azuredevops/models.py      |  36 ++---
 backend/python/pydevlake/pydevlake/__init__.py     |   8 +-
 backend/python/pydevlake/pydevlake/ipc.py          |   4 -
 backend/python/pydevlake/pydevlake/message.py      |   9 +-
 backend/python/pydevlake/pydevlake/migration.py    | 113 +++++++++++++++
 backend/python/pydevlake/pydevlake/model.py        |  12 +-
 backend/python/pydevlake/pydevlake/plugin.py       |  29 +---
 backend/python/pydevlake/tests/migration_test.py   |  65 +++++++++
 backend/python/test/fakeplugin/fakeplugin/main.py  |   5 +-
 backend/server/services/remote/init.go             |   5 +-
 .../server/services/remote/models/conversion.go    | 127 +++++++++++++----
 .../services/remote/models/conversion_test.go      | 153 +++++++++++++++++++++
 backend/server/services/remote/models/migration.go | 140 +++++++++++++++++++
 .../services/remote/models/migration_test.go       |  67 +++++++++
 backend/server/services/remote/models/models.go    |  39 +++---
 .../server/services/remote/models/plugin_remote.go |   3 +-
 .../server/services/remote/plugin/plugin_impl.go   |  59 ++++++--
 22 files changed, 955 insertions(+), 149 deletions(-)

diff --git a/backend/core/utils/json.go b/backend/core/utils/json.go
new file mode 100644
index 000000000..557f64cec
--- /dev/null
+++ b/backend/core/utils/json.go
@@ -0,0 +1,74 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/apache/incubator-devlake/core/errors"
+)
+
+type JsonObject = map[string]any
+type JsonArray = []any
+
+func GetProperty[T any](object JsonObject, key string) (T, errors.Error) {
+	property, ok := object[key]
+	if !ok {
+		return *new(T), errors.Default.New(fmt.Sprintf("Missing property \"%s\"", key))
+	}
+	return Convert[T](property)
+}
+
+func GetItem[T any](array JsonArray, index int) (T, errors.Error) {
+	if index < 0 || index >= len(array) {
+		return *new(T), errors.Default.New(fmt.Sprintf("Index %d out of range", index))
+	}
+	return Convert[T](array[index])
+}
+
+// Convert converts value to type T. A value that already has type T is returned as is.
+// If T is a slice type and value is a []any, each element must be assignable to the element type of T.
+// Does not support nested slices; nil elements are rejected.
+// T may be an interface type such as any. A nil value never converts successfully.
+func Convert[T any](value any) (T, errors.Error) {
+	var t T
+	if result, ok := value.(T); ok {
+		return result, nil
+	}
+	// reflect.TypeOf(t) is nil when T is an interface type, so derive the type from *T instead.
+	tType := reflect.TypeOf((*T)(nil)).Elem()
+	if tType.Kind() != reflect.Slice {
+		return t, errors.Default.New(fmt.Sprintf("Value is not of type %s", tType))
+	}
+	valueSlice, ok := value.([]any)
+	if !ok {
+		return t, errors.Default.New("Value is not a slice")
+	}
+	elemType := tType.Elem()
+	result := reflect.MakeSlice(tType, 0, len(valueSlice))
+	for i, v := range valueSlice {
+		// reflect.TypeOf(nil) is nil and must not be passed to AssignableTo.
+		vType := reflect.TypeOf(v)
+		if vType == nil || !vType.AssignableTo(elemType) {
+			return t, errors.Default.New(fmt.Sprintf("Element %d is not of type %s", i, elemType.Name()))
+		}
+		result = reflect.Append(result, reflect.ValueOf(v))
+	}
+	return result.Interface().(T), nil
+}
diff --git a/backend/core/utils/json_test.go b/backend/core/utils/json_test.go
new file mode 100644
index 000000000..ae573b932
--- /dev/null
+++ b/backend/core/utils/json_test.go
@@ -0,0 +1,111 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestExistingProperty(t *testing.T) {
+	object := map[string]interface{}{
+		"id": 1,
+	}
+
+	res, err := GetProperty[int](object, "id")
+
+	assert.NoError(t, err)
+	assert.Equal(t, 1, res)
+}
+
+func TestMissingProperty(t *testing.T) {
+	object := map[string]interface{}{
+		"id": 1,
+	}
+
+	_, err := GetProperty[int](object, "name")
+
+	assert.Error(t, err)
+	assert.Equal(t, "Missing property \"name\"", err.Error())
+}
+
+func TestInvalidPropertyType(t *testing.T) {
+	object := map[string]interface{}{
+		"id": 1,
+	}
+
+	_, err := GetProperty[string](object, "id")
+
+	assert.Error(t, err)
+	assert.Equal(t, "Value is not of type string", err.Error())
+}
+
+func TestGetItemInRange(t *testing.T) {
+	array := []any{1, 2, 3}
+
+	res, err := GetItem[int](array, 1)
+
+	assert.NoError(t, err)
+	assert.Equal(t, 2, res)
+}
+
+func TestGetItemOutOfRange(t *testing.T) {
+	array := []any{1, 2, 3}
+
+	_, err := GetItem[int](array, 3)
+
+	assert.Error(t, err)
+	assert.Equal(t, "Index 3 out of range", err.Error())
+}
+
+func TestConvertSlice(t *testing.T) {
+	value := []any{1, 2, 3}
+
+	res, err := Convert[[]int](value)
+
+	assert.NoError(t, err)
+	assert.Equal(t, []int{1, 2, 3}, res)
+}
+
+func TestConvertSliceInvalidType(t *testing.T) {
+	value := []any{1, 2, 3}
+
+	val, err := Convert[[]string](value)
+	_ = val
+	assert.Error(t, err)
+	assert.Equal(t, "Element 0 is not of type string", err.Error())
+}
+
+func TestConvertSliceInvalidValue(t *testing.T) {
+	value := []any{1, "2", 3}
+
+	_, err := Convert[[]int](value)
+
+	assert.Error(t, err)
+	assert.Equal(t, "Element 1 is not of type int", err.Error())
+}
+
+func TestConvertSliceInvalidSlice(t *testing.T) {
+	value := 1
+
+	_, err := Convert[[]int](value)
+
+	assert.Error(t, err)
+	assert.Equal(t, "Value is not a slice", err.Error())
+}
diff --git a/backend/python/README.md b/backend/python/README.md
index 192798e58..1e52053d4 100644
--- a/backend/python/README.md
+++ b/backend/python/README.md
@@ -100,11 +100,15 @@ and those that are used to customize conversion to domain models that are groupe
 For example, to add `url` and `token` parameter, edit `MyPluginConnection` as follow:
 
 ```python
+from pydantic import SecretStr
+
 class MyPluginConnection(Connection):
     url: str
-    token: str
+    token: SecretStr
 ```
 
+Using type `SecretStr` instead of `str` will encode the value in the database.
+To get the `str` value, you need to call `get_secret_value()`: `connection.token.get_secret_value()`.
 All plugin methods that have a connection parameter will be called with an instance of this class.
 Note that you should not define `__init__`.
 
@@ -235,20 +239,28 @@ To facilitate or even eliminate extraction, your tool models should be close to
 #### Migration of tool models
 
 Tool models, connection, scope and transformation rule types are stored in the DevLake database.
-When you change the definition of one of those types, you need to migrate the database.
-You should implement the migration logic in the model class by defining a `migrate` class method. This method takes a sqlalchemy session as argument that you can use to
-execute SQL `ALTER TABLE` statements.
+When you change the definition of one of those types, the database needs to be migrated.
+Automatic migration takes care of most modifications, but some changes require manual migration. For example, automatic migration never drops columns. Another example is adding a column to the primary key of a table: you need to write a script that removes the primary key constraint and adds a new compound primary key.
 
-```python
-class User(ToolModel, table=True):
-    id: str = Field(primary_key=True)
-    name: str
-    email: str
-    age: int
+To declare a new migration script, you decorate a function with the `migration` decorator. The function name should describe what the script does. The `migration` decorator takes a version number that should be a 14-digit timestamp in the format `YYYYMMDDhhmmss`. The function takes a `MigrationScriptBuilder` as a parameter. This builder exposes methods to execute migration operations.
+
+##### Migration operations
+
+The `MigrationScriptBuilder` exposes the following methods:
+- `execute(sql: str, dialect: Optional[Dialect])`: execute a raw SQL statement. The `dialect` parameter is used to execute the SQL statement only if the database is of the given dialect. If `dialect` is `None`, the statement is executed unconditionally.
+- `drop_column(table: str, column: str)`: drop a column from a table
+- `drop_table(table: str)`: drop a table
 
-    @classmethod
-    def migrate(cls, session):
-        session.execute(f"ALTER TABLE {cls.__tablename__} ADD COLUMN age INT")
+
+```python
+from pydevlake.migration import MigrationScriptBuilder, migration, Dialect
+
+@migration(20230524181430)
+def add_build_id_as_job_primary_key(b: MigrationScriptBuilder):
+    table = Job.__tablename__
+    b.execute(f'ALTER TABLE {table} DROP PRIMARY KEY', Dialect.MYSQL)
+    b.execute(f'ALTER TABLE {table} DROP CONSTRAINT {table}_pkey', Dialect.POSTGRESQL)
+    b.execute(f'ALTER TABLE {table} ADD PRIMARY KEY (id, build_id)')
 ```
 
 
@@ -481,7 +493,8 @@ class UserComments(Substream):
         """
         This method will be called for each user collected from parent stream Users.
         """
-        for json in MyPluginAPI(context.connection.token).user_comments(user.id):
+        api = MyPluginAPI(context.connection.token.get_secret_value())
+        for json in api.user_comments(user.id):
             yield json, state
     ...
 ```
diff --git a/backend/python/plugins/azuredevops/azuredevops/api.py b/backend/python/plugins/azuredevops/azuredevops/api.py
index 4c63e507a..9461d0890 100644
--- a/backend/python/plugins/azuredevops/azuredevops/api.py
+++ b/backend/python/plugins/azuredevops/azuredevops/api.py
@@ -36,7 +36,7 @@ class AzureDevOpsAPI(API):
 
     @request_hook
     def authenticate(self, request: Request):
-        token_b64 = base64.b64encode((':' + self.connection.token).encode()).decode()
+        token_b64 = base64.b64encode((':' + self.connection.token.get_secret_value()).encode()).decode()
         request.headers['Authorization'] = 'Basic ' + token_b64
 
     @request_hook
diff --git a/backend/python/plugins/azuredevops/azuredevops/main.py b/backend/python/plugins/azuredevops/azuredevops/main.py
index fabc53a63..3a96a1964 100644
--- a/backend/python/plugins/azuredevops/azuredevops/main.py
+++ b/backend/python/plugins/azuredevops/azuredevops/main.py
@@ -109,7 +109,7 @@ class AzureDevOpsPlugin(Plugin):
     def extra_tasks(self, scope: GitRepository, tx_rule: AzureDevOpsTransformationRule, entity_types: list[DomainType], connection: AzureDevOpsConnection):
         if DomainType.CODE in entity_types:
             url = urlparse(scope.remote_url)
-            url = url._replace(netloc=f'{url.username}:{connection.token}@{url.hostname}')
+            url = url._replace(netloc=f'{url.username}:{connection.token.get_secret_value()}@{url.hostname}')
             yield gitextractor(url.geturl(), scope.domain_id(), connection.proxy)
 
     def extra_stages(self, scope_tx_rule_pairs: list[ScopeTxRulePair], entity_types: list[DomainType], _):
diff --git a/backend/python/plugins/azuredevops/azuredevops/models.py b/backend/python/plugins/azuredevops/azuredevops/models.py
index 5c656afd1..3e08ef68b 100644
--- a/backend/python/plugins/azuredevops/azuredevops/models.py
+++ b/backend/python/plugins/azuredevops/azuredevops/models.py
@@ -18,15 +18,16 @@ from enum import Enum
 from typing import Optional
 import re
 
-from sqlmodel import Session, Column, Text
+from pydantic import SecretStr
 
 from pydevlake import Field, Connection, TransformationRule
 from pydevlake.model import ToolModel, ToolScope
 from pydevlake.pipeline_tasks import RefDiffOptions
+from pydevlake.migration import migration, MigrationScriptBuilder, Dialect
 
 
 class AzureDevOpsConnection(Connection):
-    token: str
+    token: SecretStr
     organization: Optional[str]
 
 
@@ -52,7 +53,7 @@ class GitPullRequest(ToolModel, table=True):
         Completed = "completed"
 
     pull_request_id: int = Field(primary_key=True)
-    description: Optional[str] = Field(sa_column=Column(Text))
+    description: Optional[str]
     status: PRStatus
     created_by_id: str = Field(source='/createdBy/id')
     created_by_name: str = Field(source='/createdBy/displayName')
@@ -68,14 +69,6 @@ class GitPullRequest(ToolModel, table=True):
     source_ref_name: Optional[str]
     fork_repo_id: Optional[str] = Field(source='/forkSource/repository/id')
 
-    @classmethod
-    def migrate(self, session: Session):
-        dialect = session.bind.dialect.name
-        if dialect == 'mysql':
-            session.execute(f'ALTER TABLE {self.__tablename__} MODIFY COLUMN description TEXT')
-        elif dialect == 'postgresql':
-            session.execute(f'ALTER TABLE {self.__tablename__} ALTER COLUMN description TYPE TEXT')
-
 
 class GitPullRequestCommit(ToolModel, table=True):
     commit_id: str = Field(primary_key=True)
@@ -132,13 +125,14 @@ class Job(ToolModel, table=True):
     state: JobState
     result: JobResult
 
-    @classmethod
-    def migrate(self, session: Session):
-        dialect = session.bind.dialect.name
-        if dialect == 'mysql':
-            session.execute(f'ALTER TABLE {self.__tablename__} DROP PRIMARY KEY')
-        elif dialect == 'postgresql':
-            session.execute(f'ALTER TABLE {self.__tablename__} DROP CONSTRAINT {self.__tablename__}_pkey')
-        else:
-            raise Exception(f'Unsupported dialect {dialect}')
-        session.execute(f'ALTER TABLE {self.__tablename__} ADD PRIMARY KEY (id, build_id)')
+
+@migration(20230524181430)
+def add_build_id_as_job_primary_key(b: MigrationScriptBuilder):
+    # NOTE: We can't add a column to the primary key of an existing table
+    # so we have to drop the primary key constraint first,
+    # which is done differently in MySQL and PostgreSQL,
+    # and then add the new composite primary key.
+    table = Job.__tablename__
+    b.execute(f'ALTER TABLE {table} DROP PRIMARY KEY', Dialect.MYSQL)
+    b.execute(f'ALTER TABLE {table} DROP CONSTRAINT {table}_pkey', Dialect.POSTGRESQL)
+    b.execute(f'ALTER TABLE {table} ADD PRIMARY KEY (id, build_id)')
diff --git a/backend/python/pydevlake/pydevlake/__init__.py b/backend/python/pydevlake/pydevlake/__init__.py
index d645c0749..0c10c3e9f 100644
--- a/backend/python/pydevlake/pydevlake/__init__.py
+++ b/backend/python/pydevlake/pydevlake/__init__.py
@@ -21,14 +21,16 @@ pytest.register_assert_rewrite('pydevlake.testing')
 from sqlmodel import Field as _Field
 
 
-def Field(*args, schema_extra: Optional[dict[str, Any]]=None, source: Optional[str]=None, **kwargs):
+def Field(*args, primary_key: bool=False, source: Optional[str]=None, **kwargs):
     """
     A wrapper around sqlmodel.Field that adds a source parameter.
     """
-    schema_extra = schema_extra or {}
+    schema_extra = kwargs.get('schema_extra', {})
     if source:
         schema_extra['source'] = source
-    return _Field(*args, **kwargs, schema_extra=schema_extra)
+    if primary_key:
+        schema_extra['primaryKey'] = True
+    return _Field(*args, **kwargs, primary_key=primary_key, schema_extra=schema_extra)
 
 
 from .model import ToolModel, ToolScope, DomainScope, Connection, TransformationRule, domain_id
diff --git a/backend/python/pydevlake/pydevlake/ipc.py b/backend/python/pydevlake/pydevlake/ipc.py
index e59065af0..d783800a7 100644
--- a/backend/python/pydevlake/pydevlake/ipc.py
+++ b/backend/python/pydevlake/pydevlake/ipc.py
@@ -100,10 +100,6 @@ class PluginCommands:
         entities = [DomainType(e) for e in entities]
         return self._plugin.make_pipeline(scope_tx_rule_pairs, entities, connection)
 
-    @plugin_method
-    def run_migrations(self, db_url, force: bool):
-        self._plugin.run_migrations(create_db_engine(db_url), force)
-
     @plugin_method
     def plugin_info(self):
         return self._plugin.plugin_info()
diff --git a/backend/python/pydevlake/pydevlake/message.py b/backend/python/pydevlake/pydevlake/message.py
index 0cb7c23cd..44228f56c 100644
--- a/backend/python/pydevlake/pydevlake/message.py
+++ b/backend/python/pydevlake/pydevlake/message.py
@@ -20,6 +20,7 @@ from pydantic import BaseModel, Field
 import jsonref
 
 from pydevlake.model import ToolScope
+from pydevlake.migration import MigrationScript
 
 
 class Message(BaseModel):
@@ -48,6 +49,10 @@ class DynamicModelInfo(Message):
             # Replace $ref with actual schema
             schema = jsonref.replace_refs(schema, proxies=False)
             del schema['definitions']
+        # Pydantic forgets to put type in enums
+        for prop in schema['properties'].values():
+            if 'type' not in prop and 'enum' in prop:
+                prop['type'] = 'string'
         return DynamicModelInfo(
             json_schema=schema,
             table_name=model_class.__tablename__
@@ -60,11 +65,11 @@ class PluginInfo(Message):
     connection_model_info: DynamicModelInfo
     transformation_rule_model_info: Optional[DynamicModelInfo]
     scope_model_info: DynamicModelInfo
+    tool_model_infos: list[DynamicModelInfo]
+    migration_scripts: list[MigrationScript]
     plugin_path: str
     subtask_metas: list[SubtaskMeta]
     extension: str = "datasource"
-    type: str = "python-poetry"
-    tables: list[str]
 
 
 class RemoteProgress(Message):
diff --git a/backend/python/pydevlake/pydevlake/migration.py b/backend/python/pydevlake/pydevlake/migration.py
new file mode 100644
index 000000000..e64eb9115
--- /dev/null
+++ b/backend/python/pydevlake/pydevlake/migration.py
@@ -0,0 +1,113 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from typing import List, Literal, Optional, Union, Annotated
+from enum import Enum
+from datetime import datetime
+
+from pydantic import BaseModel, Field
+
+
+MIGRATION_SCRIPTS = []
+
+class Dialect(Enum):
+    MYSQL = "mysql"
+    POSTGRESQL = "postgres"
+
+
+class Execute(BaseModel):
+    type: Literal["execute"] = "execute"
+    sql: str
+    dialect: Optional[Dialect] = None
+
+
+class DropColumn(BaseModel):
+    type: Literal["drop_column"] = "drop_column"
+    table: str
+    column: str
+
+
+class DropTable(BaseModel):
+    type: Literal["drop_table"] = "drop_table"
+    table: str
+
+
+Operation = Annotated[
+    Union[Execute, DropColumn, DropTable],
+    Field(discriminator="type")
+]
+
+
+class MigrationScript(BaseModel):
+    operations: List[Operation]
+    version: int
+    name: str
+
+
+class MigrationScriptBuilder:
+    def __init__(self):
+        self.operations = []
+
+    def execute(self, sql: str, dialect: Optional[Dialect] = None):
+        """
+        Executes a raw SQL statement.
+        If dialect is specified the statement will be executed only if the db dialect matches.
+        """
+        self.operations.append(Execute(sql=sql, dialect=dialect))
+
+    def drop_column(self, table: str, column: str):
+        """
+        Drops a column from a table.
+        """
+        self.operations.append(DropColumn(table=table, column=column))
+
+    def drop_table(self, table: str):
+        """
+        Drops a table.
+        """
+        self.operations.append(DropTable(table=table))
+
+
+def migration(version: int, name: Optional[str] = None):
+    """
+    Builds a migration script from a function.
+
+    Usage:
+
+    @migration(20230511000000)
+    def change_description_type(b: MigrationScriptBuilder):
+        b.execute('ALTER TABLE my_table ...')
+    """
+    _validate_version(version)
+
+    def wrapper(fn):
+        builder = MigrationScriptBuilder()
+        fn(builder)
+        script = MigrationScript(operations=builder.operations, version=version, name=name or fn.__name__)
+        MIGRATION_SCRIPTS.append(script)
+        return script
+    return wrapper
+
+
+def _validate_version(version: int):
+    str_version = str(version)
+    err = ValueError(f"Invalid version {version}, must be in YYYYMMDDhhmmss format")
+    if len(str_version) != 14:
+        raise err
+    try:
+        datetime.strptime(str_version, "%Y%m%d%H%M%S")
+    except ValueError:
+        raise  err
diff --git a/backend/python/pydevlake/pydevlake/model.py b/backend/python/pydevlake/pydevlake/model.py
index 995c13b3a..2c576a12e 100644
--- a/backend/python/pydevlake/pydevlake/model.py
+++ b/backend/python/pydevlake/pydevlake/model.py
@@ -20,7 +20,7 @@ from inspect import getmodule
 from datetime import datetime
 
 import inflect
-from pydantic import AnyUrl, validator
+from pydantic import AnyUrl, SecretStr, validator
 from sqlalchemy import Column, DateTime
 from sqlalchemy.orm import declared_attr, Session
 from sqlalchemy.inspection import inspect
@@ -48,6 +48,9 @@ class ToolTable(SQLModel):
 
     class Config:
         allow_population_by_field_name = True
+        json_encoders = {
+            SecretStr: lambda v: v.get_secret_value() if v else None
+        }
 
         @classmethod
         def alias_generator(cls, attr_name: str) -> str:
@@ -56,13 +59,6 @@ class ToolTable(SQLModel):
             parts = attr_name.split('_')
             return parts[0] + ''.join(word.capitalize() for word in parts[1:])
 
-    @classmethod
-    def migrate(cls, session: Session):
-        """
-        Redefine this method to perform migration on this tool model.
-        """
-        pass
-
 
 class Connection(ToolTable, Model):
     name: str
diff --git a/backend/python/pydevlake/pydevlake/plugin.py b/backend/python/pydevlake/pydevlake/plugin.py
index d543fe791..e94791895 100644
--- a/backend/python/pydevlake/pydevlake/plugin.py
+++ b/backend/python/pydevlake/pydevlake/plugin.py
@@ -20,16 +20,14 @@ import os
 import sys
 
 import fire
-from sqlmodel import SQLModel, Session
-from sqlalchemy.inspection import inspect
-from sqlalchemy.engine import Engine
 
 import pydevlake.message as msg
 from pydevlake.subtasks import Subtask
 from pydevlake.ipc import PluginCommands
 from pydevlake.context import Context
 from pydevlake.stream import Stream, DomainType
-from pydevlake.model import ToolScope, DomainScope, Connection, TransformationRule, SubtaskRun
+from pydevlake.model import ToolScope, DomainScope, Connection, TransformationRule
+from pydevlake.migration import MIGRATION_SCRIPTS
 
 
 ScopeTxRulePair = tuple[ToolScope, Optional[TransformationRule]]
@@ -105,24 +103,6 @@ class Plugin(ABC):
     def convert(self, ctx: Context, stream: str):
         yield from self.get_stream(stream).convertor.run(ctx)
 
-    def run_migrations(self, engine: Engine, force: bool = False):
-        # NOTE: Not sure what "force" is for
-        # TODO: Support migration for transformation rule and connection tables
-        # They are currently created on go-side.
-        tool_models = [stream.tool_model for stream in self._streams.values()]
-        tool_models.append(self.tool_scope_type)
-        inspector = inspect(engine)
-        tables = SQLModel.metadata.tables
-        with Session(engine) as session:
-            for model in tool_models:
-                if inspector.has_table(model.__tablename__):
-                    # TODO: Add version table and migrate if needed
-                    model.migrate(session)
-                else:
-                    tables[model.__tablename__].create(engine)
-            session.commit()
-        tables[SubtaskRun.__tablename__].create(engine, checkfirst=True)
-
     def make_remote_scopes(self, connection: Connection, group_id: Optional[str] = None) -> msg.RemoteScopes:
         if group_id:
             remote_scopes = []
@@ -237,8 +217,6 @@ class Plugin(ABC):
             tx_rule_model_info = msg.DynamicModelInfo.from_model(self.transformation_rule_type)
         else:
             tx_rule_model_info = None
-        plugin_tables = [stream(self.name).raw_model_table for stream in self.streams] + \
-                        [stream.tool_model.__tablename__ for stream in self.streams]
         return msg.PluginInfo(
             name=self.name,
             description=self.description,
@@ -247,8 +225,9 @@ class Plugin(ABC):
             connection_model_info=msg.DynamicModelInfo.from_model(self.connection_type),
             transformation_rule_model_info=tx_rule_model_info,
             scope_model_info=msg.DynamicModelInfo.from_model(self.tool_scope_type),
+            tool_model_infos=[msg.DynamicModelInfo.from_model(stream.tool_model) for stream in self._streams.values()],
             subtask_metas=subtask_metas,
-            tables=plugin_tables,
+            migration_scripts=MIGRATION_SCRIPTS
         )
 
     def _plugin_path(self):
diff --git a/backend/python/pydevlake/tests/migration_test.py b/backend/python/pydevlake/tests/migration_test.py
new file mode 100644
index 000000000..6e19825f0
--- /dev/null
+++ b/backend/python/pydevlake/tests/migration_test.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from pydevlake.migration import migration, MIGRATION_SCRIPTS
+
+
+@migration(20230520174322)
+def my_migration(b):
+    b.execute("SOME SQL")
+    b.drop_column("t", "c")
+    b.drop_table("t")
+
+
+def test_migration():
+    assert my_migration.version == 20230520174322
+    assert my_migration.name == "my_migration"
+    assert len(my_migration.operations) == 3
+
+    op1 = my_migration.operations[0]
+    assert op1.sql == "SOME SQL"
+    assert op1.dialect is None
+
+    op2 = my_migration.operations[1]
+    assert op2.table == "t"
+    assert op2.column == "c"
+
+    op3 = my_migration.operations[2]
+    assert op3.table == "t"
+
+
+def test_registration():
+    assert my_migration in MIGRATION_SCRIPTS
+
+
+def test_serialization():
+    val = my_migration.dict()
+    assert val["version"] == 20230520174322
+    assert val["name"] == "my_migration"
+    assert len(val["operations"]) == 3
+
+    op1 = val["operations"][0]
+    assert op1["type"] == "execute"
+    assert op1["sql"] == "SOME SQL"
+    assert "dialect" not in op1 or op1["dialect"] is None
+
+    op2 = val["operations"][1]
+    assert op2["type"] == "drop_column"
+    assert op2["table"] == "t"
+    assert op2["column"] == "c"
+
+    op3 = val["operations"][2]
+    assert op3["type"] == "drop_table"
+    assert op3["table"] == "t"
diff --git a/backend/python/test/fakeplugin/fakeplugin/main.py b/backend/python/test/fakeplugin/fakeplugin/main.py
index 6a048ff22..55c548965 100644
--- a/backend/python/test/fakeplugin/fakeplugin/main.py
+++ b/backend/python/test/fakeplugin/fakeplugin/main.py
@@ -19,6 +19,7 @@ from typing import Optional
 import json
 
 from sqlmodel import Field
+from pydantic import SecretStr
 
 from pydevlake import Plugin, Connection, TransformationRule, Stream, ToolModel, ToolScope, RemoteScopeGroup, DomainType
 from pydevlake.domain_layer.devops import CicdScope, CICDPipeline, CICDStatus, CICDResult, CICDType
@@ -96,7 +97,7 @@ class FakePipelineStream(Stream):
 
 
 class FakeConnection(Connection):
-    token: str
+    token: SecretStr
 
 
 class FakeProject(ToolScope, table=True):
@@ -149,7 +150,7 @@ class FakePlugin(Plugin):
         ]
 
     def test_connection(self, connection: FakeConnection):
-        if connection.token != VALID_TOKEN:
+        if connection.token.get_secret_value() != VALID_TOKEN:
             raise Exception("Invalid token")
 
     @property
diff --git a/backend/server/services/remote/init.go b/backend/server/services/remote/init.go
index 3e505b96c..4c6dd8ff1 100644
--- a/backend/server/services/remote/init.go
+++ b/backend/server/services/remote/init.go
@@ -19,7 +19,7 @@ package remote
 
 import (
 	"fmt"
-	"github.com/apache/incubator-devlake/core/config"
+
 	"github.com/apache/incubator-devlake/core/context"
 	"github.com/apache/incubator-devlake/core/errors"
 	pluginCore "github.com/apache/incubator-devlake/core/plugin"
@@ -43,8 +43,7 @@ func NewRemotePlugin(info *models.PluginInfo) (models.RemotePlugin, errors.Error
 	if err != nil {
 		return nil, err
 	}
-	forceMigration := config.GetConfig().GetBool("FORCE_MIGRATION")
-	err = plugin.RunMigrations(forceMigration)
+	err = plugin.RunAutoMigrations()
 	if err != nil {
 		return nil, err
 	}
diff --git a/backend/server/services/remote/models/conversion.go b/backend/server/services/remote/models/conversion.go
index bd221ecaa..877597ab4 100644
--- a/backend/server/services/remote/models/conversion.go
+++ b/backend/server/services/remote/models/conversion.go
@@ -20,33 +20,35 @@ package models
 import (
 	"encoding/json"
 	"fmt"
-	"github.com/apache/incubator-devlake/impls/dalgorm"
 	"reflect"
 	"strings"
 	"time"
 
+	"github.com/apache/incubator-devlake/impls/dalgorm"
+
 	"github.com/apache/incubator-devlake/core/errors"
 	"github.com/apache/incubator-devlake/core/models"
+	"github.com/apache/incubator-devlake/core/utils"
 	"gorm.io/datatypes"
 )
 
-func LoadTableModel(tableName string, schema map[string]any, encrypt bool, parentModel any) (*models.DynamicTabler, errors.Error) {
-	structType, err := GenerateStructType(schema, encrypt, reflect.TypeOf(parentModel))
+func LoadTableModel(tableName string, schema utils.JsonObject, parentModel any) (*models.DynamicTabler, errors.Error) {
+	structType, err := GenerateStructType(schema, reflect.TypeOf(parentModel))
 	if err != nil {
 		return nil, err
 	}
 	return models.NewDynamicTabler(tableName, structType), nil
 }
 
-func GenerateStructType(schema map[string]any, encrypt bool, baseType reflect.Type) (reflect.Type, errors.Error) {
+func GenerateStructType(schema utils.JsonObject, baseType reflect.Type) (reflect.Type, errors.Error) {
 	var structFields []reflect.StructField
-	propsRaw, ok := schema["properties"]
-	if !ok {
-		return nil, errors.BadInput.New("Missing properties in JSON schema")
+	props, err := utils.GetProperty[utils.JsonObject](schema, "properties")
+	if err != nil {
+		return nil, err
 	}
-	props, ok := propsRaw.(map[string]any)
-	if !ok {
-		return nil, errors.BadInput.New("JSON schema properties must be an object")
+	required, err := utils.GetProperty[[]string](schema, "required")
+	if err != nil {
+		return nil, err
 	}
 	if baseType != nil {
 		anonymousField := reflect.StructField{
@@ -61,8 +63,8 @@ func GenerateStructType(schema map[string]any, encrypt bool, baseType reflect.Ty
 		if isBaseTypeField(k, baseType) {
 			continue
 		}
-		spec := v.(map[string]any)
-		field, err := generateStructField(k, encrypt, spec)
+		spec := v.(utils.JsonObject)
+		field, err := generateStructField(k, spec, isRequired(k, required))
 		if err != nil {
 			return nil, err
 		}
@@ -98,6 +100,15 @@ func ToDatabaseMap(tableName string, ifc any, createdAt *time.Time, updatedAt *t
 	return m, nil
 }
 
+// isRequired reports whether fieldName appears in the list of required property names.
+func isRequired(fieldName string, required []string) bool {
+	found := false
+	for i := 0; i < len(required) && !found; i++ {
+		found = required[i] == fieldName
+	}
+	return found
+}
+
 func isBaseTypeField(fieldName string, baseType reflect.Type) bool {
 	fieldName = canonicalFieldName(fieldName)
 	for i := 0; i < baseType.NumField(); i++ {
@@ -118,26 +129,33 @@ func canonicalFieldName(fieldName string) string {
 	return strings.ToLower(strings.Replace(fieldName, "_", "", -1))
 }
 
-func generateStructField(name string, encrypt bool, schema map[string]any) (*reflect.StructField, errors.Error) {
-	goType, err := getGoType(schema)
+var ( // reflect types returned by getGoType, keyed below by the JSON schema type they stand for
+	int64Type   = reflect.TypeOf(int64(0))            // "integer"
+	float64Type = reflect.TypeOf(float64(0))          // "number"
+	boolType    = reflect.TypeOf(false)               // "boolean"
+	stringType  = reflect.TypeOf("")                  // "string" (any format other than "date-time")
+	timeType    = reflect.TypeOf(time.Time{})         // "string" with format "date-time"
+	jsonMapType = reflect.TypeOf(datatypes.JSONMap{}) // "object"
+)
+
+func generateStructField(name string, schema utils.JsonObject, required bool) (*reflect.StructField, errors.Error) {
+	goType, err := getGoType(schema, required)
 	if err != nil {
 		return nil, errors.Default.Wrap(err, fmt.Sprintf("couldn't resolve type for field: \"%s\"", name))
 	}
+	tag, err := getTag(name, schema, goType, required)
+	if err != nil {
+		return nil, err
+	}
 	sf := &reflect.StructField{
 		Name: strings.Title(name), //nolint:staticcheck
 		Type: goType,
-		Tag:  reflect.StructTag(fmt.Sprintf("json:\"%s\"", name)),
-	}
-	if encrypt {
-		sf.Tag = reflect.StructTag(fmt.Sprintf("json:\"%s\" "+
-			"gorm:\"serializer:encdec\"", //just encrypt everything for GORM operations - makes things easy
-			name))
+		Tag:  tag,
 	}
 	return sf, nil
 }
 
-func getGoType(schema map[string]any) (reflect.Type, errors.Error) {
-	var goType reflect.Type
+func getGoType(schema utils.JsonObject, required bool) (reflect.Type, errors.Error) {
 	jsonType, ok := schema["type"].(string)
 	if !ok {
 		return nil, errors.BadInput.New("\"type\" property must be a string")
@@ -145,15 +163,70 @@ func getGoType(schema map[string]any) (reflect.Type, errors.Error) {
 	switch jsonType {
 	//TODO: support more types
 	case "integer":
-		goType = reflect.TypeOf(uint64(0))
+		return int64Type, nil
+	case "number":
+		return float64Type, nil
 	case "boolean":
-		goType = reflect.TypeOf(false)
+		return boolType, nil
 	case "string":
-		goType = reflect.TypeOf("")
+		format, err := utils.GetProperty[string](schema, "format")
+		if err == nil && format == "date-time" {
+			if required {
+				return timeType, nil
+			} else {
+				return reflect.PtrTo(timeType), nil
+			}
+		} else {
+			return stringType, nil
+		}
 	case "object":
-		goType = reflect.TypeOf(datatypes.JSONMap{})
+		return jsonMapType, nil
 	default:
 		return nil, errors.BadInput.New(fmt.Sprintf("Unsupported type %s", jsonType))
 	}
-	return goType, nil
+}
+
+// getTag assembles the struct tag of a generated field: the json key, the gorm
+// options from getGormTag if there are any, and validate:"required" when required.
+func getTag(name string, schema utils.JsonObject, goType reflect.Type, required bool) (reflect.StructTag, errors.Error) {
+	parts := []string{fmt.Sprintf("json:\"%s\"", name)}
+	if gormPart := getGormTag(schema, goType); gormPart != "" {
+		parts = append(parts, gormPart)
+	}
+	if required {
+		parts = append(parts, "validate:\"required\"")
+	}
+	return reflect.StructTag(strings.Join(parts, " ")), nil
+}
+
+// getGormTag builds the gorm tag of a generated field, or "" if no option applies:
+// primaryKey for primary keys, a column type for string fields, and the encdec
+// serializer for the "password" format.
+func getGormTag(schema utils.JsonObject, goType reflect.Type) string {
+	var options []string
+	primaryKey, pkErr := utils.GetProperty[bool](schema, "primaryKey")
+	if pkErr == nil && primaryKey {
+		options = append(options, "primaryKey")
+	}
+	if goType == stringType {
+		maxLength, lenErr := utils.GetProperty[float64](schema, "maxLength")
+		length := int(maxLength)
+		switch {
+		case lenErr == nil && length <= 255:
+			options = append(options, fmt.Sprintf("type:varchar(%d)", length))
+		case lenErr != nil && primaryKey:
+			// primary keys must have a key length
+			options = append(options, "type:varchar(255)")
+		default:
+			options = append(options, "type:text")
+		}
+	}
+	format, fmtErr := utils.GetProperty[string](schema, "format")
+	if fmtErr == nil && format == "password" {
+		options = append(options, "serializer:encdec")
+	}
+	if len(options) == 0 {
+		return ""
+	}
+	return fmt.Sprintf("gorm:\"%s\"", strings.Join(options, ";"))
 }
diff --git a/backend/server/services/remote/models/conversion_test.go b/backend/server/services/remote/models/conversion_test.go
new file mode 100644
index 000000000..1b0bc9c2c
--- /dev/null
+++ b/backend/server/services/remote/models/conversion_test.go
@@ -0,0 +1,153 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package models
+
+import (
+	"reflect"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// TestGenerateSimpleField checks the name, type and tags generated for a
+// required integer property.
+func TestGenerateSimpleField(t *testing.T) {
+	field, err := generateStructField("i", map[string]any{"type": "integer"}, true)
+	assert.NoError(t, err)
+	assert.Equal(t, int64Type, field.Type)
+	assert.Equal(t, "I", field.Name)
+
+	jsonTag, hasJSON := field.Tag.Lookup("json")
+	assert.True(t, hasJSON)
+	assert.Equal(t, "i", jsonTag)
+	validateTag, hasValidate := field.Tag.Lookup("validate")
+	assert.True(t, hasValidate)
+	assert.Equal(t, "required", validateTag)
+	_, hasGorm := field.Tag.Lookup("gorm")
+	assert.False(t, hasGorm)
+}
+
+// TestGetGoTypeInt64 checks that the "integer" type maps to int64.
+func TestGetGoTypeInt64(t *testing.T) {
+	schema := map[string]any{"type": "integer"}
+
+	goType, err := getGoType(schema, false)
+	assert.NoError(t, err)
+	assert.Equal(t, int64Type, goType)
+}
+
+// TestGetGoTypeFloat64 checks that the "number" type maps to float64.
+func TestGetGoTypeFloat64(t *testing.T) {
+	schema := map[string]any{"type": "number"}
+
+	goType, err := getGoType(schema, false)
+	assert.NoError(t, err)
+	assert.Equal(t, float64Type, goType)
+}
+
+// TestGetGoTypeBool checks that the "boolean" type maps to bool.
+func TestGetGoTypeBool(t *testing.T) {
+	schema := map[string]any{"type": "boolean"}
+
+	goType, err := getGoType(schema, false)
+	assert.NoError(t, err)
+	assert.Equal(t, boolType, goType)
+}
+
+// TestGetGoTypeString checks that a "string" without format maps to string.
+func TestGetGoTypeString(t *testing.T) {
+	schema := map[string]any{"type": "string"}
+
+	goType, err := getGoType(schema, false)
+	assert.NoError(t, err)
+	assert.Equal(t, stringType, goType)
+}
+
+// TestGetGoTypeTime checks that a required "date-time" string maps to
+// time.Time.
+func TestGetGoTypeTime(t *testing.T) {
+	schema := map[string]any{"type": "string", "format": "date-time"}
+
+	goType, err := getGoType(schema, true)
+	assert.NoError(t, err)
+	assert.Equal(t, timeType, goType)
+}
+
+// TestGetGoTypeTimePointer checks that an optional "date-time" string maps
+// to *time.Time.
+func TestGetGoTypeTimePointer(t *testing.T) {
+	schema := map[string]any{"type": "string", "format": "date-time"}
+
+	goType, err := getGoType(schema, false)
+	assert.NoError(t, err)
+	assert.Equal(t, reflect.PtrTo(timeType), goType)
+}
+
+// TestGetGoTypeJsonMap checks that the "object" type maps to datatypes.JSONMap.
+func TestGetGoTypeJsonMap(t *testing.T) {
+	schema := map[string]any{"type": "object"}
+
+	goType, err := getGoType(schema, false)
+	assert.NoError(t, err)
+	assert.Equal(t, jsonMapType, goType)
+}
+
+// TestGetGormTagPrimaryKey checks that a non-string primary key only gets
+// the primaryKey option.
+func TestGetGormTagPrimaryKey(t *testing.T) {
+	schema := map[string]any{"type": "integer", "primaryKey": true}
+
+	gormTag := getGormTag(schema, int64Type)
+	assert.Equal(t, "gorm:\"primaryKey\"", gormTag)
+}
+
+// TestGetGormTagVarChar checks that a string with maxLength 100 gets a
+// varchar(100) column type.
+func TestGetGormTagVarChar(t *testing.T) {
+	schema := map[string]any{"type": "string", "maxLength": float64(100)}
+
+	gormTag := getGormTag(schema, stringType)
+	assert.Equal(t, "gorm:\"type:varchar(100)\"", gormTag)
+}
+
+// TestGetGormTagText checks that a string whose maxLength exceeds 255 gets
+// the text column type.
+func TestGetGormTagText(t *testing.T) {
+	schema := map[string]any{"type": "string", "maxLength": float64(300)}
+
+	gormTag := getGormTag(schema, stringType)
+	assert.Equal(t, "gorm:\"type:text\"", gormTag)
+}
+
+// TestGetGormTagStringPrimaryKey checks that a string primary key without
+// maxLength falls back to varchar(255).
+func TestGetGormTagStringPrimaryKey(t *testing.T) {
+	schema := map[string]any{"type": "string", "primaryKey": true}
+
+	gormTag := getGormTag(schema, stringType)
+	assert.Equal(t, "gorm:\"primaryKey;type:varchar(255)\"", gormTag)
+}
+
+// TestGetGormTagEncDec checks that a "password" string without maxLength
+// gets the text type and the encdec serializer.
+func TestGetGormTagEncDec(t *testing.T) {
+	schema := map[string]any{"type": "string", "format": "password"}
+
+	gormTag := getGormTag(schema, stringType)
+	assert.Equal(t, "gorm:\"type:text;serializer:encdec\"", gormTag)
+}
diff --git a/backend/server/services/remote/models/migration.go b/backend/server/services/remote/models/migration.go
new file mode 100644
index 000000000..7d980354d
--- /dev/null
+++ b/backend/server/services/remote/models/migration.go
@@ -0,0 +1,140 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package models
+
+import (
+	"encoding/json"
+
+	"github.com/apache/incubator-devlake/core/context"
+	"github.com/apache/incubator-devlake/core/dal"
+	"github.com/apache/incubator-devlake/core/errors"
+	"github.com/apache/incubator-devlake/core/plugin"
+)
+
+// Operation is one step of a RemoteMigrationScript; Execute applies it
+// through the given DAL and returns any error.
+type Operation interface{ Execute(dal.Dal) errors.Error }
+
+// ExecuteOperation runs a raw SQL statement, optionally for one dialect only.
+type ExecuteOperation struct {
+	Sql     string  `json:"sql"`
+	Dialect *string `json:"dialect"`
+}
+
+// Execute passes Sql to the DAL's Exec method and returns its result. If
+// Dialect is set and differs from the dialect reported by the DAL, nothing
+// is executed and nil is returned.
+func (o ExecuteOperation) Execute(dal dal.Dal) errors.Error {
+	if o.Dialect != nil && dal.Dialect() != *o.Dialect {
+		return nil
+	}
+	return dal.Exec(o.Sql)
+}
+
+var _ Operation = (*ExecuteOperation)(nil)
+
+// DropColumnOperation removes a single column from a table.
+type DropColumnOperation struct {
+	Table  string `json:"table"`
+	Column string `json:"column"`
+}
+
+// Execute asks the DAL to drop Column from Table.
+func (o DropColumnOperation) Execute(dal dal.Dal) errors.Error { return dal.DropColumns(o.Table, o.Column) }
+
+var _ Operation = (*DropColumnOperation)(nil)
+
+// DropTableOperation removes a table.
+type DropTableOperation struct {
+	Table  string `json:"table"`
+	Column string `json:"column"` // NOTE(review): never read by Execute; possibly copied from DropColumnOperation — confirm
+}
+
+// Execute asks the DAL to drop Table.
+func (o DropTableOperation) Execute(dal dal.Dal) errors.Error { return dal.DropTables(o.Table) }
+
+var _ Operation = (*DropTableOperation)(nil)
+
+type RemoteMigrationScript struct { // migration script whose fields are populated from JSON by UnmarshalJSON
+	operations []Operation // executed in slice order by Up
+	version    uint64
+	name       string
+}
+
+type rawRemoteMigrationScript struct { // JSON layout of a script, used by RemoteMigrationScript.UnmarshalJSON
+	Operations []json.RawMessage `json:"operations"` // kept raw until each operation's "type" has been inspected
+	Version    uint64            `json:"version"`
+	Name       string            `json:"name"`
+}
+
+// UnmarshalJSON decodes a migration script together with its operations.
+// The "type" field of each operation object selects the concrete Operation to
+// decode into; an unknown, missing or non-string type yields an error rather
+// than a panic, and the receiver is only modified once everything has decoded.
+func (s *RemoteMigrationScript) UnmarshalJSON(data []byte) error {
+	var rawScript rawRemoteMigrationScript
+	if err := json.Unmarshal(data, &rawScript); err != nil {
+		return err
+	}
+	operations := make([]Operation, len(rawScript.Operations))
+	for i, operationRaw := range rawScript.Operations {
+		// Read the discriminator first; the full payload is decoded below.
+		var header struct{ Type string `json:"type"` }
+		if err := json.Unmarshal(operationRaw, &header); err != nil {
+			return err
+		}
+		switch header.Type {
+		case "execute":
+			operations[i] = &ExecuteOperation{}
+		case "drop_column":
+			operations[i] = &DropColumnOperation{}
+		case "drop_table":
+			operations[i] = &DropTableOperation{}
+		default:
+			return errors.BadInput.New("unsupported operation type")
+		}
+		if err := json.Unmarshal(operationRaw, operations[i]); err != nil {
+			return err
+		}
+	}
+	s.version = rawScript.Version
+	s.name = rawScript.Name
+	s.operations = operations
+	return nil
+}
+
+// Up runs the script's operations in order against the DAL obtained from
+// basicRes, stopping at the first error and returning it.
+func (s *RemoteMigrationScript) Up(basicRes context.BasicRes) errors.Error {
+	db := basicRes.GetDal()
+	for i := range s.operations {
+		if err := s.operations[i].Execute(db); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Version returns the script's version number. It is part of the
+// plugin.MigrationScript interface.
+func (s *RemoteMigrationScript) Version() uint64 { return s.version }
+
+// Name returns the script's name. It is part of the
+// plugin.MigrationScript interface.
+func (s *RemoteMigrationScript) Name() string { return s.name }
+
+var _ plugin.MigrationScript = (*RemoteMigrationScript)(nil)
diff --git a/backend/server/services/remote/models/migration_test.go b/backend/server/services/remote/models/migration_test.go
new file mode 100644
index 000000000..163bfb238
--- /dev/null
+++ b/backend/server/services/remote/models/migration_test.go
@@ -0,0 +1,67 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package models
+
+import (
+	"encoding/json"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// TestUnmarshallMigrationScript checks that a script's name, version and each
+// supported operation type are decoded from JSON.
+func TestUnmarshallMigrationScript(t *testing.T) {
+	payload := []byte(`{
+		"name": "test",
+		"version": 20230420123456,
+		"operations": [
+			{
+				"type": "execute",
+				"sql": "SOME SQL",
+				"dialect": "mysql"
+			},
+			{
+				"type": "drop_column",
+				"table": "t",
+				"column": "c"
+			},
+			{
+				"type": "drop_table",
+				"table": "t"
+			}
+		]
+	}`)
+
+	var script RemoteMigrationScript
+	assert.NoError(t, json.Unmarshal(payload, &script))
+	assert.Equal(t, "test", script.name)
+	assert.Equal(t, uint64(20230420123456), script.version)
+	assert.Len(t, script.operations, 3)
+
+	execute := script.operations[0].(*ExecuteOperation)
+	assert.Equal(t, "SOME SQL", execute.Sql)
+	assert.Equal(t, "mysql", *execute.Dialect)
+
+	dropColumn := script.operations[1].(*DropColumnOperation)
+	assert.Equal(t, "t", dropColumn.Table)
+	assert.Equal(t, "c", dropColumn.Column)
+
+	dropTable := script.operations[2].(*DropTableOperation)
+	assert.Equal(t, "t", dropTable.Table)
+}
diff --git a/backend/server/services/remote/models/models.go b/backend/server/services/remote/models/models.go
index b220114e1..c3bc47b51 100644
--- a/backend/server/services/remote/models/models.go
+++ b/backend/server/services/remote/models/models.go
@@ -24,30 +24,25 @@ import (
 	"github.com/apache/incubator-devlake/core/plugin"
 )
 
-const (
-	PythonPoetryCmd PluginType      = "python-poetry"
-	PythonCmd       PluginType      = "python"
-	None            PluginExtension = ""
-	Metric          PluginExtension = "metric"
-	Datasource      PluginExtension = "datasource"
-)
+type PluginExtension string
 
-type (
-	PluginType      string
-	PluginExtension string
+const (
+	None       PluginExtension = ""
+	Metric     PluginExtension = "metric"
+	Datasource PluginExtension = "datasource"
 )
 
 type PluginInfo struct {
-	Type                        PluginType        `json:"type" validate:"required"`
-	Name                        string            `json:"name" validate:"required"`
-	Extension                   PluginExtension   `json:"extension"`
-	ConnectionModelInfo         *DynamicModelInfo `json:"connection_model_info" validate:"required"`
-	TransformationRuleModelInfo *DynamicModelInfo `json:"transformation_rule_model_info"`
-	ScopeModelInfo              *DynamicModelInfo `json:"scope_model_info" validate:"dive"`
-	Description                 string            `json:"description"`
-	PluginPath                  string            `json:"plugin_path" validate:"required"`
-	SubtaskMetas                []SubtaskMeta     `json:"subtask_metas" validate:"dive"`
-	Tables                      []string          `json:"tables"`
+	Name                        string                  `json:"name" validate:"required"`
+	Description                 string                  `json:"description"`
+	ConnectionModelInfo         *DynamicModelInfo       `json:"connection_model_info" validate:"required"`
+	TransformationRuleModelInfo *DynamicModelInfo       `json:"transformation_rule_model_info"`
+	ScopeModelInfo              *DynamicModelInfo       `json:"scope_model_info" validate:"required"`
+	ToolModelInfos              []*DynamicModelInfo     `json:"tool_model_infos"`
+	MigrationScripts            []RemoteMigrationScript `json:"migration_scripts"`
+	PluginPath                  string                  `json:"plugin_path" validate:"required"`
+	SubtaskMetas                []SubtaskMeta           `json:"subtask_metas"`
+	Extension                   PluginExtension         `json:"extension"`
 }
 
 // Type aliases used by the API helper for better readability
@@ -62,8 +57,8 @@ type DynamicModelInfo struct {
 	TableName  string         `json:"table_name" validate:"required"`
 }
 
-func (d DynamicModelInfo) LoadDynamicTabler(encrypt bool, parentModel any) (*models.DynamicTabler, errors.Error) {
-	return LoadTableModel(d.TableName, d.JsonSchema, encrypt, parentModel)
+func (d DynamicModelInfo) LoadDynamicTabler(parentModel any) (*models.DynamicTabler, errors.Error) {
+	return LoadTableModel(d.TableName, d.JsonSchema, parentModel)
 }
 
 type ScopeModel struct {
diff --git a/backend/server/services/remote/models/plugin_remote.go b/backend/server/services/remote/models/plugin_remote.go
index e039912a6..1d0c0f57b 100644
--- a/backend/server/services/remote/models/plugin_remote.go
+++ b/backend/server/services/remote/models/plugin_remote.go
@@ -29,5 +29,6 @@ type RemotePlugin interface {
 	plugin.PluginMeta
 	plugin.PluginOpenApiSpec
 	plugin.PluginModel
-	RunMigrations(forceMigrate bool) errors.Error
+	plugin.PluginMigration
+	RunAutoMigrations() errors.Error
 }
diff --git a/backend/server/services/remote/plugin/plugin_impl.go b/backend/server/services/remote/plugin/plugin_impl.go
index 259d2300c..02a613748 100644
--- a/backend/server/services/remote/plugin/plugin_impl.go
+++ b/backend/server/services/remote/plugin/plugin_impl.go
@@ -19,6 +19,7 @@ package plugin
 
 import (
 	"fmt"
+	"strings"
 
 	"github.com/apache/incubator-devlake/core/dal"
 	"github.com/apache/incubator-devlake/core/errors"
@@ -41,9 +42,10 @@ type (
 		connectionTabler         *coreModels.DynamicTabler
 		scopeTabler              *coreModels.DynamicTabler
 		transformationRuleTabler *coreModels.DynamicTabler
+		toolModelTablers         []*coreModels.DynamicTabler
+		migrationScripts         []plugin.MigrationScript
 		resources                map[string]map[string]plugin.ApiResourceHandler
 		openApiSpec              string
-		tables                   []dal.Tabler
 	}
 	RemotePluginTaskData struct {
 		DbUrl              string                 `json:"db_url"`
@@ -55,26 +57,39 @@ type (
 )
 
 func newPlugin(info *models.PluginInfo, invoker bridge.Invoker) (*remotePluginImpl, errors.Error) {
-	connectionTabler, err := info.ConnectionModelInfo.LoadDynamicTabler(true, common.Model{})
+	connectionTabler, err := info.ConnectionModelInfo.LoadDynamicTabler(common.Model{})
 	if err != nil {
 		return nil, errors.Default.Wrap(err, fmt.Sprintf("Couldn't load Connection type for plugin %s", info.Name))
 	}
 
 	var txRuleTabler *coreModels.DynamicTabler
 	if info.TransformationRuleModelInfo != nil {
-		txRuleTabler, err = info.TransformationRuleModelInfo.LoadDynamicTabler(false, models.TransformationModel{})
+		txRuleTabler, err = info.TransformationRuleModelInfo.LoadDynamicTabler(models.TransformationModel{})
 		if err != nil {
 			return nil, errors.Default.Wrap(err, fmt.Sprintf("Couldn't load TransformationRule type for plugin %s", info.Name))
 		}
 	}
-	scopeTabler, err := info.ScopeModelInfo.LoadDynamicTabler(false, models.ScopeModel{})
+	scopeTabler, err := info.ScopeModelInfo.LoadDynamicTabler(models.ScopeModel{})
 	if err != nil {
 		return nil, errors.Default.Wrap(err, fmt.Sprintf("Couldn't load Scope type for plugin %s", info.Name))
 	}
+	toolModelTablers := make([]*coreModels.DynamicTabler, len(info.ToolModelInfos))
+	for i, toolModelInfo := range info.ToolModelInfos {
+		toolModelTabler, err := toolModelInfo.LoadDynamicTabler(common.NoPKModel{})
+		if err != nil {
+			return nil, errors.Default.Wrap(err, fmt.Sprintf("Couldn't load ToolModel type for plugin %s", info.Name))
+		}
+		toolModelTablers[i] = toolModelTabler
+	}
 	openApiSpec, err := doc.GenerateOpenApiSpec(info)
 	if err != nil {
 		return nil, errors.Default.Wrap(err, fmt.Sprintf("Couldn't generate OpenAPI spec for plugin %s", info.Name))
 	}
+	scripts := make([]plugin.MigrationScript, 0)
+	for _, script := range info.MigrationScripts {
+		script := script
+		scripts = append(scripts, &script)
+	}
 	p := remotePluginImpl{
 		name:                     info.Name,
 		invoker:                  invoker,
@@ -83,6 +98,8 @@ func newPlugin(info *models.PluginInfo, invoker bridge.Invoker) (*remotePluginIm
 		connectionTabler:         connectionTabler,
 		scopeTabler:              scopeTabler,
 		transformationRuleTabler: txRuleTabler,
+		toolModelTablers:         toolModelTablers,
+		migrationScripts:         scripts,
 		resources:                GetDefaultAPI(invoker, connectionTabler, txRuleTabler, scopeTabler, connectionHelper),
 		openApiSpec:              *openApiSpec,
 	}
@@ -97,9 +114,6 @@ func newPlugin(info *models.PluginInfo, invoker bridge.Invoker) (*remotePluginIm
 			DomainTypes:      subtask.DomainTypes,
 		})
 	}
-	for _, tableName := range info.Tables {
-		p.tables = append(p.tables, coreModels.NewDynamicTabler(tableName, nil))
-	}
 	return &p, nil
 }
 
@@ -108,7 +122,13 @@ func (p *remotePluginImpl) SubTaskMetas() []plugin.SubTaskMeta {
 }
 
 func (p *remotePluginImpl) GetTablesInfo() []dal.Tabler {
-	return p.tables
+	tables := make([]dal.Tabler, 0)
+	for _, toolModelTabler := range p.toolModelTablers {
+		tables = append(tables, toolModelTabler)
+		rawTableName := strings.Replace(toolModelTabler.TableName(), "_tool_", "_raw_", 1)
+		tables = append(tables, coreModels.NewDynamicTabler(rawTableName, nil))
+	}
+	return tables
 }
 
 func (p *remotePluginImpl) PrepareTaskData(taskCtx plugin.TaskContext, options map[string]interface{}) (interface{}, errors.Error) {
@@ -191,28 +211,37 @@ func (p *remotePluginImpl) ApiResources() map[string]map[string]plugin.ApiResour
 	return p.resources
 }
 
-func (p *remotePluginImpl) RunMigrations(forceMigrate bool) errors.Error {
-	err := api.CallDB(basicRes.GetDal().AutoMigrate, p.connectionTabler.New())
+func (p *remotePluginImpl) RunAutoMigrations() errors.Error {
+	db := basicRes.GetDal()
+	err := api.CallDB(db.AutoMigrate, p.connectionTabler.New())
 	if err != nil {
 		return err
 	}
-	err = api.CallDB(basicRes.GetDal().AutoMigrate, p.scopeTabler.New())
+	err = api.CallDB(db.AutoMigrate, p.scopeTabler.New())
 	if err != nil {
 		return err
 	}
 	if p.transformationRuleTabler != nil {
-		err = api.CallDB(basicRes.GetDal().AutoMigrate, p.transformationRuleTabler.New())
+		err = api.CallDB(db.AutoMigrate, p.transformationRuleTabler.New())
 		if err != nil {
 			return err
 		}
 	}
-	dbUrl := basicRes.GetConfig("db_url")
-	err = p.invoker.Call("run-migrations", bridge.DefaultContext, dbUrl, forceMigrate).Err
-	return err
+	for _, toolModelTabler := range p.toolModelTablers {
+		err = api.CallDB(db.AutoMigrate, toolModelTabler.New())
+		if err != nil {
+			return err
+		}
+	}
+	return nil
 }
 
 func (p *remotePluginImpl) OpenApiSpec() string {
 	return p.openApiSpec
 }
 
+// MigrationScripts returns the scripts that newPlugin collected from the
+// plugin info's MigrationScripts field.
+func (p *remotePluginImpl) MigrationScripts() []plugin.MigrationScript { return p.migrationScripts }
+
 var _ models.RemotePlugin = (*remotePluginImpl)(nil)