You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by wa...@apache.org on 2022/09/20 01:12:18 UTC
[incubator-devlake] 01/02: feat: starrocks plugin support pg array
This is an automated email from the ASF dual-hosted git repository.
warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit 00f34fabd8d07a35a02acc7cbadf367943067ff3
Author: long2ice <lo...@gmail.com>
AuthorDate: Mon Sep 19 19:49:10 2022 +0800
feat: starrocks plugin support pg array
---
plugins/starrocks/tasks.go | 30 +++++++++++++++++++++---------
plugins/starrocks/utils.go | 7 ++++++-
2 files changed, 27 insertions(+), 10 deletions(-)
diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index b0a096d9..5bd68265 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/incubator-devlake/impl/dalgorm"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/core/dal"
+ "github.com/lib/pq"
"gorm.io/driver/mysql"
"gorm.io/driver/postgres"
"gorm.io/gorm"
@@ -104,12 +105,13 @@ func LoadData(c core.SubTaskContext) errors.Error {
for _, table := range starrocksTables {
starrocksTable := strings.TrimLeft(table, "_")
- err = createTable(starrocks, db, starrocksTable, table, c, config.Extra)
+ var columnMap map[string]string
+ columnMap, err = createTable(starrocks, db, starrocksTable, table, c, config.Extra)
if err != nil {
c.GetLogger().Error(err, "create table %s in starrocks error", table)
return errors.Convert(err)
}
- err = loadData(starrocks, c, starrocksTable, table, db, config)
+ err = loadData(starrocks, c, starrocksTable, table, columnMap, db, config)
if err != nil {
c.GetLogger().Error(err, "load data %s error", table)
return errors.Convert(err)
@@ -117,10 +119,11 @@ func LoadData(c core.SubTaskContext) errors.Error {
}
return nil
}
-func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table string, c core.SubTaskContext, extra string) errors.Error {
+func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table string, c core.SubTaskContext, extra string) (map[string]string, errors.Error) {
columeMetas, err := db.GetColumns(&Table{name: table}, nil)
+ columnMap := make(map[string]string)
if err != nil {
- return err
+ return columnMap, err
}
var pks []string
var columns []string
@@ -129,9 +132,11 @@ func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table str
name := cm.Name()
starrocksDatatype, ok := cm.ColumnType()
if !ok {
- return errors.Default.New(fmt.Sprintf("Get [%s] ColumeType Failed", name))
+ return columnMap, errors.Default.New(fmt.Sprintf("Get [%s] ColumeType Failed", name))
}
- column := fmt.Sprintf("`%s` %s", name, getDataType(starrocksDatatype))
+ dataType := getDataType(starrocksDatatype)
+ columnMap[name] = dataType
+ column := fmt.Sprintf("`%s` %s", name, dataType)
columns = append(columns, column)
isPrimaryKey, ok := cm.PrimaryKey()
if isPrimaryKey && ok {
@@ -152,10 +157,10 @@ func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table str
tableSql := fmt.Sprintf("create table if not exists `%s` ( %s ) %s", starrocksTable, strings.Join(columns, ","), extra)
c.GetLogger().Info(tableSql)
_, err = errors.Convert01(starrocks.Exec(tableSql))
- return err
+ return columnMap, err
}
-func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string, table string, db dal.Dal, config *StarRocksConfig) error {
+func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string, table string, columnMap map[string]string, db dal.Dal, config *StarRocksConfig) error {
offset := 0
starrocksTmpTable := starrocksTable + "_tmp"
// create tmp table in starrocks
@@ -181,7 +186,14 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string, t
columns := make([]interface{}, len(cols))
columnPointers := make([]interface{}, len(cols))
for i := range columns {
- columnPointers[i] = &columns[i]
+ dataType := columnMap[cols[i]]
+ if strings.HasPrefix(dataType, "array") {
+ var arr []string
+ columns[i] = &arr
+ columnPointers[i] = pq.Array(&arr)
+ } else {
+ columnPointers[i] = &columns[i]
+ }
}
err = rows.Scan(columnPointers...)
if err != nil {
diff --git a/plugins/starrocks/utils.go b/plugins/starrocks/utils.go
index ace195ff..0aa1ce5b 100644
--- a/plugins/starrocks/utils.go
+++ b/plugins/starrocks/utils.go
@@ -17,7 +17,10 @@ limitations under the License.
package main
-import "strings"
+import (
+ "fmt"
+ "strings"
+)
func getTablesByDomainLayer(domainLayer string) []string {
switch domainLayer {
@@ -102,6 +105,8 @@ func getDataType(dataType string) string {
starrocksDatatype = "json"
} else if dataType == "uuid" {
starrocksDatatype = "char(36)"
+ } else if strings.HasSuffix(dataType, "[]") {
+ starrocksDatatype = fmt.Sprintf("array<%s>", getDataType(strings.Split(dataType, "[]")[0]))
}
return starrocksDatatype
}