You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by zh...@apache.org on 2023/03/20 08:16:02 UTC
[incubator-devlake] branch main updated: fix: add db close connection (#4710)
This is an automated email from the ASF dual-hosted git repository.
zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new e1aad1542 fix: add db close connection (#4710)
e1aad1542 is described below
commit e1aad1542990d1ea99d3689cb388b84a9bb3bed6
Author: abeizn <zi...@merico.dev>
AuthorDate: Mon Mar 20 16:15:57 2023 +0800
fix: add db close connection (#4710)
* fix: add db close connection
* fix: add db close connection
* fix: add db close connection and adjust getDbInstance
---
backend/plugins/starrocks/tasks/tasks.go | 58 +++++++++++++++++++-------------
1 file changed, 34 insertions(+), 24 deletions(-)
diff --git a/backend/plugins/starrocks/tasks/tasks.go b/backend/plugins/starrocks/tasks/tasks.go
index 648271631..b69dc5197 100644
--- a/backend/plugins/starrocks/tasks/tasks.go
+++ b/backend/plugins/starrocks/tasks/tasks.go
@@ -63,10 +63,22 @@ func ExportData(c plugin.SubTaskContext) errors.Error {
config := c.GetData().(*StarRocksConfig)
// 1. Get db instance
- db, err := getDbInstance(c)
- if err != nil {
- return errors.Convert(err)
+ var db dal.Dal
+ if config.SourceDsn != "" && config.SourceType != "" {
+ o, err := getDbInstance(c)
+ if err != nil {
+ return errors.Convert(err)
+ }
+ db = dalgorm.NewDalgorm(o)
+ sqlDB, err := o.DB()
+ if err != nil {
+ return errors.Convert(err)
+ }
+ defer sqlDB.Close()
+ } else {
+ db = c.GetDal()
}
+
// 2. Filter out the tables to export
starrocksTables, err := getExportingTables(c, db)
if err != nil {
@@ -78,6 +90,11 @@ func ExportData(c plugin.SubTaskContext) errors.Error {
return errors.Convert(err)
}
starrocksDb := dalgorm.NewDalgorm(sr)
+ sqlStarrocksDB, err := sr.DB()
+ if err != nil {
+ return errors.Convert(err)
+ }
+ defer sqlStarrocksDB.Close()
for _, table := range starrocksTables {
select {
@@ -395,31 +412,24 @@ func putBatchData(c plugin.SubTaskContext, starrocksTmpTable, table string, data
}
// get db instance
-func getDbInstance(c plugin.SubTaskContext) (db dal.Dal, err error) {
+func getDbInstance(c plugin.SubTaskContext) (o *gorm.DB, err error) {
config := c.GetData().(*StarRocksConfig)
- if config.SourceDsn != "" && config.SourceType != "" {
- var o *gorm.DB
- switch config.SourceType {
- case "mysql":
- o, err = gorm.Open(mysql.Open(config.SourceDsn))
- if err != nil {
- return nil, err
- }
- case "postgres":
- o, err = gorm.Open(postgres.Open(config.SourceDsn))
- if err != nil {
- return nil, err
- }
- default:
- return nil, errors.NotFound.New(fmt.Sprintf("unsupported source type %s", config.SourceType))
+ switch config.SourceType {
+ case "mysql":
+ o, err = gorm.Open(mysql.Open(config.SourceDsn))
+ if err != nil {
+ return nil, err
}
- db = dalgorm.NewDalgorm(o)
- } else {
- db = c.GetDal()
+ case "postgres":
+ o, err = gorm.Open(postgres.Open(config.SourceDsn))
+ if err != nil {
+ return nil, err
+ }
+ default:
+ return nil, errors.NotFound.New(fmt.Sprintf("unsupported source type %s", config.SourceType))
}
- return db, nil
-
+ return o, nil
}
// get imported tables