You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by wa...@apache.org on 2022/09/01 12:26:37 UTC
[incubator-devlake] 01/02: feat: support source dsn
This is an automated email from the ASF dual-hosted git repository.
warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit 80233635d0399906b3c6c75a8c622e82bf5701c9
Author: Jinlong Peng <ji...@merico.dev>
AuthorDate: Thu Sep 1 16:58:01 2022 +0800
feat: support source dsn
---
.../src/data/pipeline-config-samples/starrocks.js | 2 ++
plugins/starrocks/api/connection.go | 2 ++
plugins/starrocks/starrocks.go | 24 ++++++++++-------
plugins/starrocks/task_data.go | 2 ++
plugins/starrocks/tasks.go | 31 +++++++++++++++++++---
plugins/starrocks/utils.go | 2 ++
6 files changed, 49 insertions(+), 14 deletions(-)
diff --git a/config-ui/src/data/pipeline-config-samples/starrocks.js b/config-ui/src/data/pipeline-config-samples/starrocks.js
index a6031392..dc8cde08 100644
--- a/config-ui/src/data/pipeline-config-samples/starrocks.js
+++ b/config-ui/src/data/pipeline-config-samples/starrocks.js
@@ -20,6 +20,8 @@ const starRocksConfig = [
{
plugin: 'starrocks',
options: {
+ source_type: '', // mysql or postgres
+ source_dsn: '', // gorm dsn
host: '127.0.0.1',
port: 9030,
user: 'root',
diff --git a/plugins/starrocks/api/connection.go b/plugins/starrocks/api/connection.go
index f6ac6b0d..21daaddd 100644
--- a/plugins/starrocks/api/connection.go
+++ b/plugins/starrocks/api/connection.go
@@ -36,6 +36,8 @@ func PostStarRocksPipeline(input *core.ApiResourceInput) (*core.ApiResourceOutpu
type StarRocksPipelinePlan [][]struct {
Plugin string `json:"plugin"`
Options struct {
+ SourceType string `json:"source_type"`
+ SourceDsn string `json:"source_dsn"`
Host string `json:"host"`
Port int `json:"port"`
User string `json:"user"`
diff --git a/plugins/starrocks/starrocks.go b/plugins/starrocks/starrocks.go
index eb3f6109..14c4175d 100644
--- a/plugins/starrocks/starrocks.go
+++ b/plugins/starrocks/starrocks.go
@@ -59,6 +59,8 @@ var PluginEntry StarRocks
func main() {
cmd := &cobra.Command{Use: "StarRocks"}
+ sourceType := cmd.Flags().StringP("source_type", "sp", "", "Source type")
+ sourceDsn := cmd.Flags().StringP("source_dsn", "sd", "", "Source dsn")
_ = cmd.MarkFlagRequired("host")
host := cmd.Flags().StringP("host", "h", "", "StarRocks host")
_ = cmd.MarkFlagRequired("port")
@@ -80,16 +82,18 @@ func main() {
extra := cmd.Flags().StringP("extra", "e", "", "StarRocks create table sql extra")
cmd.Run = func(cmd *cobra.Command, args []string) {
runner.DirectRun(cmd, args, PluginEntry, map[string]interface{}{
- "host": host,
- "port": port,
- "user": user,
- "password": password,
- "database": database,
- "be_host": beHost,
- "be_port": bePort,
- "tables": tables,
- "batch_size": batchSize,
- "extra": extra,
+ "source_type": sourceType,
+ "source_dsn": sourceDsn,
+ "host": host,
+ "port": port,
+ "user": user,
+ "password": password,
+ "database": database,
+ "be_host": beHost,
+ "be_port": bePort,
+ "tables": tables,
+ "batch_size": batchSize,
+ "extra": extra,
})
}
runner.RunCmd(cmd)
diff --git a/plugins/starrocks/task_data.go b/plugins/starrocks/task_data.go
index 76f4b1b4..bb0879b8 100644
--- a/plugins/starrocks/task_data.go
+++ b/plugins/starrocks/task_data.go
@@ -17,6 +17,8 @@ limitations under the License.
package main
type StarRocksConfig struct {
+ SourceType string `mapstructure:"source_type"`
+ SourceDsn string `mapstructure:"source_dsn"`
Host string
Port int
User string
diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index 61d4989a..ece4131c 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -21,14 +21,17 @@ import (
"database/sql"
"encoding/json"
"fmt"
+ "github.com/apache/incubator-devlake/impl/dalgorm"
+ "github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
+ "gorm.io/driver/mysql"
+ "gorm.io/driver/postgres"
+ "gorm.io/gorm"
"io"
"net/http"
"net/url"
"regexp"
"strings"
-
- "github.com/apache/incubator-devlake/plugins/core"
- "github.com/apache/incubator-devlake/plugins/core/dal"
)
type Table struct {
@@ -40,8 +43,28 @@ func (t *Table) TableName() string {
}
func LoadData(c core.SubTaskContext) error {
+ var db dal.Dal
config := c.GetData().(*StarRocksConfig)
- db := c.GetDal()
+ if config.SourceDsn != "" && config.SourceType != "" {
+ var o *gorm.DB
+ var err error
+ if config.SourceType == "mysql" {
+ o, err = gorm.Open(mysql.Open(config.SourceDsn))
+ if err != nil {
+ return err
+ }
+ } else if config.SourceType == "postgres" {
+ o, err = gorm.Open(postgres.Open(config.SourceDsn))
+ if err != nil {
+ return err
+ }
+ } else {
+ return fmt.Errorf("unsupported source type %s", config.SourceType)
+ }
+ db = dalgorm.NewDalgorm(o)
+ } else {
+ db = c.GetDal()
+ }
var starrocksTables []string
if config.DomainLayer != "" {
starrocksTables = getTablesByDomainLayer(config.DomainLayer)
diff --git a/plugins/starrocks/utils.go b/plugins/starrocks/utils.go
index 0306ec3a..a68c79e5 100644
--- a/plugins/starrocks/utils.go
+++ b/plugins/starrocks/utils.go
@@ -99,6 +99,8 @@ func getDataType(dataType string) string {
starrocksDatatype = "double"
} else if stringIn(dataType, "json", "jsonb") {
starrocksDatatype = "json"
+ } else if dataType == "uuid" {
+ starrocksDatatype = "char(36)"
}
return starrocksDatatype
}