You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by kl...@apache.org on 2022/06/17 07:44:02 UTC
[incubator-devlake] branch main updated: Add starrocks plugin (#2200)
This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 21085df1 Add starrocks plugin (#2200)
21085df1 is described below
commit 21085df14469d281e7986ae2516835fc8c095c4b
Author: long2ice <lo...@gmail.com>
AuthorDate: Fri Jun 17 15:43:59 2022 +0800
Add starrocks plugin (#2200)
* feat: add starrocks plugin
* feat: drop and create table instead of replace table
* feat: add ignore_json_size
* feat: upgrade devlake
* fix: exclude _devlake in mysql
* feat: add AllTables for dal
* style: fix eslint
---
.../src/components/menus/PipelineConfigsMenu.jsx | 5 +
config-ui/src/data/Providers.js | 6 +-
.../src/data/pipeline-config-samples/starrocks.js | 37 +++++
config-ui/src/hooks/usePipelineValidation.jsx | 3 +-
.../pages/configure/connections/ConnectionForm.jsx | 8 --
.../src/pages/configure/integration/manage.jsx | 1 -
config-ui/src/pages/configure/settings/jira.jsx | 2 +-
config-ui/src/pages/pipelines/create.jsx | 20 +--
impl/dalgorm/dalgorm.go | 23 ++++
plugins/core/dal/dal.go | 2 +
plugins/starrocks/starrocks.go | 81 +++++++++++
plugins/starrocks/task_data.go | 27 ++++
plugins/starrocks/tasks.go | 150 +++++++++++++++++++++
13 files changed, 342 insertions(+), 23 deletions(-)
diff --git a/config-ui/src/components/menus/PipelineConfigsMenu.jsx b/config-ui/src/components/menus/PipelineConfigsMenu.jsx
index b498ce3c..ad7ff491 100644
--- a/config-ui/src/components/menus/PipelineConfigsMenu.jsx
+++ b/config-ui/src/components/menus/PipelineConfigsMenu.jsx
@@ -26,6 +26,7 @@ import { jiraConfig as sampleJiraPipelineConfig } from '@/data/pipeline-config-s
import { jenkinsConfig as sampleJenkinsPipelineConfig } from '@/data/pipeline-config-samples/jenkins'
import { feishuConfig as sampleFeishuPipelineConfig } from '@/data/pipeline-config-samples/feishu'
import { dbtConfig as sampleDbtPipelineConfig } from '@/data/pipeline-config-samples/dbt'
+import { starRocksConfig as sampleStarRocksConfigPipelineConfig } from '@/data/pipeline-config-samples/starrocks'
const PipelineConfigsMenu = (props) => {
const {
@@ -81,6 +82,10 @@ const PipelineConfigsMenu = (props) => {
icon='group-objects' text='Load DBT Configuration'
onClick={() => setRawConfiguration(JSON.stringify(sampleDbtPipelineConfig, null, ' '))}
/>
+ <Menu.Item
+ icon='group-objects' text='Load StarRocks Configuration'
+ onClick={() => setRawConfiguration(JSON.stringify(sampleStarRocksConfigPipelineConfig, null, ' '))}
+ />
</Menu>
)
}
diff --git a/config-ui/src/data/Providers.js b/config-ui/src/data/Providers.js
index b0b32ccc..3b647ed4 100644
--- a/config-ui/src/data/Providers.js
+++ b/config-ui/src/data/Providers.js
@@ -37,7 +37,8 @@ const Providers = {
GITEXTRACTOR: 'gitextractor',
FEISHU: 'feishu',
AE: 'ae',
- DBT: 'dbt'
+ DBT: 'dbt',
+ STARROCKS: 'starrocks',
}
const ProviderTypes = {
@@ -56,7 +57,8 @@ const ProviderLabels = {
GITEXTRACTOR: 'GitExtractor',
FEISHU: 'Feishu',
AE: 'Analysis Engine (AE)',
- DBT: 'Data Build Tool (DBT)'
+ DBT: 'Data Build Tool (DBT)',
+ STARROCKS: 'StarRocks',
}
const ProviderConnectionLimits = {
diff --git a/config-ui/src/data/pipeline-config-samples/starrocks.js b/config-ui/src/data/pipeline-config-samples/starrocks.js
new file mode 100644
index 00000000..74f4bcd2
--- /dev/null
+++ b/config-ui/src/data/pipeline-config-samples/starrocks.js
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+/**
+ * Sample pipeline configuration for the `starrocks` plugin.
+ *
+ * It is a nested array holding a single task entry. The `Options` keys
+ * (host, port, user, password, database, be_port, tables) use the same names
+ * the plugin's Go `StarRocksConfig` struct is decoded from.
+ * NOTE(review): the outer/inner arrays presumably mean stages/tasks — confirm
+ * against the pipeline schema.
+ */
+const starRocksConfig = [
+ [
+ {
+ Plugin: 'starrocks',
+ Options: {
+ host: '127.0.0.1',
+ port: 9030,
+ user: 'root',
+ password: '',
+ database: 'lake',
+ be_port: 8040,
+ tables: ['_tool_github_commits']
+ }
+ }
+ ]
+]
+
+export {
+ starRocksConfig
+}
diff --git a/config-ui/src/hooks/usePipelineValidation.jsx b/config-ui/src/hooks/usePipelineValidation.jsx
index 6f2913e7..5e709323 100644
--- a/config-ui/src/hooks/usePipelineValidation.jsx
+++ b/config-ui/src/hooks/usePipelineValidation.jsx
@@ -47,7 +47,8 @@ function usePipelineValidation ({
Providers.GITEXTRACTOR,
Providers.FEISHU,
Providers.AE,
- Providers.DBT
+ Providers.DBT,
+ Providers.STARROCKS
])
const clear = () => {
diff --git a/config-ui/src/pages/configure/connections/ConnectionForm.jsx b/config-ui/src/pages/configure/connections/ConnectionForm.jsx
index ef4f654c..e98aaf45 100644
--- a/config-ui/src/pages/configure/connections/ConnectionForm.jsx
+++ b/config-ui/src/pages/configure/connections/ConnectionForm.jsx
@@ -26,12 +26,10 @@ import {
Elevation,
Popover,
// PopoverInteractionKind,
- Position,
Intent,
PopoverInteractionKind
} from '@blueprintjs/core'
import { Providers } from '@/data/Providers'
-import GenerateTokenForm from '@/pages/configure/connections/GenerateTokenForm'
import FormValidationErrors from '@/components/messages/FormValidationErrors'
import InputValidationError from '@/components/validation/InputValidationError'
@@ -80,7 +78,6 @@ export default function ConnectionForm (props) {
// const [isValidForm, setIsValidForm] = useState(true)
const [allowedAuthTypes, setAllowedAuthTypes] = useState(['token', 'plain'])
- const [showTokenCreator, setShowTokenCreator] = useState(false)
const [stateErrored, setStateErrored] = useState(false)
const getConnectionStatusIcon = () => {
@@ -109,11 +106,6 @@ export default function ConnectionForm (props) {
password
})
}, [name, endpointUrl, token, username, password, onValidate])
-
- const handleTokenInteraction = (isOpen) => {
- setShowTokenCreator(isOpen)
- }
-
const fieldHasError = (fieldId) => {
return validationErrors.some(e => e.includes(fieldId))
}
diff --git a/config-ui/src/pages/configure/integration/manage.jsx b/config-ui/src/pages/configure/integration/manage.jsx
index 6f1987ca..46b6f1ce 100644
--- a/config-ui/src/pages/configure/integration/manage.jsx
+++ b/config-ui/src/pages/configure/integration/manage.jsx
@@ -56,7 +56,6 @@ export default function ManageIntegration () {
const {
sourceLimits,
- Providers,
allConnections: connections,
testedConnections,
isFetching: isLoading,
diff --git a/config-ui/src/pages/configure/settings/jira.jsx b/config-ui/src/pages/configure/settings/jira.jsx
index cda3defc..96268ec9 100644
--- a/config-ui/src/pages/configure/settings/jira.jsx
+++ b/config-ui/src/pages/configure/settings/jira.jsx
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-import React, { useCallback, useEffect, useState } from 'react'
+import React, { useEffect, useState } from 'react'
import {
Button,
ButtonGroup,
diff --git a/config-ui/src/pages/pipelines/create.jsx b/config-ui/src/pages/pipelines/create.jsx
index f9f79e96..669d2ad3 100644
--- a/config-ui/src/pages/pipelines/create.jsx
+++ b/config-ui/src/pages/pipelines/create.jsx
@@ -901,9 +901,9 @@ const CreatePipeline = (props) => {
>
<ButtonGroup
className='code-editor-controls' style={{
- borderRadius: '3px',
- boxShadow: '0px 0px 2px rgba(0, 0, 0, 0.30)'
- }}
+ borderRadius: '3px',
+ boxShadow: '0px 0px 2px rgba(0, 0, 0, 0.30)'
+ }}
>
<Popover
className='popover-options-menu-trigger'
@@ -975,11 +975,11 @@ const CreatePipeline = (props) => {
<>
<div
className='bp3-elevation-1' style={{
- backgroundColor: '#f6f6f6',
- padding: '4px 6px',
- borderRadius: '3px',
- marginBottom: '10px'
- }}
+ backgroundColor: '#f6f6f6',
+ padding: '4px 6px',
+ borderRadius: '3px',
+ marginBottom: '10px'
+ }}
>
<Icon icon='layers' color={Colors.GRAY4} size={14} style={{ marginRight: '5px' }} />
<span style={{
@@ -998,7 +998,7 @@ const CreatePipeline = (props) => {
</>
)}
</>
- )
+ )
: (
<>
<Icon
@@ -1010,7 +1010,7 @@ const CreatePipeline = (props) => {
</div>
{validationError}
</>
- )}
+ )}
</div>
</>
</Popover>
diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
index 7f471982..51e72c44 100644
--- a/impl/dalgorm/dalgorm.go
+++ b/impl/dalgorm/dalgorm.go
@@ -19,6 +19,8 @@ package dalgorm
import (
"database/sql"
+ "fmt"
+ "strings"
"github.com/apache/incubator-devlake/plugins/core/dal"
"gorm.io/gorm"
@@ -148,6 +150,27 @@ func (d *Dalgorm) Delete(entity interface{}, clauses ...dal.Clause) error {
return buildTx(d.db, clauses).Delete(entity).Error
}
+// AllTables lists the table names visible through this connection, leaving
+// out every table whose name starts with "_devlake".
+//
+// MySQL is queried with `show tables`; any other dialect is queried through
+// information_schema restricted to the 'public' schema. The prefix filter is
+// applied again in Go so both code paths return the same shape of result.
+func (d *Dalgorm) AllTables() ([]string, error) {
+	// Default to the information_schema lookup and override it for MySQL.
+	query := "select table_name from information_schema.tables where table_schema = 'public' and table_name not like '_devlake%'"
+	if d.db.Dialector.Name() == "mysql" {
+		query = fmt.Sprintf("show tables")
+	}
+
+	var names []string
+	if err := d.db.Raw(query).Scan(&names).Error; err != nil {
+		return nil, err
+	}
+
+	var visible []string
+	for i := range names {
+		if strings.HasPrefix(names[i], "_devlake") {
+			continue
+		}
+		visible = append(visible, names[i])
+	}
+	return visible, nil
+}
func NewDalgorm(db *gorm.DB) *Dalgorm {
return &Dalgorm{db}
}
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index bf8bfaea..a8790860 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -58,6 +58,8 @@ type Dal interface {
CreateIfNotExist(entity interface{}, clauses ...Clause) error
// Delete records from database
Delete(entity interface{}, clauses ...Clause) error
+ // AllTables returns all tables in database
+ AllTables() ([]string, error)
}
type DalClause struct {
diff --git a/plugins/starrocks/starrocks.go b/plugins/starrocks/starrocks.go
new file mode 100644
index 00000000..c4e5bb30
--- /dev/null
+++ b/plugins/starrocks/starrocks.go
@@ -0,0 +1,81 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package main
+
+import (
+ "github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/runner"
+ "github.com/mitchellh/mapstructure"
+ "github.com/spf13/cobra"
+)
+
+// StarRocks is the plugin type; it carries no state of its own.
+type StarRocks string
+
+// SubTaskMetas returns the subtasks this plugin provides: only LoadData.
+func (s StarRocks) SubTaskMetas() []core.SubTaskMeta {
+	return []core.SubTaskMeta{
+		LoadDataTaskMeta,
+	}
+}
+
+// PrepareTaskData decodes the raw task options into a *StarRocksConfig.
+// The returned value is what LoadData later retrieves via GetData().
+func (s StarRocks) PrepareTaskData(taskCtx core.TaskContext, options map[string]interface{}) (interface{}, error) {
+	var op StarRocksConfig
+	err := mapstructure.Decode(options, &op)
+	if err != nil {
+		return nil, err
+	}
+	return &op, nil
+}
+
+// Description returns a one-line, human-readable summary of the plugin.
+func (s StarRocks) Description() string {
+	return "Sync data from database to StarRocks"
+}
+
+// RootPkgPath returns the Go package path of this plugin. It uses the same
+// module path as this file's imports (github.com/apache/incubator-devlake)
+// instead of the stale merico-dev path.
+func (s StarRocks) RootPkgPath() string {
+	return "github.com/apache/incubator-devlake/plugins/starrocks"
+}
+
+// PluginEntry is the package-level plugin instance handed to the runner.
+var PluginEntry StarRocks
+
+// main runs the plugin standalone from the command line: it declares the
+// connection flags, marks them required, and forwards the parsed values to
+// the runner as task options.
+func main() {
+	cmd := &cobra.Command{Use: "StarRocks"}
+	flags := cmd.Flags()
+
+	// "host" deliberately has no shorthand: "-h" is taken by cobra's help flag
+	// and registering it twice panics.
+	host := flags.String("host", "", "StarRocks host")
+	// Ports are int flags because StarRocksConfig.Port/BePort are ints and a
+	// strict mapstructure.Decode does not convert strings to ints.
+	port := flags.IntP("port", "p", 0, "StarRocks port")
+	// Shorthands must be a single ASCII character; "BP" made pflag panic.
+	bePort := flags.IntP("be_port", "b", 0, "StarRocks be port")
+	user := flags.StringP("user", "u", "", "StarRocks user")
+	password := flags.StringP("password", "P", "", "StarRocks password")
+	database := flags.StringP("database", "d", "", "StarRocks database")
+	// NOTE(review): confirm runner.RunCmd does not register its own flag with
+	// the "t" shorthand; a clash would panic when the command starts.
+	tables := flags.StringArrayP("table", "t", []string{}, "StarRocks table")
+
+	// A flag can only be marked required after it has been defined; marking it
+	// earlier returns an error and leaves the flag optional.
+	for _, name := range []string{"host", "port", "be_port", "user", "password", "database", "table"} {
+		if err := cmd.MarkFlagRequired(name); err != nil {
+			panic(err)
+		}
+	}
+
+	cmd.Run = func(cmd *cobra.Command, args []string) {
+		// Dereference inside Run: the flag values are only populated once the
+		// command line has been parsed.
+		runner.DirectRun(cmd, args, PluginEntry, map[string]interface{}{
+			"host":     *host,
+			"port":     *port,
+			"user":     *user,
+			"password": *password,
+			"database": *database,
+			"be_port":  *bePort,
+			"tables":   *tables,
+		})
+	}
+	runner.RunCmd(cmd)
+}
diff --git a/plugins/starrocks/task_data.go b/plugins/starrocks/task_data.go
new file mode 100644
index 00000000..ceb86c00
--- /dev/null
+++ b/plugins/starrocks/task_data.go
@@ -0,0 +1,27 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package main
+
+// StarRocksConfig holds the task options of the starrocks plugin, decoded
+// from the pipeline's option map by PrepareTaskData.
+type StarRocksConfig struct {
+ // Host is used both for the SQL connection and for the stream-load URL.
+ Host string
+ // Port is the port used in the SQL connection string.
+ Port int
+ // User and Password authenticate the SQL connection and the HTTP
+ // stream-load request (basic auth).
+ User string
+ Password string
+ // Database is the target database name.
+ Database string
+ // BePort is the port used in the HTTP stream-load URL.
+ BePort int `mapstructure:"be_port"`
+ // Tables lists the source tables to copy; when empty, LoadData falls back
+ // to every table returned by the DAL's AllTables.
+ Tables []string
+}
diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
new file mode 100644
index 00000000..476ffde9
--- /dev/null
+++ b/plugins/starrocks/tasks.go
@@ -0,0 +1,150 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package main
+
+import (
+ "bytes"
+ "database/sql"
+ "encoding/json"
+ "fmt"
+ "github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
+ "io/ioutil"
+ "net/http"
+ "strings"
+)
+
+// LoadData is the subtask entry point: it copies each configured table from
+// the DevLake database into StarRocks.
+//
+// When the configuration names no tables, every table reported by the DAL's
+// AllTables is copied. Tables are processed on a best-effort basis: a failure
+// on one table is logged and the remaining tables are still attempted, so the
+// function only returns an error when setup itself fails.
+func LoadData(c core.SubTaskContext) error {
+	config := c.GetData().(*StarRocksConfig)
+	db := c.GetDal()
+	tables := config.Tables
+	var err error
+	if len(tables) == 0 {
+		tables, err = db.AllTables()
+		if err != nil {
+			return err
+		}
+	}
+	starrocks, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local", config.User, config.Password, config.Host, config.Port, config.Database))
+	if err != nil {
+		return err
+	}
+	// Release the connection pool once all tables have been processed.
+	defer starrocks.Close()
+
+	for _, table := range tables {
+		err = loadData(starrocks, c, table, db, config)
+		if err != nil {
+			c.GetLogger().Error("load data %s error: %s", table, err)
+		}
+	}
+	return nil
+}
+// loadData copies a single table into StarRocks.
+//
+// Steps:
+//  1. read every row of `table` from the source database into memory,
+//  2. (re)create `<name>_tmp` in StarRocks with the layout of `<name>`, where
+//     <name> is the table name with its leading underscores removed,
+//  3. push the rows as one JSON array through the HTTP stream-load endpoint,
+//  4. on success, drop `<name>` and rename `<name>_tmp` into its place.
+//
+// An empty source table is skipped with a warning. Any failure is returned to
+// the caller; a leftover `<name>_tmp` is removed at the start of the next run.
+func loadData(starrocks *sql.DB, c core.SubTaskContext, table string, db dal.Dal, config *StarRocksConfig) error {
+	var data []map[string]interface{}
+	// select data from db
+	rows, err := db.Raw(fmt.Sprintf("select * from %s", table))
+	if err != nil {
+		return err
+	}
+	defer rows.Close()
+	cols, err := rows.Columns()
+	if err != nil {
+		return err
+	}
+	for rows.Next() {
+		// Scan through sql.NullString: scanning a NULL column straight into a
+		// string makes rows.Scan fail.
+		values := make([]sql.NullString, len(cols))
+		pointers := make([]interface{}, len(cols))
+		for i := range values {
+			pointers[i] = &values[i]
+		}
+		if err = rows.Scan(pointers...); err != nil {
+			return err
+		}
+		row := make(map[string]interface{}, len(cols))
+		for i, colName := range cols {
+			if values[i].Valid {
+				row[colName] = values[i].String
+			} else {
+				row[colName] = nil // serialized as JSON null
+			}
+		}
+		data = append(data, row)
+	}
+	// rows.Next() returning false can also mean the iteration broke off early.
+	if err = rows.Err(); err != nil {
+		return err
+	}
+	if len(data) == 0 {
+		c.GetLogger().Warn("table %s is empty, so skip", table)
+		return nil
+	}
+
+	starrocksTable := strings.TrimLeft(table, "_")
+	tmpTable := starrocksTable + "_tmp"
+	// A previous failed run may have left the tmp table behind, which would
+	// make the create below fail.
+	if _, err = starrocks.Exec(fmt.Sprintf("drop table if exists %s", tmpTable)); err != nil {
+		return err
+	}
+	// create tmp table in starrocks
+	if _, err = starrocks.Exec(fmt.Sprintf("create table %s like %s", tmpTable, starrocksTable)); err != nil {
+		return err
+	}
+
+	// insert data to tmp table
+	url := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", config.Host, config.BePort, config.Database, tmpTable)
+	headers := map[string]string{
+		"format":            "json",
+		"strip_outer_array": "true",
+		"Expect":            "100-continue",
+		"ignore_json_size":  "true",
+	}
+	jsonData, err := json.Marshal(data)
+	if err != nil {
+		return err
+	}
+	client := http.Client{}
+	req, err := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(jsonData))
+	if err != nil {
+		return err
+	}
+	req.SetBasicAuth(config.User, config.Password)
+	for k, v := range headers {
+		req.Header.Set(k, v)
+	}
+	resp, err := client.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	b, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return err
+	}
+	// Check the HTTP status first: an error response is not guaranteed to be
+	// JSON, and a decode error would hide the real cause.
+	if resp.StatusCode != http.StatusOK {
+		return fmt.Errorf("load %s failed: http status %d: %s", table, resp.StatusCode, b)
+	}
+	var result map[string]interface{}
+	if err = json.Unmarshal(b, &result); err != nil {
+		return err
+	}
+	if result["Status"] != "Success" {
+		return fmt.Errorf("load %s failed: %s", table, b)
+	}
+
+	// drop old table and rename tmp table to old table; issued as two
+	// statements because the connection string does not enable
+	// multi-statement queries.
+	if _, err = starrocks.Exec(fmt.Sprintf("drop table if exists %s", starrocksTable)); err != nil {
+		return err
+	}
+	if _, err = starrocks.Exec(fmt.Sprintf("alter table %s rename %s", tmpTable, starrocksTable)); err != nil {
+		return err
+	}
+	c.GetLogger().Info("load %s to starrocks success", table)
+	return nil
+}
+
+// LoadDataTaskMeta registers LoadData as the plugin's only subtask; it is
+// enabled by default and is the entry returned by StarRocks.SubTaskMetas.
+var (
+ LoadDataTaskMeta = core.SubTaskMeta{
+ Name: "LoadData",
+ EntryPoint: LoadData,
+ EnabledByDefault: true,
+ Description: "Load data to StarRocks",
+ }
+)