You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by he...@apache.org on 2023/05/31 17:45:49 UTC
[incubator-devlake] branch main updated: 5250 define migration script on python side (#5286)
This is an automated email from the ASF dual-hosted git repository.
hez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 4092092bb 5250 define migration script on python side (#5286)
4092092bb is described below
commit 4092092bba27ff3c258bdadd4177904f4701300f
Author: Camille Teruel <ca...@gmail.com>
AuthorDate: Wed May 31 19:45:44 2023 +0200
5250 define migration script on python side (#5286)
* feat: Improve dynamic struct generation from JSON schema
* Support "date-time" string format: time.Time (or *time.Time if not required)
* Support "number" property type: float64
* Generate gorm `primaryKey` tag for Field(primary_key=True, ...)
* Generate gorm `type` tag for string fields: `varchar(n)` if n <= 255, `text` otherwise
* Generate gorm `serializer:encdec` tag for string format "password": Use SecretStr on python side
* Generate `validate:"required"` for required properties
* feat: AutoMigrate all tool models
* feat: MigrationScripts for python plugins
Support definition of migration scripts on python side.
---------
Co-authored-by: Camille Teruel <ca...@meri.co>
Co-authored-by: Hezheng Yin <he...@merico.dev>
---
backend/core/utils/json.go | 74 ++++++++++
backend/core/utils/json_test.go | 111 +++++++++++++++
backend/python/README.md | 41 ++++--
.../python/plugins/azuredevops/azuredevops/api.py | 2 +-
.../python/plugins/azuredevops/azuredevops/main.py | 2 +-
.../plugins/azuredevops/azuredevops/models.py | 36 ++---
backend/python/pydevlake/pydevlake/__init__.py | 8 +-
backend/python/pydevlake/pydevlake/ipc.py | 4 -
backend/python/pydevlake/pydevlake/message.py | 9 +-
backend/python/pydevlake/pydevlake/migration.py | 113 +++++++++++++++
backend/python/pydevlake/pydevlake/model.py | 12 +-
backend/python/pydevlake/pydevlake/plugin.py | 29 +---
backend/python/pydevlake/tests/migration_test.py | 65 +++++++++
backend/python/test/fakeplugin/fakeplugin/main.py | 5 +-
backend/server/services/remote/init.go | 5 +-
.../server/services/remote/models/conversion.go | 127 +++++++++++++----
.../services/remote/models/conversion_test.go | 153 +++++++++++++++++++++
backend/server/services/remote/models/migration.go | 140 +++++++++++++++++++
.../services/remote/models/migration_test.go | 67 +++++++++
backend/server/services/remote/models/models.go | 39 +++---
.../server/services/remote/models/plugin_remote.go | 3 +-
.../server/services/remote/plugin/plugin_impl.go | 59 ++++++--
22 files changed, 955 insertions(+), 149 deletions(-)
diff --git a/backend/core/utils/json.go b/backend/core/utils/json.go
new file mode 100644
index 000000000..557f64cec
--- /dev/null
+++ b/backend/core/utils/json.go
@@ -0,0 +1,74 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+ "fmt"
+ "reflect"
+
+ "github.com/apache/incubator-devlake/core/errors"
+)
+
+type JsonObject = map[string]any
+type JsonArray = []any
+
+func GetProperty[T any](object JsonObject, key string) (T, errors.Error) {
+ property, ok := object[key]
+ if !ok {
+ return *new(T), errors.Default.New(fmt.Sprintf("Missing property \"%s\"", key))
+ }
+ return Convert[T](property)
+}
+
+func GetItem[T any](array JsonArray, index int) (T, errors.Error) {
+ if index < 0 || index >= len(array) {
+ return *new(T), errors.Default.New(fmt.Sprintf("Index %d out of range", index))
+ }
+ return Convert[T](array[index])
+}
+
+// Convert converts value to type T. If value is a slice, it converts each element of the slice to type T.
+// Does not support nested slices.
+func Convert[T any](value any) (T, errors.Error) {
+ var t T
+ tType := reflect.TypeOf(t)
+ if tType.Kind() == reflect.Slice {
+ valueSlice, ok := value.([]any)
+ if !ok {
+ return t, errors.Default.New("Value is not a slice")
+ }
+ elemType := tType.Elem()
+ result := reflect.MakeSlice(tType, 0, len(valueSlice))
+ for i, v := range valueSlice {
+ value := reflect.ValueOf(v)
+ if elemType.AssignableTo(reflect.TypeOf(v)) {
+ elem := value.Convert(elemType)
+ result = reflect.Append(result, elem)
+ } else {
+ return t, errors.Default.New(fmt.Sprintf("Element %d is not of type %s", i, elemType.Name()))
+ }
+ }
+ return result.Interface().(T), nil
+ } else {
+ result, ok := value.(T)
+ if !ok {
+ return t, errors.Default.New(fmt.Sprintf("Value is not of type %T", t))
+ }
+ return result, nil
+ }
+}
diff --git a/backend/core/utils/json_test.go b/backend/core/utils/json_test.go
new file mode 100644
index 000000000..ae573b932
--- /dev/null
+++ b/backend/core/utils/json_test.go
@@ -0,0 +1,111 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestExistingProperty(t *testing.T) {
+ object := map[string]interface{}{
+ "id": 1,
+ }
+
+ res, err := GetProperty[int](object, "id")
+
+ assert.NoError(t, err)
+ assert.Equal(t, res, 1)
+}
+
+func TestMissingProperty(t *testing.T) {
+ object := map[string]interface{}{
+ "id": 1,
+ }
+
+ _, err := GetProperty[int](object, "name")
+
+ assert.Error(t, err)
+ assert.Equal(t, "Missing property \"name\"", err.Error())
+}
+
+func TestInvalidPropertyType(t *testing.T) {
+ object := map[string]interface{}{
+ "id": 1,
+ }
+
+ _, err := GetProperty[string](object, "id")
+
+ assert.Error(t, err)
+ assert.Equal(t, "Value is not of type string", err.Error())
+}
+
+func TestGetItemInRange(t *testing.T) {
+ array := []any{1, 2, 3}
+
+ res, err := GetItem[int](array, 1)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 2, res)
+}
+
+func TestGetItemOutOfRange(t *testing.T) {
+ array := []any{1, 2, 3}
+
+ _, err := GetItem[int](array, 3)
+
+ assert.Error(t, err)
+ assert.Equal(t, "Index 3 out of range", err.Error())
+}
+
+func TestConvertSlice(t *testing.T) {
+ value := []any{1, 2, 3}
+
+ res, err := Convert[[]int](value)
+
+ assert.NoError(t, err)
+ assert.Equal(t, []int{1, 2, 3}, res)
+}
+
+func TestConvertSliceInvalidType(t *testing.T) {
+ value := []any{1, 2, 3}
+
+ val, err := Convert[[]string](value)
+ _ = val
+ assert.Error(t, err)
+ assert.Equal(t, "Element 0 is not of type string", err.Error())
+}
+
+func TestConvertSliceInvalidValue(t *testing.T) {
+ value := []any{1, "2", 3}
+
+ _, err := Convert[[]int](value)
+
+ assert.Error(t, err)
+ assert.Equal(t, "Element 1 is not of type int", err.Error())
+}
+
+func TestConvertSliceInvalidSlice(t *testing.T) {
+ value := 1
+
+ _, err := Convert[[]int](value)
+
+ assert.Error(t, err)
+ assert.Equal(t, "Value is not a slice", err.Error())
+}
diff --git a/backend/python/README.md b/backend/python/README.md
index 192798e58..1e52053d4 100644
--- a/backend/python/README.md
+++ b/backend/python/README.md
@@ -100,11 +100,15 @@ and those that are used to customize conversion to domain models that are groupe
For example, to add `url` and `token` parameter, edit `MyPluginConnection` as follow:
```python
+from pydantic import SecretStr
+
class MyPluginConnection(Connection):
url: str
- token: str
+ token: SecretStr
```
+Using type `SecretStr` instead of `str` will encode the value in the database.
+To get the `str` value, you need to call `get_secret_value()`: `connection.token.get_secret_value()`.
All plugin methods that have a connection parameter will be called with an instance of this class.
Note that you should not define `__init__`.
@@ -235,20 +239,28 @@ To facilitate or even eliminate extraction, your tool models should be close to
#### Migration of tool models
Tool models, connection, scope and transformation rule types are stored in the DevLake database.
-When you change the definition of one of those types, you need to migrate the database.
-You should implement the migration logic in the model class by defining a `migrate` class method. This method takes a sqlalchemy session as argument that you can use to
-execute SQL `ALTER TABLE` statements.
+When you change the definition of one of those types, the database needs to be migrated.
+Automatic migration takes care of most modifications, but some changes require manual migration. For example, automatic migration never drops columns. Another example is adding a column to the primary key of a table, you need to write a script that remove the primary key constraint and add a new compound primary key.
-```python
-class User(ToolModel, table=True):
- id: str = Field(primary_key=True)
- name: str
- email: str
- age: int
+To declare a new migration script, you decorate a function with the `migration` decorator. The function name should describe what the script does. The `migration` decorator takes a version number that should be a 14 digits timestamp in the format `YYYYMMDDhhmmss`. The function takes a `MigrationScriptBuilder` as a parameter. This builder exposes methods to execute migration operations.
+
+##### Migration operations
+
+The `MigrationScriptBuilder` exposes the following methods:
+- `execute(sql: str, dialect: Optional[Dialect])`: execute a raw SQL statement. The `dialect` parameter is used to execute the SQL statement only if the database is of the given dialect. If `dialect` is `None`, the statement is executed unconditionally.
+- `drop_column(table: str, column: str)`: drop a column from a table
+- `drop_table(table: str)`: drop a table
- @classmethod
- def migrate(cls, session):
- session.execute(f"ALTER TABLE {cls.__tablename__} ADD COLUMN age INT")
+
+```python
+from pydevlake.migration import MigrationScriptBuilder, migration, Dialect
+
+@migration(20230524181430)
+def add_build_id_as_job_primary_key(b: MigrationScriptBuilder):
+ table = Job.__tablename__
+ b.execute(f'ALTER TABLE {table} DROP PRIMARY KEY', Dialect.MYSQL)
+ b.execute(f'ALTER TABLE {table} DROP CONSTRAINT {table}_pkey', Dialect.POSTGRESQL)
+ b.execute(f'ALTER TABLE {table} ADD PRIMARY KEY (id, build_id)')
```
@@ -481,7 +493,8 @@ class UserComments(Substream):
"""
This method will be called for each user collected from parent stream Users.
"""
- for json in MyPluginAPI(context.connection.token).user_comments(user.id):
+ api = MyPluginAPI(context.connection.token.get_secret_value())
+ for json in api.user_comments(user.id):
yield json, state
...
```
diff --git a/backend/python/plugins/azuredevops/azuredevops/api.py b/backend/python/plugins/azuredevops/azuredevops/api.py
index 4c63e507a..9461d0890 100644
--- a/backend/python/plugins/azuredevops/azuredevops/api.py
+++ b/backend/python/plugins/azuredevops/azuredevops/api.py
@@ -36,7 +36,7 @@ class AzureDevOpsAPI(API):
@request_hook
def authenticate(self, request: Request):
- token_b64 = base64.b64encode((':' + self.connection.token).encode()).decode()
+ token_b64 = base64.b64encode((':' + self.connection.token.get_secret_value()).encode()).decode()
request.headers['Authorization'] = 'Basic ' + token_b64
@request_hook
diff --git a/backend/python/plugins/azuredevops/azuredevops/main.py b/backend/python/plugins/azuredevops/azuredevops/main.py
index fabc53a63..3a96a1964 100644
--- a/backend/python/plugins/azuredevops/azuredevops/main.py
+++ b/backend/python/plugins/azuredevops/azuredevops/main.py
@@ -109,7 +109,7 @@ class AzureDevOpsPlugin(Plugin):
def extra_tasks(self, scope: GitRepository, tx_rule: AzureDevOpsTransformationRule, entity_types: list[DomainType], connection: AzureDevOpsConnection):
if DomainType.CODE in entity_types:
url = urlparse(scope.remote_url)
- url = url._replace(netloc=f'{url.username}:{connection.token}@{url.hostname}')
+ url = url._replace(netloc=f'{url.username}:{connection.token.get_secret_value()}@{url.hostname}')
yield gitextractor(url.geturl(), scope.domain_id(), connection.proxy)
def extra_stages(self, scope_tx_rule_pairs: list[ScopeTxRulePair], entity_types: list[DomainType], _):
diff --git a/backend/python/plugins/azuredevops/azuredevops/models.py b/backend/python/plugins/azuredevops/azuredevops/models.py
index 5c656afd1..3e08ef68b 100644
--- a/backend/python/plugins/azuredevops/azuredevops/models.py
+++ b/backend/python/plugins/azuredevops/azuredevops/models.py
@@ -18,15 +18,16 @@ from enum import Enum
from typing import Optional
import re
-from sqlmodel import Session, Column, Text
+from pydantic import SecretStr
from pydevlake import Field, Connection, TransformationRule
from pydevlake.model import ToolModel, ToolScope
from pydevlake.pipeline_tasks import RefDiffOptions
+from pydevlake.migration import migration, MigrationScriptBuilder, Dialect
class AzureDevOpsConnection(Connection):
- token: str
+ token: SecretStr
organization: Optional[str]
@@ -52,7 +53,7 @@ class GitPullRequest(ToolModel, table=True):
Completed = "completed"
pull_request_id: int = Field(primary_key=True)
- description: Optional[str] = Field(sa_column=Column(Text))
+ description: Optional[str]
status: PRStatus
created_by_id: str = Field(source='/createdBy/id')
created_by_name: str = Field(source='/createdBy/displayName')
@@ -68,14 +69,6 @@ class GitPullRequest(ToolModel, table=True):
source_ref_name: Optional[str]
fork_repo_id: Optional[str] = Field(source='/forkSource/repository/id')
- @classmethod
- def migrate(self, session: Session):
- dialect = session.bind.dialect.name
- if dialect == 'mysql':
- session.execute(f'ALTER TABLE {self.__tablename__} MODIFY COLUMN description TEXT')
- elif dialect == 'postgresql':
- session.execute(f'ALTER TABLE {self.__tablename__} ALTER COLUMN description TYPE TEXT')
-
class GitPullRequestCommit(ToolModel, table=True):
commit_id: str = Field(primary_key=True)
@@ -132,13 +125,14 @@ class Job(ToolModel, table=True):
state: JobState
result: JobResult
- @classmethod
- def migrate(self, session: Session):
- dialect = session.bind.dialect.name
- if dialect == 'mysql':
- session.execute(f'ALTER TABLE {self.__tablename__} DROP PRIMARY KEY')
- elif dialect == 'postgresql':
- session.execute(f'ALTER TABLE {self.__tablename__} DROP CONSTRAINT {self.__tablename__}_pkey')
- else:
- raise Exception(f'Unsupported dialect {dialect}')
- session.execute(f'ALTER TABLE {self.__tablename__} ADD PRIMARY KEY (id, build_id)')
+
+@migration(20230524181430)
+def add_build_id_as_job_primary_key(b: MigrationScriptBuilder):
+ # NOTE: We can't add a column to the primary key of an existing table
+ # so we have to drop the primary key constraint first,
+ # which is done differently in MySQL and PostgreSQL,
+ # and then add the new composite primary key.
+ table = Job.__tablename__
+ b.execute(f'ALTER TABLE {table} DROP PRIMARY KEY', Dialect.MYSQL)
+ b.execute(f'ALTER TABLE {table} DROP CONSTRAINT {table}_pkey', Dialect.POSTGRESQL)
+ b.execute(f'ALTER TABLE {table} ADD PRIMARY KEY (id, build_id)')
diff --git a/backend/python/pydevlake/pydevlake/__init__.py b/backend/python/pydevlake/pydevlake/__init__.py
index d645c0749..0c10c3e9f 100644
--- a/backend/python/pydevlake/pydevlake/__init__.py
+++ b/backend/python/pydevlake/pydevlake/__init__.py
@@ -21,14 +21,16 @@ pytest.register_assert_rewrite('pydevlake.testing')
from sqlmodel import Field as _Field
-def Field(*args, schema_extra: Optional[dict[str, Any]]=None, source: Optional[str]=None, **kwargs):
+def Field(*args, primary_key: bool=False, source: Optional[str]=None, **kwargs):
"""
A wrapper around sqlmodel.Field that adds a source parameter.
"""
- schema_extra = schema_extra or {}
+ schema_extra = kwargs.get('schema_extra', {})
if source:
schema_extra['source'] = source
- return _Field(*args, **kwargs, schema_extra=schema_extra)
+ if primary_key:
+ schema_extra['primaryKey'] = True
+ return _Field(*args, **kwargs, primary_key=primary_key, schema_extra=schema_extra)
from .model import ToolModel, ToolScope, DomainScope, Connection, TransformationRule, domain_id
diff --git a/backend/python/pydevlake/pydevlake/ipc.py b/backend/python/pydevlake/pydevlake/ipc.py
index e59065af0..d783800a7 100644
--- a/backend/python/pydevlake/pydevlake/ipc.py
+++ b/backend/python/pydevlake/pydevlake/ipc.py
@@ -100,10 +100,6 @@ class PluginCommands:
entities = [DomainType(e) for e in entities]
return self._plugin.make_pipeline(scope_tx_rule_pairs, entities, connection)
- @plugin_method
- def run_migrations(self, db_url, force: bool):
- self._plugin.run_migrations(create_db_engine(db_url), force)
-
@plugin_method
def plugin_info(self):
return self._plugin.plugin_info()
diff --git a/backend/python/pydevlake/pydevlake/message.py b/backend/python/pydevlake/pydevlake/message.py
index 0cb7c23cd..44228f56c 100644
--- a/backend/python/pydevlake/pydevlake/message.py
+++ b/backend/python/pydevlake/pydevlake/message.py
@@ -20,6 +20,7 @@ from pydantic import BaseModel, Field
import jsonref
from pydevlake.model import ToolScope
+from pydevlake.migration import MigrationScript
class Message(BaseModel):
@@ -48,6 +49,10 @@ class DynamicModelInfo(Message):
# Replace $ref with actual schema
schema = jsonref.replace_refs(schema, proxies=False)
del schema['definitions']
+ # Pydantic forgets to put type in enums
+ for prop in schema['properties'].values():
+ if 'type' not in prop and 'enum' in prop:
+ prop['type'] = 'string'
return DynamicModelInfo(
json_schema=schema,
table_name=model_class.__tablename__
@@ -60,11 +65,11 @@ class PluginInfo(Message):
connection_model_info: DynamicModelInfo
transformation_rule_model_info: Optional[DynamicModelInfo]
scope_model_info: DynamicModelInfo
+ tool_model_infos: list[DynamicModelInfo]
+ migration_scripts: list[MigrationScript]
plugin_path: str
subtask_metas: list[SubtaskMeta]
extension: str = "datasource"
- type: str = "python-poetry"
- tables: list[str]
class RemoteProgress(Message):
diff --git a/backend/python/pydevlake/pydevlake/migration.py b/backend/python/pydevlake/pydevlake/migration.py
new file mode 100644
index 000000000..e64eb9115
--- /dev/null
+++ b/backend/python/pydevlake/pydevlake/migration.py
@@ -0,0 +1,113 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from typing import List, Literal, Optional, Union, Annotated
+from enum import Enum
+from datetime import datetime
+
+from pydantic import BaseModel, Field
+
+
+MIGRATION_SCRIPTS = []
+
+class Dialect(Enum):
+ MYSQL = "mysql"
+ POSTGRESQL = "postgres"
+
+
+class Execute(BaseModel):
+ type: Literal["execute"] = "execute"
+ sql: str
+ dialect: Optional[Dialect] = None
+
+
+class DropColumn(BaseModel):
+ type: Literal["drop_column"] = "drop_column"
+ table: str
+ column: str
+
+
+class DropTable(BaseModel):
+ type: Literal["drop_table"] = "drop_table"
+ table: str
+
+
+Operation = Annotated[
+ Union[Execute, DropColumn, DropTable],
+ Field(discriminator="type")
+]
+
+
+class MigrationScript(BaseModel):
+ operations: List[Operation]
+ version: int
+ name: str
+
+
+class MigrationScriptBuilder:
+ def __init__(self):
+ self.operations = []
+
+ def execute(self, sql: str, dialect: Optional[Dialect] = None):
+ """
+ Executes a raw SQL statement.
+ If dialect is specified the statement will be executed only if the db dialect matches.
+ """
+ self.operations.append(Execute(sql=sql, dialect=dialect))
+
+ def drop_column(self, table: str, column: str):
+ """
+ Drops a column from a table.
+ """
+ self.operations.append(DropColumn(table=table, column=column))
+
+ def drop_table(self, table: str):
+ """
+ Drops a table.
+ """
+ self.operations.append(DropTable(table=table))
+
+
+def migration(version: int, name: Optional[str] = None):
+ """
+ Builds a migration script from a function.
+
+ Usage:
+
+ @migration(20230511)
+ def change_description_type(b: MigrationScriptBuilder):
+ b.exec('ALTER TABLE my_table ...')
+ """
+ _validate_version(version)
+
+ def wrapper(fn):
+ builder = MigrationScriptBuilder()
+ fn(builder)
+ script = MigrationScript(operations=builder.operations, version=version, name=name or fn.__name__)
+ MIGRATION_SCRIPTS.append(script)
+ return script
+ return wrapper
+
+
+def _validate_version(version: int):
+ str_version = str(version)
+ err = ValueError(f"Invalid version {version}, must be in YYYYMMDDhhmmss format")
+ if len(str_version) != 14:
+ raise err
+ try:
+ datetime.strptime(str_version, "%Y%m%d%H%M%S")
+ except ValueError:
+ raise err
diff --git a/backend/python/pydevlake/pydevlake/model.py b/backend/python/pydevlake/pydevlake/model.py
index 995c13b3a..2c576a12e 100644
--- a/backend/python/pydevlake/pydevlake/model.py
+++ b/backend/python/pydevlake/pydevlake/model.py
@@ -20,7 +20,7 @@ from inspect import getmodule
from datetime import datetime
import inflect
-from pydantic import AnyUrl, validator
+from pydantic import AnyUrl, SecretStr, validator
from sqlalchemy import Column, DateTime
from sqlalchemy.orm import declared_attr, Session
from sqlalchemy.inspection import inspect
@@ -48,6 +48,9 @@ class ToolTable(SQLModel):
class Config:
allow_population_by_field_name = True
+ json_encoders = {
+ SecretStr: lambda v: v.get_secret_value() if v else None
+ }
@classmethod
def alias_generator(cls, attr_name: str) -> str:
@@ -56,13 +59,6 @@ class ToolTable(SQLModel):
parts = attr_name.split('_')
return parts[0] + ''.join(word.capitalize() for word in parts[1:])
- @classmethod
- def migrate(cls, session: Session):
- """
- Redefine this method to perform migration on this tool model.
- """
- pass
-
class Connection(ToolTable, Model):
name: str
diff --git a/backend/python/pydevlake/pydevlake/plugin.py b/backend/python/pydevlake/pydevlake/plugin.py
index d543fe791..e94791895 100644
--- a/backend/python/pydevlake/pydevlake/plugin.py
+++ b/backend/python/pydevlake/pydevlake/plugin.py
@@ -20,16 +20,14 @@ import os
import sys
import fire
-from sqlmodel import SQLModel, Session
-from sqlalchemy.inspection import inspect
-from sqlalchemy.engine import Engine
import pydevlake.message as msg
from pydevlake.subtasks import Subtask
from pydevlake.ipc import PluginCommands
from pydevlake.context import Context
from pydevlake.stream import Stream, DomainType
-from pydevlake.model import ToolScope, DomainScope, Connection, TransformationRule, SubtaskRun
+from pydevlake.model import ToolScope, DomainScope, Connection, TransformationRule
+from pydevlake.migration import MIGRATION_SCRIPTS
ScopeTxRulePair = tuple[ToolScope, Optional[TransformationRule]]
@@ -105,24 +103,6 @@ class Plugin(ABC):
def convert(self, ctx: Context, stream: str):
yield from self.get_stream(stream).convertor.run(ctx)
- def run_migrations(self, engine: Engine, force: bool = False):
- # NOTE: Not sure what "force" is for
- # TODO: Support migration for transformation rule and connection tables
- # They are currently created on go-side.
- tool_models = [stream.tool_model for stream in self._streams.values()]
- tool_models.append(self.tool_scope_type)
- inspector = inspect(engine)
- tables = SQLModel.metadata.tables
- with Session(engine) as session:
- for model in tool_models:
- if inspector.has_table(model.__tablename__):
- # TODO: Add version table and migrate if needed
- model.migrate(session)
- else:
- tables[model.__tablename__].create(engine)
- session.commit()
- tables[SubtaskRun.__tablename__].create(engine, checkfirst=True)
-
def make_remote_scopes(self, connection: Connection, group_id: Optional[str] = None) -> msg.RemoteScopes:
if group_id:
remote_scopes = []
@@ -237,8 +217,6 @@ class Plugin(ABC):
tx_rule_model_info = msg.DynamicModelInfo.from_model(self.transformation_rule_type)
else:
tx_rule_model_info = None
- plugin_tables = [stream(self.name).raw_model_table for stream in self.streams] + \
- [stream.tool_model.__tablename__ for stream in self.streams]
return msg.PluginInfo(
name=self.name,
description=self.description,
@@ -247,8 +225,9 @@ class Plugin(ABC):
connection_model_info=msg.DynamicModelInfo.from_model(self.connection_type),
transformation_rule_model_info=tx_rule_model_info,
scope_model_info=msg.DynamicModelInfo.from_model(self.tool_scope_type),
+ tool_model_infos=[msg.DynamicModelInfo.from_model(stream.tool_model) for stream in self._streams.values()],
subtask_metas=subtask_metas,
- tables=plugin_tables,
+ migration_scripts=MIGRATION_SCRIPTS
)
def _plugin_path(self):
diff --git a/backend/python/pydevlake/tests/migration_test.py b/backend/python/pydevlake/tests/migration_test.py
new file mode 100644
index 000000000..6e19825f0
--- /dev/null
+++ b/backend/python/pydevlake/tests/migration_test.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from pydevlake.migration import migration, MIGRATION_SCRIPTS
+
+
+@migration(20230520174322)
+def my_migration(b):
+ b.execute("SOME SQL")
+ b.drop_column("t", "c")
+ b.drop_table("t")
+
+
+def test_migration():
+ assert my_migration.version == 20230520174322
+ assert my_migration.name == "my_migration"
+ assert len(my_migration.operations) == 3
+
+ op1 = my_migration.operations[0]
+ assert op1.sql == "SOME SQL"
+ assert op1.dialect is None
+
+ op2 = my_migration.operations[1]
+ assert op2.table == "t"
+ assert op2.column == "c"
+
+ op3 = my_migration.operations[2]
+ assert op3.table == "t"
+
+
+def test_registration():
+ assert my_migration in MIGRATION_SCRIPTS
+
+
+def test_serialization():
+ val = my_migration.dict()
+ assert val["version"] == 20230520174322
+ assert val["name"] == "my_migration"
+ assert len(val["operations"]) == 3
+
+ op1 = val["operations"][0]
+ assert op1["type"] == "execute"
+ assert op1["sql"] == "SOME SQL"
+ assert "dialect" not in op1 or op1["dialect"] is None
+
+ op2 = val["operations"][1]
+ assert op2["type"] == "drop_column"
+ assert op2["table"] == "t"
+ assert op2["column"] == "c"
+
+ op3 = val["operations"][2]
+ assert op3["type"] == "drop_table"
+ assert op3["table"] == "t"
diff --git a/backend/python/test/fakeplugin/fakeplugin/main.py b/backend/python/test/fakeplugin/fakeplugin/main.py
index 6a048ff22..55c548965 100644
--- a/backend/python/test/fakeplugin/fakeplugin/main.py
+++ b/backend/python/test/fakeplugin/fakeplugin/main.py
@@ -19,6 +19,7 @@ from typing import Optional
import json
from sqlmodel import Field
+from pydantic import SecretStr
from pydevlake import Plugin, Connection, TransformationRule, Stream, ToolModel, ToolScope, RemoteScopeGroup, DomainType
from pydevlake.domain_layer.devops import CicdScope, CICDPipeline, CICDStatus, CICDResult, CICDType
@@ -96,7 +97,7 @@ class FakePipelineStream(Stream):
class FakeConnection(Connection):
- token: str
+ token: SecretStr
class FakeProject(ToolScope, table=True):
@@ -149,7 +150,7 @@ class FakePlugin(Plugin):
]
def test_connection(self, connection: FakeConnection):
- if connection.token != VALID_TOKEN:
+ if connection.token.get_secret_value() != VALID_TOKEN:
raise Exception("Invalid token")
@property
diff --git a/backend/server/services/remote/init.go b/backend/server/services/remote/init.go
index 3e505b96c..4c6dd8ff1 100644
--- a/backend/server/services/remote/init.go
+++ b/backend/server/services/remote/init.go
@@ -19,7 +19,7 @@ package remote
import (
"fmt"
- "github.com/apache/incubator-devlake/core/config"
+
"github.com/apache/incubator-devlake/core/context"
"github.com/apache/incubator-devlake/core/errors"
pluginCore "github.com/apache/incubator-devlake/core/plugin"
@@ -43,8 +43,7 @@ func NewRemotePlugin(info *models.PluginInfo) (models.RemotePlugin, errors.Error
if err != nil {
return nil, err
}
- forceMigration := config.GetConfig().GetBool("FORCE_MIGRATION")
- err = plugin.RunMigrations(forceMigration)
+ err = plugin.RunAutoMigrations()
if err != nil {
return nil, err
}
diff --git a/backend/server/services/remote/models/conversion.go b/backend/server/services/remote/models/conversion.go
index bd221ecaa..877597ab4 100644
--- a/backend/server/services/remote/models/conversion.go
+++ b/backend/server/services/remote/models/conversion.go
@@ -20,33 +20,35 @@ package models
import (
"encoding/json"
"fmt"
- "github.com/apache/incubator-devlake/impls/dalgorm"
"reflect"
"strings"
"time"
+ "github.com/apache/incubator-devlake/impls/dalgorm"
+
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
+ "github.com/apache/incubator-devlake/core/utils"
"gorm.io/datatypes"
)
-func LoadTableModel(tableName string, schema map[string]any, encrypt bool, parentModel any) (*models.DynamicTabler, errors.Error) {
- structType, err := GenerateStructType(schema, encrypt, reflect.TypeOf(parentModel))
+func LoadTableModel(tableName string, schema utils.JsonObject, parentModel any) (*models.DynamicTabler, errors.Error) {
+ structType, err := GenerateStructType(schema, reflect.TypeOf(parentModel))
if err != nil {
return nil, err
}
return models.NewDynamicTabler(tableName, structType), nil
}
-func GenerateStructType(schema map[string]any, encrypt bool, baseType reflect.Type) (reflect.Type, errors.Error) {
+func GenerateStructType(schema utils.JsonObject, baseType reflect.Type) (reflect.Type, errors.Error) {
var structFields []reflect.StructField
- propsRaw, ok := schema["properties"]
- if !ok {
- return nil, errors.BadInput.New("Missing properties in JSON schema")
+ props, err := utils.GetProperty[utils.JsonObject](schema, "properties")
+ if err != nil {
+ return nil, err
}
- props, ok := propsRaw.(map[string]any)
- if !ok {
- return nil, errors.BadInput.New("JSON schema properties must be an object")
+ required, err := utils.GetProperty[[]string](schema, "required")
+ if err != nil {
+ return nil, err
}
if baseType != nil {
anonymousField := reflect.StructField{
@@ -61,8 +63,8 @@ func GenerateStructType(schema map[string]any, encrypt bool, baseType reflect.Ty
if isBaseTypeField(k, baseType) {
continue
}
- spec := v.(map[string]any)
- field, err := generateStructField(k, encrypt, spec)
+ spec := v.(utils.JsonObject)
+ field, err := generateStructField(k, spec, isRequired(k, required))
if err != nil {
return nil, err
}
@@ -98,6 +100,15 @@ func ToDatabaseMap(tableName string, ifc any, createdAt *time.Time, updatedAt *t
return m, nil
}
+func isRequired(fieldName string, required []string) bool {
+ for _, r := range required {
+ if fieldName == r {
+ return true
+ }
+ }
+ return false
+}
+
func isBaseTypeField(fieldName string, baseType reflect.Type) bool {
fieldName = canonicalFieldName(fieldName)
for i := 0; i < baseType.NumField(); i++ {
@@ -118,26 +129,33 @@ func canonicalFieldName(fieldName string) string {
return strings.ToLower(strings.Replace(fieldName, "_", "", -1))
}
-func generateStructField(name string, encrypt bool, schema map[string]any) (*reflect.StructField, errors.Error) {
- goType, err := getGoType(schema)
+var (
+ int64Type = reflect.TypeOf(int64(0))
+ float64Type = reflect.TypeOf(float64(0))
+ boolType = reflect.TypeOf(false)
+ stringType = reflect.TypeOf("")
+ timeType = reflect.TypeOf(time.Time{})
+ jsonMapType = reflect.TypeOf(datatypes.JSONMap{})
+)
+
+func generateStructField(name string, schema utils.JsonObject, required bool) (*reflect.StructField, errors.Error) {
+ goType, err := getGoType(schema, required)
if err != nil {
return nil, errors.Default.Wrap(err, fmt.Sprintf("couldn't resolve type for field: \"%s\"", name))
}
+ tag, err := getTag(name, schema, goType, required)
+ if err != nil {
+ return nil, err
+ }
sf := &reflect.StructField{
Name: strings.Title(name), //nolint:staticcheck
Type: goType,
- Tag: reflect.StructTag(fmt.Sprintf("json:\"%s\"", name)),
- }
- if encrypt {
- sf.Tag = reflect.StructTag(fmt.Sprintf("json:\"%s\" "+
- "gorm:\"serializer:encdec\"", //just encrypt everything for GORM operations - makes things easy
- name))
+ Tag: tag,
}
return sf, nil
}
-func getGoType(schema map[string]any) (reflect.Type, errors.Error) {
- var goType reflect.Type
+func getGoType(schema utils.JsonObject, required bool) (reflect.Type, errors.Error) {
jsonType, ok := schema["type"].(string)
if !ok {
return nil, errors.BadInput.New("\"type\" property must be a string")
@@ -145,15 +163,70 @@ func getGoType(schema map[string]any) (reflect.Type, errors.Error) {
switch jsonType {
//TODO: support more types
case "integer":
- goType = reflect.TypeOf(uint64(0))
+ return int64Type, nil
+ case "number":
+ return float64Type, nil
case "boolean":
- goType = reflect.TypeOf(false)
+ return boolType, nil
case "string":
- goType = reflect.TypeOf("")
+ format, err := utils.GetProperty[string](schema, "format")
+ if err == nil && format == "date-time" {
+ if required {
+ return timeType, nil
+ } else {
+ return reflect.PtrTo(timeType), nil
+ }
+ } else {
+ return stringType, nil
+ }
case "object":
- goType = reflect.TypeOf(datatypes.JSONMap{})
+ return jsonMapType, nil
default:
return nil, errors.BadInput.New(fmt.Sprintf("Unsupported type %s", jsonType))
}
- return goType, nil
+}
+
+func getTag(name string, schema utils.JsonObject, goType reflect.Type, required bool) (reflect.StructTag, errors.Error) {
+ tags := []string{}
+ tags = append(tags, fmt.Sprintf("json:\"%s\"", name))
+ gormTag := getGormTag(schema, goType)
+ if gormTag != "" {
+ tags = append(tags, gormTag)
+ }
+ if required {
+ tags = append(tags, "validate:\"required\"")
+ }
+ return reflect.StructTag(strings.Join(tags, " ")), nil
+}
+
+func getGormTag(schema utils.JsonObject, goType reflect.Type) string {
+ gormTags := []string{}
+ primaryKey, err := utils.GetProperty[bool](schema, "primaryKey")
+ if err == nil && primaryKey {
+ gormTags = append(gormTags, "primaryKey")
+ }
+ if goType == stringType {
+ maxLength, err := utils.GetProperty[float64](schema, "maxLength")
+ maxLengthInt := int(maxLength)
+ if err == nil {
+ if maxLengthInt > 255 {
+ gormTags = append(gormTags, "type:text")
+ } else {
+ gormTags = append(gormTags, fmt.Sprintf("type:varchar(%d)", maxLengthInt))
+ }
+ } else if primaryKey {
+ // primary keys must have a key length
+ gormTags = append(gormTags, "type:varchar(255)")
+ } else {
+ gormTags = append(gormTags, "type:text")
+ }
+ }
+ format, err := utils.GetProperty[string](schema, "format")
+ if err == nil && format == "password" {
+ gormTags = append(gormTags, "serializer:encdec")
+ }
+ if len(gormTags) == 0 {
+ return ""
+ }
+ return fmt.Sprintf("gorm:\"%s\"", strings.Join(gormTags, ";"))
}
diff --git a/backend/server/services/remote/models/conversion_test.go b/backend/server/services/remote/models/conversion_test.go
new file mode 100644
index 000000000..1b0bc9c2c
--- /dev/null
+++ b/backend/server/services/remote/models/conversion_test.go
@@ -0,0 +1,153 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package models
+
+import (
+ "reflect"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestGenerateSimpleField(t *testing.T) {
+ schema := map[string]interface{}{
+ "type": "integer",
+ }
+ field, err := generateStructField("i", schema, true)
+ assert.NoError(t, err)
+ assert.Equal(t, int64Type, field.Type)
+ assert.Equal(t, "I", field.Name)
+ json, ok := field.Tag.Lookup("json")
+ assert.True(t, ok)
+ assert.Equal(t, "i", json)
+ validate, ok := field.Tag.Lookup("validate")
+ assert.True(t, ok)
+ assert.Equal(t, "required", validate)
+ _, ok = field.Tag.Lookup("gorm")
+ assert.False(t, ok)
+}
+
+func TestGetGoTypeInt64(t *testing.T) {
+ schema := map[string]interface{}{
+ "type": "integer",
+ }
+ typ, err := getGoType(schema, false)
+ assert.NoError(t, err)
+ assert.Equal(t, int64Type, typ)
+}
+
+func TestGetGoTypeFloat64(t *testing.T) {
+ schema := map[string]interface{}{
+ "type": "number",
+ }
+ typ, err := getGoType(schema, false)
+ assert.NoError(t, err)
+ assert.Equal(t, float64Type, typ)
+}
+
+func TestGetGoTypeBool(t *testing.T) {
+ schema := map[string]interface{}{
+ "type": "boolean",
+ }
+ typ, err := getGoType(schema, false)
+ assert.NoError(t, err)
+ assert.Equal(t, boolType, typ)
+}
+
+func TestGetGoTypeString(t *testing.T) {
+ schema := map[string]interface{}{
+ "type": "string",
+ }
+ typ, err := getGoType(schema, false)
+ assert.NoError(t, err)
+ assert.Equal(t, stringType, typ)
+}
+
+func TestGetGoTypeTime(t *testing.T) {
+ schema := map[string]interface{}{
+ "type": "string",
+ "format": "date-time",
+ }
+ typ, err := getGoType(schema, true)
+ assert.NoError(t, err)
+ assert.Equal(t, timeType, typ)
+}
+
+func TestGetGoTypeTimePointer(t *testing.T) {
+ schema := map[string]interface{}{
+ "type": "string",
+ "format": "date-time",
+ }
+ typ, err := getGoType(schema, false)
+ assert.NoError(t, err)
+ assert.Equal(t, reflect.PtrTo(timeType), typ)
+}
+
+func TestGetGoTypeJsonMap(t *testing.T) {
+ schema := map[string]interface{}{
+ "type": "object",
+ }
+ typ, err := getGoType(schema, false)
+ assert.NoError(t, err)
+ assert.Equal(t, jsonMapType, typ)
+}
+
+func TestGetGormTagPrimaryKey(t *testing.T) {
+ schema := map[string]interface{}{
+ "type": "integer",
+ "primaryKey": true,
+ }
+ tag := getGormTag(schema, int64Type)
+ assert.Equal(t, "gorm:\"primaryKey\"", tag)
+}
+
+func TestGetGormTagVarChar(t *testing.T) {
+ schema := map[string]interface{}{
+ "type": "string",
+ "maxLength": float64(100),
+ }
+ tag := getGormTag(schema, stringType)
+ assert.Equal(t, "gorm:\"type:varchar(100)\"", tag)
+}
+
+func TestGetGormTagText(t *testing.T) {
+ schema := map[string]interface{}{
+ "type": "string",
+ "maxLength": float64(300),
+ }
+ tag := getGormTag(schema, stringType)
+ assert.Equal(t, "gorm:\"type:text\"", tag)
+}
+
+func TestGetGormTagStringPrimaryKey(t *testing.T) {
+ schema := map[string]interface{}{
+ "type": "string",
+ "primaryKey": true,
+ }
+ tag := getGormTag(schema, stringType)
+ assert.Equal(t, "gorm:\"primaryKey;type:varchar(255)\"", tag)
+}
+
+func TestGetGormTagEncDec(t *testing.T) {
+ schema := map[string]interface{}{
+ "type": "string",
+ "format": "password",
+ }
+ tag := getGormTag(schema, stringType)
+ assert.Equal(t, "gorm:\"type:text;serializer:encdec\"", tag)
+}
diff --git a/backend/server/services/remote/models/migration.go b/backend/server/services/remote/models/migration.go
new file mode 100644
index 000000000..7d980354d
--- /dev/null
+++ b/backend/server/services/remote/models/migration.go
@@ -0,0 +1,140 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package models
+
+import (
+ "encoding/json"
+
+ "github.com/apache/incubator-devlake/core/context"
+ "github.com/apache/incubator-devlake/core/dal"
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/plugin"
+)
+
+type Operation interface {
+ Execute(dal.Dal) errors.Error
+}
+
+type ExecuteOperation struct {
+ Sql string `json:"sql"`
+ Dialect *string `json:"dialect"`
+}
+
+func (o ExecuteOperation) Execute(dal dal.Dal) errors.Error {
+ if o.Dialect != nil {
+ if dal.Dialect() == *o.Dialect {
+ return dal.Exec(o.Sql)
+ }
+ return nil
+ } else {
+ return dal.Exec(o.Sql)
+ }
+}
+
+var _ Operation = (*ExecuteOperation)(nil)
+
+type DropColumnOperation struct {
+ Table string `json:"table"`
+ Column string `json:"column"`
+}
+
+func (o DropColumnOperation) Execute(dal dal.Dal) errors.Error {
+ return dal.DropColumns(o.Table, o.Column)
+}
+
+var _ Operation = (*DropColumnOperation)(nil)
+
+type DropTableOperation struct {
+ Table string `json:"table"`
+ Column string `json:"column"`
+}
+
+func (o DropTableOperation) Execute(dal dal.Dal) errors.Error {
+ return dal.DropTables(o.Table)
+}
+
+var _ Operation = (*DropTableOperation)(nil)
+
+type RemoteMigrationScript struct {
+ operations []Operation
+ version uint64
+ name string
+}
+
+type rawRemoteMigrationScript struct {
+ Operations []json.RawMessage `json:"operations"`
+ Version uint64 `json:"version"`
+ Name string `json:"name"`
+}
+
+func (s *RemoteMigrationScript) UnmarshalJSON(data []byte) error {
+ var rawScript rawRemoteMigrationScript
+ err := json.Unmarshal(data, &rawScript)
+ if err != nil {
+ return err
+ }
+ s.version = rawScript.Version
+ s.name = rawScript.Name
+ s.operations = make([]Operation, len(rawScript.Operations))
+ for i, operationRaw := range rawScript.Operations {
+ operationMap := make(map[string]interface{})
+ err := json.Unmarshal(operationRaw, &operationMap)
+ if err != nil {
+ return err
+ }
+ operationType := operationMap["type"].(string)
+ var operation Operation
+ switch operationType {
+ case "execute":
+ operation = &ExecuteOperation{}
+ case "drop_column":
+ operation = &DropColumnOperation{}
+ case "drop_table":
+ operation = &DropTableOperation{}
+ default:
+ return errors.BadInput.New("unsupported operation type")
+ }
+ err = json.Unmarshal(operationRaw, operation)
+ if err != nil {
+ return err
+ }
+ s.operations[i] = operation
+ }
+ return nil
+}
+
+func (s *RemoteMigrationScript) Up(basicRes context.BasicRes) errors.Error {
+ dal := basicRes.GetDal()
+ for _, operation := range s.operations {
+ err := operation.Execute(dal)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (s *RemoteMigrationScript) Version() uint64 {
+ return s.version
+}
+
+func (s *RemoteMigrationScript) Name() string {
+ return s.name
+}
+
+var _ plugin.MigrationScript = (*RemoteMigrationScript)(nil)
diff --git a/backend/server/services/remote/models/migration_test.go b/backend/server/services/remote/models/migration_test.go
new file mode 100644
index 000000000..163bfb238
--- /dev/null
+++ b/backend/server/services/remote/models/migration_test.go
@@ -0,0 +1,67 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package models
+
+import (
+ "encoding/json"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestUnmarshallMigrationScript(t *testing.T) {
+ raw := []byte(`{
+ "name": "test",
+ "version": 20230420123456,
+ "operations": [
+ {
+ "type": "execute",
+ "sql": "SOME SQL",
+ "dialect": "mysql"
+ },
+ {
+ "type": "drop_column",
+ "table": "t",
+ "column": "c"
+ },
+ {
+ "type": "drop_table",
+ "table": "t"
+ }
+ ]
+ }`)
+
+ var script RemoteMigrationScript
+ err := json.Unmarshal(raw, &script)
+
+ assert.NoError(t, err)
+ assert.Equal(t, "test", script.name)
+ assert.Equal(t, uint64(20230420123456), script.version)
+ assert.Len(t, script.operations, 3)
+
+ op1 := script.operations[0].(*ExecuteOperation)
+ assert.Equal(t, "SOME SQL", op1.Sql)
+ assert.Equal(t, "mysql", *op1.Dialect)
+
+ op2 := script.operations[1].(*DropColumnOperation)
+ assert.Equal(t, "t", op2.Table)
+ assert.Equal(t, "c", op2.Column)
+
+ op3 := script.operations[2].(*DropTableOperation)
+ assert.Equal(t, "t", op3.Table)
+}
diff --git a/backend/server/services/remote/models/models.go b/backend/server/services/remote/models/models.go
index b220114e1..c3bc47b51 100644
--- a/backend/server/services/remote/models/models.go
+++ b/backend/server/services/remote/models/models.go
@@ -24,30 +24,25 @@ import (
"github.com/apache/incubator-devlake/core/plugin"
)
-const (
- PythonPoetryCmd PluginType = "python-poetry"
- PythonCmd PluginType = "python"
- None PluginExtension = ""
- Metric PluginExtension = "metric"
- Datasource PluginExtension = "datasource"
-)
+type PluginExtension string
-type (
- PluginType string
- PluginExtension string
+const (
+ None PluginExtension = ""
+ Metric PluginExtension = "metric"
+ Datasource PluginExtension = "datasource"
)
type PluginInfo struct {
- Type PluginType `json:"type" validate:"required"`
- Name string `json:"name" validate:"required"`
- Extension PluginExtension `json:"extension"`
- ConnectionModelInfo *DynamicModelInfo `json:"connection_model_info" validate:"required"`
- TransformationRuleModelInfo *DynamicModelInfo `json:"transformation_rule_model_info"`
- ScopeModelInfo *DynamicModelInfo `json:"scope_model_info" validate:"dive"`
- Description string `json:"description"`
- PluginPath string `json:"plugin_path" validate:"required"`
- SubtaskMetas []SubtaskMeta `json:"subtask_metas" validate:"dive"`
- Tables []string `json:"tables"`
+ Name string `json:"name" validate:"required"`
+ Description string `json:"description"`
+ ConnectionModelInfo *DynamicModelInfo `json:"connection_model_info" validate:"required"`
+ TransformationRuleModelInfo *DynamicModelInfo `json:"transformation_rule_model_info"`
+ ScopeModelInfo *DynamicModelInfo `json:"scope_model_info" validate:"required"`
+ ToolModelInfos []*DynamicModelInfo `json:"tool_model_infos"`
+ MigrationScripts []RemoteMigrationScript `json:"migration_scripts"`
+ PluginPath string `json:"plugin_path" validate:"required"`
+ SubtaskMetas []SubtaskMeta `json:"subtask_metas"`
+ Extension PluginExtension `json:"extension"`
}
// Type aliases used by the API helper for better readability
@@ -62,8 +57,8 @@ type DynamicModelInfo struct {
TableName string `json:"table_name" validate:"required"`
}
-func (d DynamicModelInfo) LoadDynamicTabler(encrypt bool, parentModel any) (*models.DynamicTabler, errors.Error) {
- return LoadTableModel(d.TableName, d.JsonSchema, encrypt, parentModel)
+func (d DynamicModelInfo) LoadDynamicTabler(parentModel any) (*models.DynamicTabler, errors.Error) {
+ return LoadTableModel(d.TableName, d.JsonSchema, parentModel)
}
type ScopeModel struct {
diff --git a/backend/server/services/remote/models/plugin_remote.go b/backend/server/services/remote/models/plugin_remote.go
index e039912a6..1d0c0f57b 100644
--- a/backend/server/services/remote/models/plugin_remote.go
+++ b/backend/server/services/remote/models/plugin_remote.go
@@ -29,5 +29,6 @@ type RemotePlugin interface {
plugin.PluginMeta
plugin.PluginOpenApiSpec
plugin.PluginModel
- RunMigrations(forceMigrate bool) errors.Error
+ plugin.PluginMigration
+ RunAutoMigrations() errors.Error
}
diff --git a/backend/server/services/remote/plugin/plugin_impl.go b/backend/server/services/remote/plugin/plugin_impl.go
index 259d2300c..02a613748 100644
--- a/backend/server/services/remote/plugin/plugin_impl.go
+++ b/backend/server/services/remote/plugin/plugin_impl.go
@@ -19,6 +19,7 @@ package plugin
import (
"fmt"
+ "strings"
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
@@ -41,9 +42,10 @@ type (
connectionTabler *coreModels.DynamicTabler
scopeTabler *coreModels.DynamicTabler
transformationRuleTabler *coreModels.DynamicTabler
+ toolModelTablers []*coreModels.DynamicTabler
+ migrationScripts []plugin.MigrationScript
resources map[string]map[string]plugin.ApiResourceHandler
openApiSpec string
- tables []dal.Tabler
}
RemotePluginTaskData struct {
DbUrl string `json:"db_url"`
@@ -55,26 +57,39 @@ type (
)
func newPlugin(info *models.PluginInfo, invoker bridge.Invoker) (*remotePluginImpl, errors.Error) {
- connectionTabler, err := info.ConnectionModelInfo.LoadDynamicTabler(true, common.Model{})
+ connectionTabler, err := info.ConnectionModelInfo.LoadDynamicTabler(common.Model{})
if err != nil {
return nil, errors.Default.Wrap(err, fmt.Sprintf("Couldn't load Connection type for plugin %s", info.Name))
}
var txRuleTabler *coreModels.DynamicTabler
if info.TransformationRuleModelInfo != nil {
- txRuleTabler, err = info.TransformationRuleModelInfo.LoadDynamicTabler(false, models.TransformationModel{})
+ txRuleTabler, err = info.TransformationRuleModelInfo.LoadDynamicTabler(models.TransformationModel{})
if err != nil {
return nil, errors.Default.Wrap(err, fmt.Sprintf("Couldn't load TransformationRule type for plugin %s", info.Name))
}
}
- scopeTabler, err := info.ScopeModelInfo.LoadDynamicTabler(false, models.ScopeModel{})
+ scopeTabler, err := info.ScopeModelInfo.LoadDynamicTabler(models.ScopeModel{})
if err != nil {
return nil, errors.Default.Wrap(err, fmt.Sprintf("Couldn't load Scope type for plugin %s", info.Name))
}
+ toolModelTablers := make([]*coreModels.DynamicTabler, len(info.ToolModelInfos))
+ for i, toolModelInfo := range info.ToolModelInfos {
+ toolModelTabler, err := toolModelInfo.LoadDynamicTabler(common.NoPKModel{})
+ if err != nil {
+ return nil, errors.Default.Wrap(err, fmt.Sprintf("Couldn't load ToolModel type for plugin %s", info.Name))
+ }
+ toolModelTablers[i] = toolModelTabler
+ }
openApiSpec, err := doc.GenerateOpenApiSpec(info)
if err != nil {
return nil, errors.Default.Wrap(err, fmt.Sprintf("Couldn't generate OpenAPI spec for plugin %s", info.Name))
}
+ scripts := make([]plugin.MigrationScript, 0)
+ for _, script := range info.MigrationScripts {
+ script := script
+ scripts = append(scripts, &script)
+ }
p := remotePluginImpl{
name: info.Name,
invoker: invoker,
@@ -83,6 +98,8 @@ func newPlugin(info *models.PluginInfo, invoker bridge.Invoker) (*remotePluginIm
connectionTabler: connectionTabler,
scopeTabler: scopeTabler,
transformationRuleTabler: txRuleTabler,
+ toolModelTablers: toolModelTablers,
+ migrationScripts: scripts,
resources: GetDefaultAPI(invoker, connectionTabler, txRuleTabler, scopeTabler, connectionHelper),
openApiSpec: *openApiSpec,
}
@@ -97,9 +114,6 @@ func newPlugin(info *models.PluginInfo, invoker bridge.Invoker) (*remotePluginIm
DomainTypes: subtask.DomainTypes,
})
}
- for _, tableName := range info.Tables {
- p.tables = append(p.tables, coreModels.NewDynamicTabler(tableName, nil))
- }
return &p, nil
}
@@ -108,7 +122,13 @@ func (p *remotePluginImpl) SubTaskMetas() []plugin.SubTaskMeta {
}
func (p *remotePluginImpl) GetTablesInfo() []dal.Tabler {
- return p.tables
+ tables := make([]dal.Tabler, 0)
+ for _, toolModelTabler := range p.toolModelTablers {
+ tables = append(tables, toolModelTabler)
+ rawTableName := strings.Replace(toolModelTabler.TableName(), "_tool_", "_raw_", 1)
+ tables = append(tables, coreModels.NewDynamicTabler(rawTableName, nil))
+ }
+ return tables
}
func (p *remotePluginImpl) PrepareTaskData(taskCtx plugin.TaskContext, options map[string]interface{}) (interface{}, errors.Error) {
@@ -191,28 +211,37 @@ func (p *remotePluginImpl) ApiResources() map[string]map[string]plugin.ApiResour
return p.resources
}
-func (p *remotePluginImpl) RunMigrations(forceMigrate bool) errors.Error {
- err := api.CallDB(basicRes.GetDal().AutoMigrate, p.connectionTabler.New())
+func (p *remotePluginImpl) RunAutoMigrations() errors.Error {
+ db := basicRes.GetDal()
+ err := api.CallDB(db.AutoMigrate, p.connectionTabler.New())
if err != nil {
return err
}
- err = api.CallDB(basicRes.GetDal().AutoMigrate, p.scopeTabler.New())
+ err = api.CallDB(db.AutoMigrate, p.scopeTabler.New())
if err != nil {
return err
}
if p.transformationRuleTabler != nil {
- err = api.CallDB(basicRes.GetDal().AutoMigrate, p.transformationRuleTabler.New())
+ err = api.CallDB(db.AutoMigrate, p.transformationRuleTabler.New())
if err != nil {
return err
}
}
- dbUrl := basicRes.GetConfig("db_url")
- err = p.invoker.Call("run-migrations", bridge.DefaultContext, dbUrl, forceMigrate).Err
- return err
+ for _, toolModelTabler := range p.toolModelTablers {
+ err = api.CallDB(db.AutoMigrate, toolModelTabler.New())
+ if err != nil {
+ return err
+ }
+ }
+ return nil
}
func (p *remotePluginImpl) OpenApiSpec() string {
return p.openApiSpec
}
+func (p *remotePluginImpl) MigrationScripts() []plugin.MigrationScript {
+ return p.migrationScripts
+}
+
var _ models.RemotePlugin = (*remotePluginImpl)(nil)