You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ka...@apache.org on 2023/05/25 19:05:51 UTC

[incubator-devlake] branch release-v0.17 updated: fix: fix missing records azuredevops (#5138) (#5285)

This is an automated email from the ASF dual-hosted git repository.

ka94 pushed a commit to branch release-v0.17
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/release-v0.17 by this push:
     new 50207e90d fix: fix missing records azuredevops (#5138) (#5285)
50207e90d is described below

commit 50207e90db20f492dc5662aca1760def13a77e3e
Author: Camille Teruel <ca...@gmail.com>
AuthorDate: Thu May 25 21:05:46 2023 +0200

    fix: fix missing records azuredevops (#5138) (#5285)
    
    * refactor: Move db_url parsing and engine creation out of Context
    
    * feat: Implement tool layer table migration
    
    * fix: Make build_id a primary key to prevent duplicates
    
    A job id returned by the "timeline" endpoint is not unique, as it is the id of the job definition,
    not of the job execution.
    Add build_id as a primary key to prevent overwriting.
    
    * fix: build fake python plugin before e2e tests
    
    * fix: Fix enum name conflict
    
    ---------
    
    Co-authored-by: Camille Teruel <ca...@meri.co>
---
 backend/Makefile                                   |  3 ++-
 backend/python/README.md                           | 19 +++++++++++++
 .../plugins/azuredevops/azuredevops/models.py      | 18 ++++++-------
 .../azuredevops/azuredevops/streams/builds.py      | 18 ++++++-------
 .../azuredevops/azuredevops/streams/jobs.py        | 18 ++++++-------
 .../azuredevops/streams/pull_requests.py           |  6 ++---
 .../python/plugins/azuredevops/tests/__init__.py   | 14 ++++++++++
 backend/python/pydevlake/pydevlake/context.py      | 31 ++--------------------
 backend/python/pydevlake/pydevlake/ipc.py          | 25 ++++++++++++++---
 backend/python/pydevlake/pydevlake/model.py        | 21 ++++++++++++++-
 backend/python/pydevlake/pydevlake/plugin.py       | 25 ++++++++++++++---
 backend/python/pydevlake/pydevlake/stream.py       |  3 ++-
 backend/python/pydevlake/pydevlake/subtasks.py     | 17 +++---------
 .../python/pydevlake/pydevlake/testing/testing.py  |  6 +++--
 backend/python/pydevlake/tests/stream_test.py      | 12 ++++++---
 .../server/services/remote/plugin/plugin_impl.go   |  3 ++-
 16 files changed, 150 insertions(+), 89 deletions(-)

diff --git a/backend/Makefile b/backend/Makefile
index a13908eab..256cbe3c1 100644
--- a/backend/Makefile
+++ b/backend/Makefile
@@ -53,6 +53,7 @@ build-server: swag
 build-python: #don't mix this with the other build commands
 	find ./python/ -name "*.sh" | xargs chmod +x &&\
 	sh python/build.sh
+	sh python/build.sh python/test
 
 build: build-plugin build-server
 
@@ -89,7 +90,7 @@ build-pydevlake:
 	poetry install -C python/pydevlake
 
 python-unit-test: build-pydevlake
-	sh python/build.sh test &&\
+	sh python/build.sh python/test &&\
 	sh ./python/run_tests.sh
 
 e2e-plugins-test:
diff --git a/backend/python/README.md b/backend/python/README.md
index db99ee5f7..a68cca8ea 100644
--- a/backend/python/README.md
+++ b/backend/python/README.md
@@ -232,6 +232,25 @@ Do not forget `table=True`, otherwise no table will be created in the database.
 
 To facilitate or even eliminate extraction, your tool models should be close to the raw data you collect. Note that if you collect data from a JSON REST API that uses camelCased properties, you can still define snake_cased attributes in your model. The camelCased attributes aliases will be generated, so no special care is needed during extraction.
 
+#### Migration of tool models
+
+Tool models, connection, scope and transformation rule types are stored in the DevLake database.
+When you change the definition of one of those types, you need to migrate the database.
+You should implement the migration logic in the model class by defining a `migrate` class method. This method takes a SQLAlchemy session as an argument, which you can use to
+execute SQL `ALTER TABLE` statements.
+
+```python
+class User(ToolModel, table=True):
+    id: str = Field(primary_key=True)
+    name: str
+    email: str
+    age: int
+
+    @classmethod
+    def migrate(cls, session):
+        session.execute(f"ALTER TABLE {cls.__tablename__} ADD COLUMN age INT")
+```
+
 
 ### Create the stream class
 
diff --git a/backend/python/plugins/azuredevops/azuredevops/models.py b/backend/python/plugins/azuredevops/azuredevops/models.py
index 1fd6e7c89..0f432b9d3 100644
--- a/backend/python/plugins/azuredevops/azuredevops/models.py
+++ b/backend/python/plugins/azuredevops/azuredevops/models.py
@@ -46,14 +46,14 @@ class GitRepository(ToolScope, table=True):
 
 
 class GitPullRequest(ToolModel, table=True):
-    class Status(Enum):
+    class PRStatus(Enum):
         Abandoned = "abandoned"
         Active = "active"
         Completed = "completed"
 
     pull_request_id: int = Field(primary_key=True)
     description: Optional[str] = Field(sa_column=Column(Text))
-    status: Status
+    status: PRStatus
     created_by_id: str = Field(source='/createdBy/id')
     created_by_name: str = Field(source='/createdBy/displayName')
     creation_date: datetime.datetime
@@ -86,14 +86,14 @@ class GitPullRequestCommit(ToolModel, table=True):
 
 
 class Build(ToolModel, table=True):
-    class Status(Enum):
+    class BuildStatus(Enum):
         Cancelling = "cancelling"
         Completed = "completed"
         InProgress = "inProgress"
         NotStarted = "notStarted"
         Postponed = "postponed"
 
-    class Result(Enum):
+    class BuildResult(Enum):
         Canceled = "canceled"
         Failed = "failed"
         Non = "none"
@@ -104,19 +104,19 @@ class Build(ToolModel, table=True):
     name: str = Field(source='/definition/name')
     start_time: Optional[datetime.datetime]
     finish_time: Optional[datetime.datetime]
-    status: Status
-    result: Result
+    status: BuildStatus
+    result: BuildResult
     source_branch: str
     source_version: str
 
 
 class Job(ToolModel, table=True):
-    class State(Enum):
+    class JobState(Enum):
         Completed = "completed"
         InProgress = "inProgress"
         Pending = "pending"
 
-    class Result(Enum):
+    class JobResult(Enum):
         Abandoned = "abandoned"
         Canceled = "canceled"
         Failed = "failed"
@@ -125,7 +125,7 @@ class Job(ToolModel, table=True):
         SucceededWithIssues = "succeededWithIssues"
 
     id: str = Field(primary_key=True)
-    build_id: str
+    build_id: str = Field(primary_key=True)
     name: str
     startTime: datetime.datetime
     finishTime: datetime.datetime
diff --git a/backend/python/plugins/azuredevops/azuredevops/streams/builds.py b/backend/python/plugins/azuredevops/azuredevops/streams/builds.py
index 006d3315a..97687f80a 100644
--- a/backend/python/plugins/azuredevops/azuredevops/streams/builds.py
+++ b/backend/python/plugins/azuredevops/azuredevops/streams/builds.py
@@ -37,25 +37,25 @@ class Builds(Stream):
 
     def convert(self, b: Build, ctx: Context):
         result = None
-        if b.result == Build.Result.Canceled:
+        if b.result == Build.BuildResult.Canceled:
             result = devops.CICDResult.ABORT
-        elif b.result == Build.Result.Failed:
+        elif b.result == Build.BuildResult.Failed:
             result = devops.CICDResult.FAILURE
-        elif b.result == Build.Result.PartiallySucceeded:
+        elif b.result == Build.BuildResult.PartiallySucceeded:
             result = devops.CICDResult.SUCCESS
-        elif b.result ==  Build.Result.Succeeded:
+        elif b.result ==  Build.BuildResult.Succeeded:
             result = devops.CICDResult.SUCCESS
 
         status = None
-        if b.status == Build.Status.Cancelling:
+        if b.status == Build.BuildStatus.Cancelling:
             status = devops.CICDStatus.DONE
-        elif b.status == Build.Status.Completed:
+        elif b.status == Build.BuildStatus.Completed:
             status = devops.CICDStatus.DONE
-        elif b.status ==  Build.Status.InProgress:
+        elif b.status ==  Build.BuildStatus.InProgress:
             status = devops.CICDStatus.IN_PROGRESS
-        elif b.status == Build.Status.NotStarted:
+        elif b.status == Build.BuildStatus.NotStarted:
             status = devops.CICDStatus.IN_PROGRESS
-        elif b.status ==  Build.Status.Postponed:
+        elif b.status ==  Build.BuildStatus.Postponed:
             status = devops.CICDStatus.IN_PROGRESS
 
         type = devops.CICDType.BUILD
diff --git a/backend/python/plugins/azuredevops/azuredevops/streams/jobs.py b/backend/python/plugins/azuredevops/azuredevops/streams/jobs.py
index 49676d97a..3ef53cfdf 100644
--- a/backend/python/plugins/azuredevops/azuredevops/streams/jobs.py
+++ b/backend/python/plugins/azuredevops/azuredevops/streams/jobs.py
@@ -38,25 +38,25 @@ class Jobs(Substream):
 
     def convert(self, j: Job, ctx: Context) -> Iterable[devops.CICDPipeline]:
         result = None
-        if j.result == Job.Result.Abandoned:
+        if j.result == Job.JobResult.Abandoned:
             result = devops.CICDResult.ABORT
-        elif j.result == Job.Result.Canceled:
+        elif j.result == Job.JobResult.Canceled:
             result = devops.CICDResult.ABORT
-        elif j.result == Job.Result.Failed:
+        elif j.result == Job.JobResult.Failed:
             result = devops.CICDResult.FAILURE
-        elif j.result == Job.Result.Skipped:
+        elif j.result == Job.JobResult.Skipped:
             result = devops.CICDResult.ABORT
-        elif j.result == Job.Result.Succeeded:
+        elif j.result == Job.JobResult.Succeeded:
             result = devops.CICDResult.SUCCESS
-        elif j.result == Job.Result.SucceededWithIssues:
+        elif j.result == Job.JobResult.SucceededWithIssues:
             result = devops.CICDResult.FAILURE
 
         status = None
-        if j.state == Job.State.Completed:
+        if j.state == Job.JobState.Completed:
             status = devops.CICDStatus.DONE
-        elif j.state == Job.State.InProgress:
+        elif j.state == Job.JobState.InProgress:
             status = devops.CICDStatus.IN_PROGRESS
-        if j.state == Job.State.Pending:
+        if j.state == Job.JobState.Pending:
             status = devops.CICDStatus.IN_PROGRESS
 
         type = devops.CICDType.BUILD
diff --git a/backend/python/plugins/azuredevops/azuredevops/streams/pull_requests.py b/backend/python/plugins/azuredevops/azuredevops/streams/pull_requests.py
index b7ab834be..2cfff0ada 100644
--- a/backend/python/plugins/azuredevops/azuredevops/streams/pull_requests.py
+++ b/backend/python/plugins/azuredevops/azuredevops/streams/pull_requests.py
@@ -39,11 +39,11 @@ class GitPullRequests(Stream):
 
         # Use the same status values as GitHub plugin
         status = None
-        if pr.status == GitPullRequest.Status.Abandoned:
+        if pr.status == GitPullRequest.PRStatus.Abandoned:
             status = 'CLOSED'
-        elif pr.status == GitPullRequest.Status.Active:
+        elif pr.status == GitPullRequest.PRStatus.Active:
             status = 'OPEN'
-        elif pr.status == GitPullRequest.Status.Completed:
+        elif pr.status == GitPullRequest.PRStatus.Completed:
             status = 'MERGED'
 
         yield code.PullRequest(
diff --git a/backend/python/plugins/azuredevops/tests/__init__.py b/backend/python/plugins/azuredevops/tests/__init__.py
new file mode 100644
index 000000000..65d64ce95
--- /dev/null
+++ b/backend/python/plugins/azuredevops/tests/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/backend/python/pydevlake/pydevlake/context.py b/backend/python/pydevlake/pydevlake/context.py
index 660ff8847..a9d5f4d11 100644
--- a/backend/python/pydevlake/pydevlake/context.py
+++ b/backend/python/pydevlake/pydevlake/context.py
@@ -14,52 +14,25 @@
 # limitations under the License.
 
 
-from urllib.parse import urlparse, parse_qsl
-
 from sqlalchemy.engine import Engine
-from sqlmodel import SQLModel, create_engine
 
 from pydevlake.model import Connection, TransformationRule, ToolScope
 
 
 class Context:
     def __init__(self,
-                 db_url: str,
+                 engine: Engine,
                  scope: ToolScope,
                  connection: Connection,
                  transformation_rule: TransformationRule = None,
                  options: dict = None):
-        self.db_url = db_url
+        self.engine = engine
         self.scope = scope
         self.connection = connection
         self.transformation_rule = transformation_rule
         self.options = options or {}
         self._engine = None
 
-    @property
-    def engine(self) -> Engine:
-        if not self._engine:
-            db_url, args = self.get_engine_db_url()
-            try:
-                self._engine = create_engine(db_url, connect_args=args)
-                SQLModel.metadata.create_all(self._engine)
-            except Exception as e:
-                raise Exception(f"Unable to make a database connection") from e
-        return self._engine
-
     @property
     def incremental(self) -> bool:
         return self.options.get('incremental') is True
-
-    def get_engine_db_url(self) -> tuple[str, dict[str, any]]:
-        db_url = self.db_url
-        if not db_url:
-            raise Exception("Missing db_url setting")
-        db_url = db_url.replace("postgres://", "postgresql://")
-        db_url = db_url.split('?')[0]
-        # `parseTime` parameter is not understood by MySQL driver,
-        # so we have to parse query args to remove it
-        connect_args = dict(parse_qsl(urlparse(self.db_url).query))
-        if 'parseTime' in connect_args:
-            del connect_args['parseTime']
-        return db_url, connect_args
diff --git a/backend/python/pydevlake/pydevlake/ipc.py b/backend/python/pydevlake/pydevlake/ipc.py
index 7acd95391..e59065af0 100644
--- a/backend/python/pydevlake/pydevlake/ipc.py
+++ b/backend/python/pydevlake/pydevlake/ipc.py
@@ -19,7 +19,10 @@ import json
 from functools import wraps
 from typing import Generator, TextIO, Optional
 
+from urllib.parse import urlparse, parse_qsl
 from fire.decorators import SetParseFn
+from sqlmodel import create_engine
+from sqlalchemy.engine import Engine
 
 from pydevlake.context import Context
 from pydevlake.message import Message
@@ -98,8 +101,8 @@ class PluginCommands:
         return self._plugin.make_pipeline(scope_tx_rule_pairs, entities, connection)
 
     @plugin_method
-    def run_migrations(self, force: bool):
-        self._plugin.run_migrations(force)
+    def run_migrations(self, db_url, force: bool):
+        self._plugin.run_migrations(create_db_engine(db_url), force)
 
     @plugin_method
     def plugin_info(self):
@@ -122,4 +125,20 @@ class PluginCommands:
         else:
             transformation_rule = None
         options = data.get('options', {})
-        return Context(db_url, scope, connection, transformation_rule, options)
+        return Context(create_db_engine(db_url), scope, connection, transformation_rule, options)
+
+def create_db_engine(db_url) -> Engine:
+    # SQLAlchemy doesn't understand postgres:// scheme
+    db_url = db_url.replace("postgres://", "postgresql://")
+    # Remove query args
+    base_url = db_url.split('?')[0]
+    # `parseTime` parameter is not understood by MySQL driver,
+    # so we have to parse query args to remove it
+    connect_args = dict(parse_qsl(urlparse(db_url).query))
+    if 'parseTime' in connect_args:
+        del connect_args['parseTime']
+    try:
+        engine = create_engine(base_url, connect_args=connect_args)
+        return engine
+    except Exception as e:
+        raise Exception(f"Unable to make a database connection") from e
diff --git a/backend/python/pydevlake/pydevlake/model.py b/backend/python/pydevlake/pydevlake/model.py
index 135e10c0a..995c13b3a 100644
--- a/backend/python/pydevlake/pydevlake/model.py
+++ b/backend/python/pydevlake/pydevlake/model.py
@@ -22,7 +22,7 @@ from datetime import datetime
 import inflect
 from pydantic import AnyUrl, validator
 from sqlalchemy import Column, DateTime
-from sqlalchemy.orm import declared_attr
+from sqlalchemy.orm import declared_attr, Session
 from sqlalchemy.inspection import inspect
 from sqlmodel import SQLModel, Field
 
@@ -56,6 +56,13 @@ class ToolTable(SQLModel):
             parts = attr_name.split('_')
             return parts[0] + ''.join(word.capitalize() for word in parts[1:])
 
+    @classmethod
+    def migrate(cls, session: Session):
+        """
+        Redefine this method to perform migration on this tool model.
+        """
+        pass
+
 
 class Connection(ToolTable, Model):
     name: str
@@ -163,3 +170,15 @@ def _get_plugin_name(cls):
     # that is not a python module
     depth = len(module.__name__.split('.')) + 1
     return path_segments[-depth]
+
+
+class SubtaskRun(SQLModel, table=True):
+    """
+    Table storing information about the execution of subtasks.
+    """
+    id: Optional[int] = Field(primary_key=True)
+    subtask_name: str
+    connection_id: int
+    started: datetime
+    completed: Optional[datetime]
+    state: str # JSON encoded dict of atomic values
diff --git a/backend/python/pydevlake/pydevlake/plugin.py b/backend/python/pydevlake/pydevlake/plugin.py
index d14b9ad5d..4958e729e 100644
--- a/backend/python/pydevlake/pydevlake/plugin.py
+++ b/backend/python/pydevlake/pydevlake/plugin.py
@@ -20,13 +20,16 @@ import os
 import sys
 
 import fire
+from sqlmodel import SQLModel, Session
+from sqlalchemy.inspection import inspect
+from sqlalchemy.engine import Engine
 
 import pydevlake.message as msg
 from pydevlake.subtasks import Subtask
 from pydevlake.ipc import PluginCommands
 from pydevlake.context import Context
 from pydevlake.stream import Stream, DomainType
-from pydevlake.model import ToolScope, DomainScope, Connection, TransformationRule
+from pydevlake.model import ToolScope, DomainScope, Connection, TransformationRule, SubtaskRun
 
 
 ScopeTxRulePair = tuple[ToolScope, Optional[TransformationRule]]
@@ -102,9 +105,23 @@ class Plugin(ABC):
     def convert(self, ctx: Context, stream: str):
         yield from self.get_stream(stream).convertor.run(ctx)
 
-    def run_migrations(self, force: bool):
-        # TODO: Create tables
-        pass
+    def run_migrations(self, engine: Engine, force: bool = False):
+        # NOTE: Not sure what "force" is for
+        # TODO: Support migration for transformation rule and connection tables
+        # They are currently created on go-side.
+        tool_models = [stream.tool_model for stream in self._streams.values()]
+        tool_models.append(self.tool_scope_type)
+        inspector = inspect(engine)
+        tables = SQLModel.metadata.tables
+        with Session(engine) as session:
+            for model in tool_models:
+                if inspector.has_table(model.__tablename__):
+                    # TODO: Add version table and migrate if needed
+                    model.migrate(session)
+                else:
+                    tables[model.__tablename__].create(engine)
+            session.commit()
+        tables[SubtaskRun.__tablename__].create(engine, checkfirst=True)
 
     def make_remote_scopes(self, connection: Connection, group_id: Optional[str] = None) -> msg.RemoteScopes:
         if group_id:
diff --git a/backend/python/pydevlake/pydevlake/stream.py b/backend/python/pydevlake/pydevlake/stream.py
index d6df9f7f4..ae7e421bd 100644
--- a/backend/python/pydevlake/pydevlake/stream.py
+++ b/backend/python/pydevlake/pydevlake/stream.py
@@ -80,7 +80,8 @@ class Stream:
             __tablename__ = table_name
 
         self._raw_model = StreamRawModel
-        RawModel.metadata.create_all(session.get_bind())
+        table = RawModel.metadata.tables[table_name]
+        table.create(session.get_bind(), checkfirst=True)
         return self._raw_model
 
     def collect(self, state, context) -> Iterable[tuple[object, dict]]:
diff --git a/backend/python/pydevlake/pydevlake/subtasks.py b/backend/python/pydevlake/pydevlake/subtasks.py
index fb5b6c4fa..c343c359f 100644
--- a/backend/python/pydevlake/pydevlake/subtasks.py
+++ b/backend/python/pydevlake/pydevlake/subtasks.py
@@ -17,13 +17,13 @@
 from abc import abstractmethod
 import json
 from datetime import datetime
-from typing import Tuple, Dict, Iterable, Optional, Generator
+from typing import Tuple, Dict, Iterable, Generator
 
 
 import sqlalchemy.sql as sql
-from sqlmodel import Session, SQLModel, Field, select
+from sqlmodel import Session, select
 
-from pydevlake.model import RawModel, ToolModel, DomainModel
+from pydevlake.model import RawModel, ToolModel, DomainModel, SubtaskRun
 from pydevlake.context import Context
 from pydevlake.message import RemoteProgress
 from pydevlake import logger
@@ -132,17 +132,6 @@ class Subtask:
             "scope_id": ctx.scope.id
         })
 
-class SubtaskRun(SQLModel, table=True):
-    """
-    Table storing information about the execution of subtasks.
-    """
-    id: Optional[int] = Field(primary_key=True)
-    subtask_name: str
-    connection_id: int
-    started: datetime
-    completed: Optional[datetime]
-    state: str # JSON encoded dict of atomic values
-
 
 class Collector(Subtask):
     @property
diff --git a/backend/python/pydevlake/pydevlake/testing/testing.py b/backend/python/pydevlake/pydevlake/testing/testing.py
index c690e95ed..41ac87ac2 100644
--- a/backend/python/pydevlake/pydevlake/testing/testing.py
+++ b/backend/python/pydevlake/pydevlake/testing/testing.py
@@ -17,6 +17,8 @@ import pytest
 
 from typing import Union, Type, Iterable, Generator, Optional
 
+from sqlmodel import create_engine
+
 from pydevlake.context import Context
 from pydevlake.plugin import Plugin
 from pydevlake.message import RemoteScopeGroup, PipelineTask
@@ -49,7 +51,7 @@ class ContextBuilder:
 
     def build(self):
         return Context(
-            db_url='sqlite:///:memory:',
+            engine=create_engine('sqlite:///:memory:'),
             scope=self.scope,
             connection=self.connection,
             transformation_rule=self.transformation_rule
@@ -78,7 +80,7 @@ def assert_stream_run(stream: Stream, connection: Connection, scope: ToolScope,
     """
     Test that a stream can run all 3 steps without error.
     """
-    ctx = Context(db_url='sqlite:///:memory:', connection=connection, scope=scope, transformation_rule=transformation_rule)
+    ctx = ContextBuilder().with_connection(connection).with_scope(scope).with_transformation_rule(transformation_rule).build()
     stream.collector.run(ctx)
     stream.extractor.run(ctx)
     stream.convertor.run(ctx)
diff --git a/backend/python/pydevlake/tests/stream_test.py b/backend/python/pydevlake/tests/stream_test.py
index 6fe8f8669..d1caee668 100644
--- a/backend/python/pydevlake/tests/stream_test.py
+++ b/backend/python/pydevlake/tests/stream_test.py
@@ -17,7 +17,7 @@
 import json
 
 import pytest
-from sqlmodel import Session, Field
+from sqlmodel import SQLModel, Session, Field, create_engine
 
 from pydevlake import Stream, Connection, Context, DomainType
 from pydevlake.model import ToolModel, DomainModel, ToolScope
@@ -58,6 +58,12 @@ class DummyConnection(Connection):
     raw_data: list[dict]
 
 
+@pytest.fixture
+def engine():
+    engine = create_engine("sqlite+pysqlite:///:memory:")
+    SQLModel.metadata.create_all(engine)
+    return engine
+
 @pytest.fixture
 def raw_data():
     return [
@@ -77,9 +83,9 @@ def scope():
 
 
 @pytest.fixture
-def ctx(connection, scope):
+def ctx(connection, scope, engine):
     return Context(
-        db_url="sqlite+pysqlite:///:memory:",
+        engine=engine,
         scope=scope,
         connection=connection,
         options={}
diff --git a/backend/server/services/remote/plugin/plugin_impl.go b/backend/server/services/remote/plugin/plugin_impl.go
index e8f563690..e46bd3157 100644
--- a/backend/server/services/remote/plugin/plugin_impl.go
+++ b/backend/server/services/remote/plugin/plugin_impl.go
@@ -198,7 +198,8 @@ func (p *remotePluginImpl) RunMigrations(forceMigrate bool) errors.Error {
 			return err
 		}
 	}
-	err = p.invoker.Call("run-migrations", bridge.DefaultContext, forceMigrate).Err
+	dbUrl := basicRes.GetConfig("db_url")
+	err = p.invoker.Call("run-migrations", bridge.DefaultContext, dbUrl, forceMigrate).Err
 	return err
 }