You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ab...@apache.org on 2022/10/21 08:20:23 UTC
[incubator-devlake] branch release-v0.14 updated: fix: starrocks plugin sync data order keyword (#3538)
This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch release-v0.14
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/release-v0.14 by this push:
new 3c5bf8327 fix: starrocks plugin sync data order keyword (#3538)
3c5bf8327 is described below
commit 3c5bf83279cc6f929c0b1d3dae2da57119d929ab
Author: long2ice <lo...@gmail.com>
AuthorDate: Fri Oct 21 16:20:10 2022 +0800
fix: starrocks plugin sync data order keyword (#3538)
* fix: starrocks plugin sync data order keyword
* ci: lint error
---
impl/dalgorm/dalgorm.go | 5 +++++
plugins/core/dal/dal.go | 2 ++
plugins/starrocks/tasks.go | 33 ++++++++++++++++++++++-----------
3 files changed, 29 insertions(+), 11 deletions(-)
diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
index 9d16556b7..33cf7e0bf 100644
--- a/impl/dalgorm/dalgorm.go
+++ b/impl/dalgorm/dalgorm.go
@@ -216,6 +216,11 @@ func (d *Dalgorm) AllTables() ([]string, errors.Error) {
return filteredTables, nil
}
+// Dialect returns the dialect of the database
+func (d *Dalgorm) Dialect() string {
+ return d.db.Dialector.Name()
+}
+
// NewDalgorm FIXME ...
func NewDalgorm(db *gorm.DB) *Dalgorm {
return &Dalgorm{db}
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index 257eaf5a1..3a31e1d12 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -88,6 +88,8 @@ type Dal interface {
GetColumns(dst schema.Tabler, filter func(columnMeta ColumnMeta) bool) (cms []ColumnMeta, err errors.Error)
// GetPrimarykeyFields get the PrimaryKey from `gorm` tag
GetPrimaryKeyFields(t reflect.Type) []reflect.StructField
+ // Dialect returns the dialect of current database
+ Dialect() string
}
// GetColumnNames returns table Column Names in database
diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index 4539315a8..f23cc0959 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -113,19 +113,22 @@ func LoadData(c core.SubTaskContext) errors.Error {
c.GetLogger().Error(err, "create table %s in starrocks error", table)
return errors.Convert(err)
}
- // try postgre syntax, because we can't get dialect here
- err := errors.Convert(db.Exec("begin transaction isolation level repeatable read"))
- if err != nil {
- // try mysql syntax
- err = errors.Convert(db.Exec("set session transaction isolation level repeatable read"))
+ if db.Dialect() == "postgres" {
+ err = db.Exec("begin transaction isolation level repeatable read")
if err != nil {
- return err
+ return errors.Convert(err)
+ }
+ } else if db.Dialect() == "mysql" {
+ err = db.Exec("set session transaction isolation level repeatable read")
+ if err != nil {
+ return errors.Convert(err)
}
err = errors.Convert(db.Exec("start transaction"))
if err != nil {
- return err
+ return errors.Convert(err)
}
-
+ } else {
+ return errors.NotFound.New(fmt.Sprintf("unsupported dialect %s", db.Dialect()))
}
err = errors.Convert(loadData(starrocks, c, starrocksTable, starrocksTmpTable, table, columnMap, db, config, orderBy))
if err != nil {
@@ -156,6 +159,14 @@ func createTmpTable(starrocks *sql.DB, db dal.Dal, starrocksTmpTable string, tab
var pks []string
var orders []string
var columns []string
+ var separator string
+ if db.Dialect() == "postgres" {
+ separator = "\""
+ } else if db.Dialect() == "mysql" {
+ separator = "`"
+ } else {
+ return nil, "", errors.NotFound.New(fmt.Sprintf("unsupported dialect %s", db.Dialect()))
+ }
firstcm := ""
firstcmName := ""
for _, cm := range columeMetas {
@@ -171,18 +182,18 @@ func createTmpTable(starrocks *sql.DB, db dal.Dal, starrocksTmpTable string, tab
isPrimaryKey, ok := cm.PrimaryKey()
if isPrimaryKey && ok {
pks = append(pks, fmt.Sprintf("`%s`", name))
- orders = append(orders, name)
+ orders = append(orders, fmt.Sprintf("%s%s%s", separator, name, separator))
}
if firstcm == "" {
firstcm = fmt.Sprintf("`%s`", name)
- firstcmName = name
+ firstcmName = fmt.Sprintf("%s%s%s", separator, name, separator)
}
}
if len(pks) == 0 {
pks = append(pks, firstcm)
}
- orderBy := strings.Join(orders, ",")
+ orderBy := strings.Join(orders, ", ")
if config.OrderBy != nil {
if v, ok := config.OrderBy[table]; ok {
orderBy = v