You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this copy.
Posted to commits@devlake.apache.org by wa...@apache.org on 2022/09/01 12:26:37 UTC

[incubator-devlake] 01/02: feat: support source dsn

This is an automated email from the ASF dual-hosted git repository.

warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git

commit 80233635d0399906b3c6c75a8c622e82bf5701c9
Author: Jinlong Peng <ji...@merico.dev>
AuthorDate: Thu Sep 1 16:58:01 2022 +0800

    feat: support source dsn
---
 .../src/data/pipeline-config-samples/starrocks.js  |  2 ++
 plugins/starrocks/api/connection.go                |  2 ++
 plugins/starrocks/starrocks.go                     | 24 ++++++++++-------
 plugins/starrocks/task_data.go                     |  2 ++
 plugins/starrocks/tasks.go                         | 31 +++++++++++++++++++---
 plugins/starrocks/utils.go                         |  2 ++
 6 files changed, 49 insertions(+), 14 deletions(-)

diff --git a/config-ui/src/data/pipeline-config-samples/starrocks.js b/config-ui/src/data/pipeline-config-samples/starrocks.js
index a6031392..dc8cde08 100644
--- a/config-ui/src/data/pipeline-config-samples/starrocks.js
+++ b/config-ui/src/data/pipeline-config-samples/starrocks.js
@@ -20,6 +20,8 @@ const starRocksConfig = [
     {
       plugin: 'starrocks',
       options: {
+        source_type: '', // mysql or postgres
+        source_dsn: '', // gorm dsn
         host: '127.0.0.1',
         port: 9030,
         user: 'root',
diff --git a/plugins/starrocks/api/connection.go b/plugins/starrocks/api/connection.go
index f6ac6b0d..21daaddd 100644
--- a/plugins/starrocks/api/connection.go
+++ b/plugins/starrocks/api/connection.go
@@ -36,6 +36,8 @@ func PostStarRocksPipeline(input *core.ApiResourceInput) (*core.ApiResourceOutpu
 type StarRocksPipelinePlan [][]struct {
 	Plugin  string `json:"plugin"`
 	Options struct {
+		SourceType  string   `json:"source_type"`
+		SourceDsn   string   `json:"source_dsn"`
 		Host        string   `json:"host"`
 		Port        int      `json:"port"`
 		User        string   `json:"user"`
diff --git a/plugins/starrocks/starrocks.go b/plugins/starrocks/starrocks.go
index eb3f6109..14c4175d 100644
--- a/plugins/starrocks/starrocks.go
+++ b/plugins/starrocks/starrocks.go
@@ -59,6 +59,8 @@ var PluginEntry StarRocks
 
 func main() {
 	cmd := &cobra.Command{Use: "StarRocks"}
+	sourceType := cmd.Flags().StringP("source_type", "sp", "", "Source type")
+	sourceDsn := cmd.Flags().StringP("source_dsn", "sd", "", "Source dsn")
 	_ = cmd.MarkFlagRequired("host")
 	host := cmd.Flags().StringP("host", "h", "", "StarRocks host")
 	_ = cmd.MarkFlagRequired("port")
@@ -80,16 +82,18 @@ func main() {
 	extra := cmd.Flags().StringP("extra", "e", "", "StarRocks create table sql extra")
 	cmd.Run = func(cmd *cobra.Command, args []string) {
 		runner.DirectRun(cmd, args, PluginEntry, map[string]interface{}{
-			"host":       host,
-			"port":       port,
-			"user":       user,
-			"password":   password,
-			"database":   database,
-			"be_host":    beHost,
-			"be_port":    bePort,
-			"tables":     tables,
-			"batch_size": batchSize,
-			"extra":      extra,
+			"source_type": sourceType,
+			"source_dsn":  sourceDsn,
+			"host":        host,
+			"port":        port,
+			"user":        user,
+			"password":    password,
+			"database":    database,
+			"be_host":     beHost,
+			"be_port":     bePort,
+			"tables":      tables,
+			"batch_size":  batchSize,
+			"extra":       extra,
 		})
 	}
 	runner.RunCmd(cmd)
diff --git a/plugins/starrocks/task_data.go b/plugins/starrocks/task_data.go
index 76f4b1b4..bb0879b8 100644
--- a/plugins/starrocks/task_data.go
+++ b/plugins/starrocks/task_data.go
@@ -17,6 +17,8 @@ limitations under the License.
 package main
 
 type StarRocksConfig struct {
+	SourceType  string `mapstructure:"source_type"`
+	SourceDsn   string `mapstructure:"source_dsn"`
 	Host        string
 	Port        int
 	User        string
diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index 61d4989a..ece4131c 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -21,14 +21,17 @@ import (
 	"database/sql"
 	"encoding/json"
 	"fmt"
+	"github.com/apache/incubator-devlake/impl/dalgorm"
+	"github.com/apache/incubator-devlake/plugins/core"
+	"github.com/apache/incubator-devlake/plugins/core/dal"
+	"gorm.io/driver/mysql"
+	"gorm.io/driver/postgres"
+	"gorm.io/gorm"
 	"io"
 	"net/http"
 	"net/url"
 	"regexp"
 	"strings"
-
-	"github.com/apache/incubator-devlake/plugins/core"
-	"github.com/apache/incubator-devlake/plugins/core/dal"
 )
 
 type Table struct {
@@ -40,8 +43,28 @@ func (t *Table) TableName() string {
 }
 
 func LoadData(c core.SubTaskContext) error {
+	var db dal.Dal
 	config := c.GetData().(*StarRocksConfig)
-	db := c.GetDal()
+	if config.SourceDsn != "" && config.SourceType != "" {
+		var o *gorm.DB
+		var err error
+		if config.SourceType == "mysql" {
+			o, err = gorm.Open(mysql.Open(config.SourceDsn))
+			if err != nil {
+				return err
+			}
+		} else if config.SourceType == "postgres" {
+			o, err = gorm.Open(postgres.Open(config.SourceDsn))
+			if err != nil {
+				return err
+			}
+		} else {
+			return fmt.Errorf("unsupported source type %s", config.SourceType)
+		}
+		db = dalgorm.NewDalgorm(o)
+	} else {
+		db = c.GetDal()
+	}
 	var starrocksTables []string
 	if config.DomainLayer != "" {
 		starrocksTables = getTablesByDomainLayer(config.DomainLayer)
diff --git a/plugins/starrocks/utils.go b/plugins/starrocks/utils.go
index 0306ec3a..a68c79e5 100644
--- a/plugins/starrocks/utils.go
+++ b/plugins/starrocks/utils.go
@@ -99,6 +99,8 @@ func getDataType(dataType string) string {
 		starrocksDatatype = "double"
 	} else if stringIn(dataType, "json", "jsonb") {
 		starrocksDatatype = "json"
+	} else if dataType == "uuid" {
+		starrocksDatatype = "char(36)"
 	}
 	return starrocksDatatype
 }