You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by kl...@apache.org on 2022/06/17 07:44:02 UTC

[incubator-devlake] branch main updated: Add starrocks plugin (#2200)

This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 21085df1 Add starrocks plugin (#2200)
21085df1 is described below

commit 21085df14469d281e7986ae2516835fc8c095c4b
Author: long2ice <lo...@gmail.com>
AuthorDate: Fri Jun 17 15:43:59 2022 +0800

    Add starrocks plugin (#2200)
    
    * feat: add starrocks plugin
    
    * feat: drop and create table instead of replace table
    
    * feat: add ignore_json_size
    
    * feat: upgrade devlake
    
    * fix: exclude _devlake in mysql
    
    * feat: add AllTables for dal
    
    * style: fix eslint
---
 .../src/components/menus/PipelineConfigsMenu.jsx   |   5 +
 config-ui/src/data/Providers.js                    |   6 +-
 .../src/data/pipeline-config-samples/starrocks.js  |  37 +++++
 config-ui/src/hooks/usePipelineValidation.jsx      |   3 +-
 .../pages/configure/connections/ConnectionForm.jsx |   8 --
 .../src/pages/configure/integration/manage.jsx     |   1 -
 config-ui/src/pages/configure/settings/jira.jsx    |   2 +-
 config-ui/src/pages/pipelines/create.jsx           |  20 +--
 impl/dalgorm/dalgorm.go                            |  23 ++++
 plugins/core/dal/dal.go                            |   2 +
 plugins/starrocks/starrocks.go                     |  81 +++++++++++
 plugins/starrocks/task_data.go                     |  27 ++++
 plugins/starrocks/tasks.go                         | 150 +++++++++++++++++++++
 13 files changed, 342 insertions(+), 23 deletions(-)

diff --git a/config-ui/src/components/menus/PipelineConfigsMenu.jsx b/config-ui/src/components/menus/PipelineConfigsMenu.jsx
index b498ce3c..ad7ff491 100644
--- a/config-ui/src/components/menus/PipelineConfigsMenu.jsx
+++ b/config-ui/src/components/menus/PipelineConfigsMenu.jsx
@@ -26,6 +26,7 @@ import { jiraConfig as sampleJiraPipelineConfig } from '@/data/pipeline-config-s
 import { jenkinsConfig as sampleJenkinsPipelineConfig } from '@/data/pipeline-config-samples/jenkins'
 import { feishuConfig as sampleFeishuPipelineConfig } from '@/data/pipeline-config-samples/feishu'
 import { dbtConfig as sampleDbtPipelineConfig } from '@/data/pipeline-config-samples/dbt'
+import { starRocksConfig as sampleStarRocksConfigPipelineConfig } from '@/data/pipeline-config-samples/starrocks'
 
 const PipelineConfigsMenu = (props) => {
   const {
@@ -81,6 +82,10 @@ const PipelineConfigsMenu = (props) => {
         icon='group-objects' text='Load DBT Configuration'
         onClick={() => setRawConfiguration(JSON.stringify(sampleDbtPipelineConfig, null, '  '))}
       />
+      <Menu.Item
+        icon='group-objects' text='Load StarRocks Configuration'
+        onClick={() => setRawConfiguration(JSON.stringify(sampleStarRocksConfigPipelineConfig, null, '  '))}
+      />
     </Menu>
   )
 }
diff --git a/config-ui/src/data/Providers.js b/config-ui/src/data/Providers.js
index b0b32ccc..3b647ed4 100644
--- a/config-ui/src/data/Providers.js
+++ b/config-ui/src/data/Providers.js
@@ -37,7 +37,8 @@ const Providers = {
   GITEXTRACTOR: 'gitextractor',
   FEISHU: 'feishu',
   AE: 'ae',
-  DBT: 'dbt'
+  DBT: 'dbt',
+  STARROCKS: 'starrocks',
 }
 
 const ProviderTypes = {
@@ -56,7 +57,8 @@ const ProviderLabels = {
   GITEXTRACTOR: 'GitExtractor',
   FEISHU: 'Feishu',
   AE: 'Analysis Engine (AE)',
-  DBT: 'Data Build Tool (DBT)'
+  DBT: 'Data Build Tool (DBT)',
+  STARROCKS: 'StarRocks',
 }
 
 const ProviderConnectionLimits = {
diff --git a/config-ui/src/data/pipeline-config-samples/starrocks.js b/config-ui/src/data/pipeline-config-samples/starrocks.js
new file mode 100644
index 00000000..74f4bcd2
--- /dev/null
+++ b/config-ui/src/data/pipeline-config-samples/starrocks.js
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+const starRocksConfig = [ // Sample StarRocks pipeline config; PipelineConfigsMenu's "Load StarRocks Configuration" item passes JSON.stringify of it to setRawConfiguration.
+  [ // one inner array holding a single task
+    {
+      Plugin: 'starrocks', // same value as Providers.STARROCKS
+      Options: { // presumably decoded into the Go StarRocksConfig struct by the plugin - keys must match its fields
+        host: '127.0.0.1', // StarRocks host, used for both the MySQL connection and the stream-load HTTP URL
+        port: 9030, // MySQL-protocol port used for the create/drop/alter table statements
+        user: 'root', // used for the MySQL login and for HTTP basic auth on stream load
+        password: '', // placeholder: fill in the real password
+        database: 'lake', // target StarRocks database; each target table must already exist there
+        be_port: 8040, // HTTP port of the _stream_load API; snake_case to match the `mapstructure:"be_port"` tag
+        tables: ['_tool_github_commits'] // source tables to copy (leading '_' is stripped for the StarRocks name); omit or leave empty to copy all tables
+      }
+    }
+  ]
+]
+
+export {
+  starRocksConfig
+}
diff --git a/config-ui/src/hooks/usePipelineValidation.jsx b/config-ui/src/hooks/usePipelineValidation.jsx
index 6f2913e7..5e709323 100644
--- a/config-ui/src/hooks/usePipelineValidation.jsx
+++ b/config-ui/src/hooks/usePipelineValidation.jsx
@@ -47,7 +47,8 @@ function usePipelineValidation ({
     Providers.GITEXTRACTOR,
     Providers.FEISHU,
     Providers.AE,
-    Providers.DBT
+    Providers.DBT,
+    Providers.STARROCKS
   ])
 
   const clear = () => {
diff --git a/config-ui/src/pages/configure/connections/ConnectionForm.jsx b/config-ui/src/pages/configure/connections/ConnectionForm.jsx
index ef4f654c..e98aaf45 100644
--- a/config-ui/src/pages/configure/connections/ConnectionForm.jsx
+++ b/config-ui/src/pages/configure/connections/ConnectionForm.jsx
@@ -26,12 +26,10 @@ import {
   Elevation,
   Popover,
   // PopoverInteractionKind,
-  Position,
   Intent,
   PopoverInteractionKind
 } from '@blueprintjs/core'
 import { Providers } from '@/data/Providers'
-import GenerateTokenForm from '@/pages/configure/connections/GenerateTokenForm'
 import FormValidationErrors from '@/components/messages/FormValidationErrors'
 import InputValidationError from '@/components/validation/InputValidationError'
 
@@ -80,7 +78,6 @@ export default function ConnectionForm (props) {
 
   // const [isValidForm, setIsValidForm] = useState(true)
   const [allowedAuthTypes, setAllowedAuthTypes] = useState(['token', 'plain'])
-  const [showTokenCreator, setShowTokenCreator] = useState(false)
   const [stateErrored, setStateErrored] = useState(false)
 
   const getConnectionStatusIcon = () => {
@@ -109,11 +106,6 @@ export default function ConnectionForm (props) {
       password
     })
   }, [name, endpointUrl, token, username, password, onValidate])
-
-  const handleTokenInteraction = (isOpen) => {
-    setShowTokenCreator(isOpen)
-  }
-
   const fieldHasError = (fieldId) => {
     return validationErrors.some(e => e.includes(fieldId))
   }
diff --git a/config-ui/src/pages/configure/integration/manage.jsx b/config-ui/src/pages/configure/integration/manage.jsx
index 6f1987ca..46b6f1ce 100644
--- a/config-ui/src/pages/configure/integration/manage.jsx
+++ b/config-ui/src/pages/configure/integration/manage.jsx
@@ -56,7 +56,6 @@ export default function ManageIntegration () {
 
   const {
     sourceLimits,
-    Providers,
     allConnections: connections,
     testedConnections,
     isFetching: isLoading,
diff --git a/config-ui/src/pages/configure/settings/jira.jsx b/config-ui/src/pages/configure/settings/jira.jsx
index cda3defc..96268ec9 100644
--- a/config-ui/src/pages/configure/settings/jira.jsx
+++ b/config-ui/src/pages/configure/settings/jira.jsx
@@ -15,7 +15,7 @@
  * limitations under the License.
  *
  */
-import React, { useCallback, useEffect, useState } from 'react'
+import React, { useEffect, useState } from 'react'
 import {
   Button,
   ButtonGroup,
diff --git a/config-ui/src/pages/pipelines/create.jsx b/config-ui/src/pages/pipelines/create.jsx
index f9f79e96..669d2ad3 100644
--- a/config-ui/src/pages/pipelines/create.jsx
+++ b/config-ui/src/pages/pipelines/create.jsx
@@ -901,9 +901,9 @@ const CreatePipeline = (props) => {
                         >
                           <ButtonGroup
                             className='code-editor-controls' style={{
-                            borderRadius: '3px',
-                            boxShadow: '0px 0px 2px rgba(0, 0, 0, 0.30)'
-                          }}
+                              borderRadius: '3px',
+                              boxShadow: '0px 0px 2px rgba(0, 0, 0, 0.30)'
+                            }}
                           >
                             <Popover
                               className='popover-options-menu-trigger'
@@ -975,11 +975,11 @@ const CreatePipeline = (props) => {
                                           <>
                                             <div
                                               className='bp3-elevation-1' style={{
-                                              backgroundColor: '#f6f6f6',
-                                              padding: '4px 6px',
-                                              borderRadius: '3px',
-                                              marginBottom: '10px'
-                                            }}
+                                                backgroundColor: '#f6f6f6',
+                                                padding: '4px 6px',
+                                                borderRadius: '3px',
+                                                marginBottom: '10px'
+                                              }}
                                             >
                                               <Icon icon='layers' color={Colors.GRAY4} size={14} style={{ marginRight: '5px' }} />
                                               <span style={{
@@ -998,7 +998,7 @@ const CreatePipeline = (props) => {
                                           </>
                                         )}
                                       </>
-                                    )
+                                      )
                                     : (
                                       <>
                                         <Icon
@@ -1010,7 +1010,7 @@ const CreatePipeline = (props) => {
                                         </div>
                                         {validationError}
                                       </>
-                                    )}
+                                      )}
                                 </div>
                               </>
                             </Popover>
diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
index 7f471982..51e72c44 100644
--- a/impl/dalgorm/dalgorm.go
+++ b/impl/dalgorm/dalgorm.go
@@ -19,6 +19,8 @@ package dalgorm
 
 import (
 	"database/sql"
+	"fmt"
+	"strings"
 
 	"github.com/apache/incubator-devlake/plugins/core/dal"
 	"gorm.io/gorm"
@@ -148,6 +150,27 @@ func (d *Dalgorm) Delete(entity interface{}, clauses ...dal.Clause) error {
 	return buildTx(d.db, clauses).Delete(entity).Error
 }
 
+// AllTables returns the names of all tables in the current database, excluding tables whose names start with "_devlake".
+func (d *Dalgorm) AllTables() ([]string, error) {
+	var tableSql string
+	if d.db.Dialector.Name() == "mysql" { // MySQL: SHOW TABLES lists the connection's current database
+		tableSql = fmt.Sprintf("show tables") // NOTE(review): Sprintf without format arguments; a plain string literal is equivalent (linters such as staticcheck flag this)
+	} else { // any other dialect: assumes PostgreSQL-style information_schema and the 'public' schema
+		tableSql = "select table_name from information_schema.tables where table_schema = 'public' and table_name not like '_devlake%'" // NOTE(review): '_' is a LIKE wildcard, so this also excludes names such as 'xdevlake...'; the HasPrefix filter below is the exact check
+	}
+	var tables []string
+	err := d.db.Raw(tableSql).Scan(&tables).Error // both queries return a single table-name column
+	if err != nil {
+		return nil, err
+	}
+	var filteredTables []string // stays nil (not an empty slice) when no table survives the filter
+	for _, table := range tables {
+		if !strings.HasPrefix(table, "_devlake") { // drop _devlake* tables; the MySQL branch relies on this filter alone
+			filteredTables = append(filteredTables, table)
+		}
+	}
+	return filteredTables, nil
+}
 func NewDalgorm(db *gorm.DB) *Dalgorm {
 	return &Dalgorm{db}
 }
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index bf8bfaea..a8790860 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -58,6 +58,8 @@ type Dal interface {
 	CreateIfNotExist(entity interface{}, clauses ...Clause) error
 	// Delete records from database
 	Delete(entity interface{}, clauses ...Clause) error
+	// AllTables returns all tables in database
+	AllTables() ([]string, error)
 }
 
 type DalClause struct {
diff --git a/plugins/starrocks/starrocks.go b/plugins/starrocks/starrocks.go
new file mode 100644
index 00000000..c4e5bb30
--- /dev/null
+++ b/plugins/starrocks/starrocks.go
@@ -0,0 +1,81 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package main
+
+import (
+	"github.com/apache/incubator-devlake/plugins/core"
+	"github.com/apache/incubator-devlake/runner"
+	"github.com/mitchellh/mapstructure"
+	"github.com/spf13/cobra"
+)
+
+type StarRocks string // StarRocks is the plugin type; the exported PluginEntry below is its instance.
+// SubTaskMetas returns the plugin's subtasks: only LoadData.
+func (s StarRocks) SubTaskMetas() []core.SubTaskMeta {
+	return []core.SubTaskMeta{
+		LoadDataTaskMeta,
+	}
+}
+// PrepareTaskData decodes the raw task options into a *StarRocksConfig with mapstructure; taskCtx is not used.
+func (s StarRocks) PrepareTaskData(taskCtx core.TaskContext, options map[string]interface{}) (interface{}, error) {
+	var op StarRocksConfig
+	err := mapstructure.Decode(options, &op)
+	if err != nil {
+		return nil, err
+	}
+	return &op, nil
+}
+// Description returns a one-line description of the plugin.
+func (s StarRocks) Description() string {
+	return "Sync data from database to StarRocks"
+}
+// RootPkgPath returns the plugin's Go package path.
+func (s StarRocks) RootPkgPath() string {
+	return "github.com/merico-dev/lake/plugins/starrocks" // NOTE(review): this file imports github.com/apache/incubator-devlake/...; this path looks stale - confirm
+}
+// PluginEntry is the exported plugin value, presumably the symbol the plugin loader looks up - confirm.
+var PluginEntry StarRocks
+
+func main() { // main runs the plugin standalone: flag values are handed to runner.DirectRun as task options.
+	cmd := &cobra.Command{Use: "StarRocks"}
+	_ = cmd.MarkFlagRequired("host") // NOTE(review): called before the flag is defined, so this presumably returns a "no such flag" error (discarded) and marks nothing; the same applies to every MarkFlagRequired call below
+	host := cmd.Flags().StringP("host", "h", "", "StarRocks host")
+	_ = cmd.MarkFlagRequired("port")
+	port := cmd.Flags().StringP("port", "p", "", "StarRocks port") // NOTE(review): string flag, whereas StarRocksConfig.Port is an int - confirm the value converts on decode
+	_ = cmd.MarkFlagRequired("port") // NOTE(review): repeats "port"; "be_port" was probably intended
+	bePort := cmd.Flags().StringP("be_port", "BP", "", "StarRocks be port") // NOTE(review): pflag shorthands must be one character; "BP" is expected to make pflag panic when the flag is defined - confirm
+	_ = cmd.MarkFlagRequired("user")
+	user := cmd.Flags().StringP("user", "u", "", "StarRocks user")
+	_ = cmd.MarkFlagRequired("password")
+	password := cmd.Flags().StringP("password", "P", "", "StarRocks password")
+	_ = cmd.MarkFlagRequired("database")
+	database := cmd.Flags().StringP("database", "d", "", "StarRocks database")
+	_ = cmd.MarkFlagRequired("table") // NOTE(review): if this took effect it would rule out LoadData's "copy all tables" mode (empty Tables)
+	tables := cmd.Flags().StringArrayP("table", "t", []string{}, "StarRocks table") // repeatable --table/-t flag collecting source table names
+	cmd.Run = func(cmd *cobra.Command, args []string) { // the map values are the flag pointers returned above, not dereferenced strings
+		runner.DirectRun(cmd, args, PluginEntry, map[string]interface{}{
+			"host":     host,
+			"port":     port,
+			"user":     user,
+			"password": password,
+			"database": database,
+			"be_port":  bePort,
+			"tables":   tables,
+		})
+	}
+	runner.RunCmd(cmd)
+}
diff --git a/plugins/starrocks/task_data.go b/plugins/starrocks/task_data.go
new file mode 100644
index 00000000..ceb86c00
--- /dev/null
+++ b/plugins/starrocks/task_data.go
@@ -0,0 +1,27 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package main
+
+type StarRocksConfig struct { // StarRocksConfig holds the decoded options of a starrocks task.
+	Host     string // StarRocks host for the MySQL connection and the stream-load HTTP URL
+	Port     int // MySQL-protocol port
+	User     string // MySQL user, also used for HTTP basic auth on stream load
+	Password string // password for both the MySQL login and HTTP basic auth
+	Database string // target StarRocks database (also part of the stream-load URL)
+	BePort   int `mapstructure:"be_port"` // HTTP port of the _stream_load API
+	Tables   []string // source tables to copy; empty means every table from Dal.AllTables
+}
diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
new file mode 100644
index 00000000..476ffde9
--- /dev/null
+++ b/plugins/starrocks/tasks.go
@@ -0,0 +1,150 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package main
+
+import (
+	"bytes"
+	"database/sql"
+	"encoding/json"
+	"fmt"
+	"github.com/apache/incubator-devlake/plugins/core"
+	"github.com/apache/incubator-devlake/plugins/core/dal"
+	"io/ioutil"
+	"net/http"
+	"strings"
+)
+
+func LoadData(c core.SubTaskContext) error { // LoadData copies each configured source table (all tables from Dal.AllTables when none are configured) into StarRocks, one table at a time.
+	config := c.GetData().(*StarRocksConfig) // presumably the *StarRocksConfig returned by PrepareTaskData
+	db := c.GetDal() // source database the rows are read from
+	tables := config.Tables
+	var err error
+	if len(tables) == 0 { // no tables configured: copy every table in the source database
+		tables, err = db.AllTables()
+		if err != nil {
+			return err
+		}
+	}
+	starrocks, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local", config.User, config.Password, config.Host, config.Port, config.Database)) // NOTE(review): relies on a "mysql" database/sql driver registered elsewhere (none is imported in this file); the handle is never closed
+	if err != nil {
+		return err
+	}
+
+	for _, table := range tables {
+		err = loadData(starrocks, c, table, db, config)
+		if err != nil {
+			c.GetLogger().Error("load data %s error: %s", table, err) // per-table failures are logged and the loop moves on
+		}
+	}
+	return nil // NOTE(review): reports success even when every table failed
+}
+func loadData(starrocks *sql.DB, c core.SubTaskContext, table string, db dal.Dal, config *StarRocksConfig) error { // loadData copies one source table into StarRocks: it buffers all rows, stream-loads them into <name>_tmp, then swaps that table in for <name> (name = table without leading underscores).
+	var data []map[string]interface{} // every row of the table is held in memory before loading
+	// read every row of the source table; the name is interpolated into the SQL unquoted
+	rows, err := db.Raw(fmt.Sprintf("select * from %s", table)) // NOTE(review): rows is never closed (no defer rows.Close())
+	if err != nil {
+		return err
+	}
+	cols, err := rows.Columns()
+	if err != nil {
+		return err
+	}
+	for rows.Next() { // NOTE(review): rows.Err() is not checked after this loop, so an iteration error looks like end of data
+		row := make(map[string]interface{})
+		columns := make([]string, len(cols)) // every column is scanned into a string; NOTE(review): database/sql cannot scan NULL into a string, so a NULL value aborts this table - consider sql.NullString
+		columnPointers := make([]interface{}, len(cols))
+		for i, _ := range columns { // point each Scan destination at its string slot
+			columnPointers[i] = &columns[i]
+		}
+		err = rows.Scan(columnPointers...)
+		if err != nil {
+			return err
+		}
+		for i, colName := range cols {
+			row[colName] = columns[i] // so every value is sent to StarRocks as a JSON string
+		}
+		data = append(data, row)
+	}
+	if len(data) == 0 { // nothing to load: the existing StarRocks table is left untouched
+		c.GetLogger().Warn("table %s is empty, so skip", table)
+		return nil
+	}
+	starrocksTable := strings.TrimLeft(table, "_") // StarRocks table name: source name with all leading underscores removed
+	// create <name>_tmp with the schema of the existing <name>; <name> must already exist in StarRocks
+	_, err = starrocks.Exec(fmt.Sprintf("create table %s_tmp like %s", starrocksTable, starrocksTable)) // NOTE(review): fails if a <name>_tmp left behind by an earlier failed load still exists
+	if err != nil {
+		return err
+	}
+	// stream-load the rows into <name>_tmp over HTTP on the BE port
+	url := fmt.Sprintf("http://%s:%d/api/%s/%s_tmp/_stream_load", config.Host, config.BePort, config.Database, starrocksTable)
+	headers := map[string]string{ // body is a single JSON array of row objects
+		"format":            "json",
+		"strip_outer_array": "true",
+		"Expect":            "100-continue",
+		"ignore_json_size":  "true",
+	}
+	// marshal the buffered rows to a JSON array
+	jsonData, err := json.Marshal(data)
+	if err != nil {
+		panic(err) // NOTE(review): panics instead of returning err like the rest of this function
+	}
+	client := http.Client{} // zero-value client: no request timeout
+	req, err := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(jsonData))
+	if err != nil {
+		panic(err) // NOTE(review): same as above; returning err would be consistent
+	}
+	req.SetBasicAuth(config.User, config.Password)
+	for k, v := range headers {
+		req.Header.Set(k, v)
+	}
+	resp, err := client.Do(req) // NOTE(review): resp.Body is never closed
+	if err != nil {
+		return err
+	}
+	b, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return err
+	}
+	var result map[string]interface{}
+	err = json.Unmarshal(b, &result)
+	if err != nil {
+		return err
+	}
+	if resp.StatusCode != http.StatusOK { // a non-200 status is only logged; the Status check below decides the outcome
+		c.GetLogger().Error("%s %s", resp.StatusCode, b) // NOTE(review): StatusCode is an int; if Error is printf-style, %s renders it as %!s(int=...) - %d intended
+	}
+	if result["Status"] != "Success" { // outcome reported in the "Status" field of the stream-load JSON response
+		c.GetLogger().Error("load %s failed: %s", table, b) // NOTE(review): only logged - the function still returns nil and <name>_tmp is left in place
+	} else {
+		// replace <name> with the freshly loaded <name>_tmp; not atomic - <name> is briefly absent between the drop and the rename
+		_, err = starrocks.Exec(fmt.Sprintf("drop table if exists %s;alter table %s_tmp rename %s", starrocksTable, starrocksTable, starrocksTable)) // NOTE(review): two statements in one Exec; go-sql-driver/mysql rejects this unless multiStatements=true is in the DSN (it is not in LoadData) - confirm, or split into two Exec calls
+		if err != nil {
+			return err
+		}
+		c.GetLogger().Info("load %s to starrocks success", table)
+	}
+	return err // always nil here: every non-nil err above has already returned
+}
+
+var (
+	LoadDataTaskMeta = core.SubTaskMeta{ // metadata of the plugin's only subtask, returned by StarRocks.SubTaskMetas
+		Name:             "LoadData",
+		EntryPoint:       LoadData, // function run for this subtask
+		EnabledByDefault: true,
+		Description:      "Load data to StarRocks",
+	}
+)