You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by wa...@apache.org on 2022/09/20 01:12:18 UTC

[incubator-devlake] 01/02: feat: starrocks plugin support pg array

This is an automated email from the ASF dual-hosted git repository.

warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git

commit 00f34fabd8d07a35a02acc7cbadf367943067ff3
Author: long2ice <lo...@gmail.com>
AuthorDate: Mon Sep 19 19:49:10 2022 +0800

    feat: starrocks plugin support pg array
---
 plugins/starrocks/tasks.go | 30 +++++++++++++++++++++---------
 plugins/starrocks/utils.go |  7 ++++++-
 2 files changed, 27 insertions(+), 10 deletions(-)

diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index b0a096d9..5bd68265 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -26,6 +26,7 @@ import (
 	"github.com/apache/incubator-devlake/impl/dalgorm"
 	"github.com/apache/incubator-devlake/plugins/core"
 	"github.com/apache/incubator-devlake/plugins/core/dal"
+	"github.com/lib/pq"
 	"gorm.io/driver/mysql"
 	"gorm.io/driver/postgres"
 	"gorm.io/gorm"
@@ -104,12 +105,13 @@ func LoadData(c core.SubTaskContext) errors.Error {
 
 	for _, table := range starrocksTables {
 		starrocksTable := strings.TrimLeft(table, "_")
-		err = createTable(starrocks, db, starrocksTable, table, c, config.Extra)
+		var columnMap map[string]string
+		columnMap, err = createTable(starrocks, db, starrocksTable, table, c, config.Extra)
 		if err != nil {
 			c.GetLogger().Error(err, "create table %s in starrocks error", table)
 			return errors.Convert(err)
 		}
-		err = loadData(starrocks, c, starrocksTable, table, db, config)
+		err = loadData(starrocks, c, starrocksTable, table, columnMap, db, config)
 		if err != nil {
 			c.GetLogger().Error(err, "load data %s error", table)
 			return errors.Convert(err)
@@ -117,10 +119,11 @@ func LoadData(c core.SubTaskContext) errors.Error {
 	}
 	return nil
 }
-func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table string, c core.SubTaskContext, extra string) errors.Error {
+func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table string, c core.SubTaskContext, extra string) (map[string]string, errors.Error) {
 	columeMetas, err := db.GetColumns(&Table{name: table}, nil)
+	columnMap := make(map[string]string)
 	if err != nil {
-		return err
+		return columnMap, err
 	}
 	var pks []string
 	var columns []string
@@ -129,9 +132,11 @@ func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table str
 		name := cm.Name()
 		starrocksDatatype, ok := cm.ColumnType()
 		if !ok {
-			return errors.Default.New(fmt.Sprintf("Get [%s] ColumeType Failed", name))
+			return columnMap, errors.Default.New(fmt.Sprintf("Get [%s] ColumeType Failed", name))
 		}
-		column := fmt.Sprintf("`%s` %s", name, getDataType(starrocksDatatype))
+		dataType := getDataType(starrocksDatatype)
+		columnMap[name] = dataType
+		column := fmt.Sprintf("`%s` %s", name, dataType)
 		columns = append(columns, column)
 		isPrimaryKey, ok := cm.PrimaryKey()
 		if isPrimaryKey && ok {
@@ -152,10 +157,10 @@ func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table str
 	tableSql := fmt.Sprintf("create table if not exists `%s` ( %s ) %s", starrocksTable, strings.Join(columns, ","), extra)
 	c.GetLogger().Info(tableSql)
 	_, err = errors.Convert01(starrocks.Exec(tableSql))
-	return err
+	return columnMap, err
 }
 
-func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string, table string, db dal.Dal, config *StarRocksConfig) error {
+func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string, table string, columnMap map[string]string, db dal.Dal, config *StarRocksConfig) error {
 	offset := 0
 	starrocksTmpTable := starrocksTable + "_tmp"
 	// create tmp table in starrocks
@@ -181,7 +186,14 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string, t
 			columns := make([]interface{}, len(cols))
 			columnPointers := make([]interface{}, len(cols))
 			for i := range columns {
-				columnPointers[i] = &columns[i]
+				dataType := columnMap[cols[i]]
+				if strings.HasPrefix(dataType, "array") {
+					var arr []string
+					columns[i] = &arr
+					columnPointers[i] = pq.Array(&arr)
+				} else {
+					columnPointers[i] = &columns[i]
+				}
 			}
 			err = rows.Scan(columnPointers...)
 			if err != nil {
diff --git a/plugins/starrocks/utils.go b/plugins/starrocks/utils.go
index ace195ff..0aa1ce5b 100644
--- a/plugins/starrocks/utils.go
+++ b/plugins/starrocks/utils.go
@@ -17,7 +17,10 @@ limitations under the License.
 
 package main
 
-import "strings"
+import (
+	"fmt"
+	"strings"
+)
 
 func getTablesByDomainLayer(domainLayer string) []string {
 	switch domainLayer {
@@ -102,6 +105,8 @@ func getDataType(dataType string) string {
 		starrocksDatatype = "json"
 	} else if dataType == "uuid" {
 		starrocksDatatype = "char(36)"
+	} else if strings.HasSuffix(dataType, "[]") {
+		starrocksDatatype = fmt.Sprintf("array<%s>", getDataType(strings.Split(dataType, "[]")[0]))
 	}
 	return starrocksDatatype
 }