You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ab...@apache.org on 2022/10/21 08:20:23 UTC

[incubator-devlake] branch release-v0.14 updated: fix: starrocks plugin sync data order keyword (#3538)

This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch release-v0.14
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/release-v0.14 by this push:
     new 3c5bf8327 fix: starrocks plugin sync data order keyword (#3538)
3c5bf8327 is described below

commit 3c5bf83279cc6f929c0b1d3dae2da57119d929ab
Author: long2ice <lo...@gmail.com>
AuthorDate: Fri Oct 21 16:20:10 2022 +0800

    fix: starrocks plugin sync data order keyword (#3538)
    
    * fix: starrocks plugin sync data order keyword
    
    * ci: lint error
---
 impl/dalgorm/dalgorm.go    |  5 +++++
 plugins/core/dal/dal.go    |  2 ++
 plugins/starrocks/tasks.go | 33 ++++++++++++++++++++++-----------
 3 files changed, 29 insertions(+), 11 deletions(-)

diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
index 9d16556b7..33cf7e0bf 100644
--- a/impl/dalgorm/dalgorm.go
+++ b/impl/dalgorm/dalgorm.go
@@ -216,6 +216,11 @@ func (d *Dalgorm) AllTables() ([]string, errors.Error) {
 	return filteredTables, nil
 }
 
+// Dialect returns the dialect of the database
+func (d *Dalgorm) Dialect() string {
+	return d.db.Dialector.Name()
+}
+
 // NewDalgorm FIXME ...
 func NewDalgorm(db *gorm.DB) *Dalgorm {
 	return &Dalgorm{db}
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index 257eaf5a1..3a31e1d12 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -88,6 +88,8 @@ type Dal interface {
 	GetColumns(dst schema.Tabler, filter func(columnMeta ColumnMeta) bool) (cms []ColumnMeta, err errors.Error)
 	// GetPrimarykeyFields get the PrimaryKey from `gorm` tag
 	GetPrimaryKeyFields(t reflect.Type) []reflect.StructField
+	// Dialect returns the dialect of current database
+	Dialect() string
 }
 
 // GetColumnNames returns table Column Names in database
diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index 4539315a8..f23cc0959 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -113,19 +113,22 @@ func LoadData(c core.SubTaskContext) errors.Error {
 			c.GetLogger().Error(err, "create table %s in starrocks error", table)
 			return errors.Convert(err)
 		}
-		// try postgre syntax, because we can't get dialect here
-		err := errors.Convert(db.Exec("begin transaction isolation level repeatable read"))
-		if err != nil {
-			// try mysql syntax
-			err = errors.Convert(db.Exec("set session transaction isolation level repeatable read"))
+		if db.Dialect() == "postgres" {
+			err = db.Exec("begin transaction isolation level repeatable read")
 			if err != nil {
-				return err
+				return errors.Convert(err)
+			}
+		} else if db.Dialect() == "mysql" {
+			err = db.Exec("set session transaction isolation level repeatable read")
+			if err != nil {
+				return errors.Convert(err)
 			}
 			err = errors.Convert(db.Exec("start transaction"))
 			if err != nil {
-				return err
+				return errors.Convert(err)
 			}
-
+		} else {
+			return errors.NotFound.New(fmt.Sprintf("unsupported dialect %s", db.Dialect()))
 		}
 		err = errors.Convert(loadData(starrocks, c, starrocksTable, starrocksTmpTable, table, columnMap, db, config, orderBy))
 		if err != nil {
@@ -156,6 +159,14 @@ func createTmpTable(starrocks *sql.DB, db dal.Dal, starrocksTmpTable string, tab
 	var pks []string
 	var orders []string
 	var columns []string
+	var separator string
+	if db.Dialect() == "postgres" {
+		separator = "\""
+	} else if db.Dialect() == "mysql" {
+		separator = "`"
+	} else {
+		return nil, "", errors.NotFound.New(fmt.Sprintf("unsupported dialect %s", db.Dialect()))
+	}
 	firstcm := ""
 	firstcmName := ""
 	for _, cm := range columeMetas {
@@ -171,18 +182,18 @@ func createTmpTable(starrocks *sql.DB, db dal.Dal, starrocksTmpTable string, tab
 		isPrimaryKey, ok := cm.PrimaryKey()
 		if isPrimaryKey && ok {
 			pks = append(pks, fmt.Sprintf("`%s`", name))
-			orders = append(orders, name)
+			orders = append(orders, fmt.Sprintf("%s%s%s", separator, name, separator))
 		}
 		if firstcm == "" {
 			firstcm = fmt.Sprintf("`%s`", name)
-			firstcmName = name
+			firstcmName = fmt.Sprintf("%s%s%s", separator, name, separator)
 		}
 	}
 
 	if len(pks) == 0 {
 		pks = append(pks, firstcm)
 	}
-	orderBy := strings.Join(orders, ",")
+	orderBy := strings.Join(orders, ", ")
 	if config.OrderBy != nil {
 		if v, ok := config.OrderBy[table]; ok {
 			orderBy = v