You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ab...@apache.org on 2023/02/23 12:06:05 UTC
[incubator-devlake] branch main updated: refactor: refactor starrocks, improve performance, solve bugs (#4475)
This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 511ed3b19 refactor: refactor starrocks, improve performance, solve bugs (#4475)
511ed3b19 is described below
commit 511ed3b19e1b9f11a251d73fac5e1b3aeb003019
Author: abeizn <zi...@merico.dev>
AuthorDate: Thu Feb 23 20:05:59 2023 +0800
refactor: refactor starrocks, improve performance, solve bugs (#4475)
* refactor: refactor starrocks, improve performance, solve bugs
---
backend/plugins/starrocks/impl/impl.go | 2 +-
backend/plugins/starrocks/tasks/tasks.go | 552 ++++++++++++++++---------------
2 files changed, 283 insertions(+), 271 deletions(-)
diff --git a/backend/plugins/starrocks/impl/impl.go b/backend/plugins/starrocks/impl/impl.go
index 00eed64a1..129c989ed 100644
--- a/backend/plugins/starrocks/impl/impl.go
+++ b/backend/plugins/starrocks/impl/impl.go
@@ -34,7 +34,7 @@ var _ plugin.PluginModel = (*StarRocks)(nil)
func (s StarRocks) SubTaskMetas() []plugin.SubTaskMeta {
return []plugin.SubTaskMeta{
- tasks.LoadDataTaskMeta,
+ tasks.ExportDataTaskMeta,
}
}
diff --git a/backend/plugins/starrocks/tasks/tasks.go b/backend/plugins/starrocks/tasks/tasks.go
index 1b504e1c9..648271631 100644
--- a/backend/plugins/starrocks/tasks/tasks.go
+++ b/backend/plugins/starrocks/tasks/tasks.go
@@ -19,14 +19,8 @@ package tasks
import (
"bytes"
- "database/sql"
"encoding/json"
"fmt"
- "github.com/apache/incubator-devlake/core/dal"
- "github.com/apache/incubator-devlake/core/errors"
- "github.com/apache/incubator-devlake/core/plugin"
- "github.com/apache/incubator-devlake/impls/dalgorm"
- "github.com/apache/incubator-devlake/plugins/starrocks/utils"
"io"
"net/http"
"net/url"
@@ -34,10 +28,17 @@ import (
"strings"
"time"
+ "github.com/apache/incubator-devlake/core/dal"
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/impls/dalgorm"
+ "github.com/apache/incubator-devlake/plugins/starrocks/utils"
+
"github.com/lib/pq"
"gorm.io/driver/mysql"
"gorm.io/driver/postgres"
"gorm.io/gorm"
+ "gorm.io/gorm/clause"
)
type Table struct {
@@ -48,107 +49,61 @@ func (t *Table) TableName() string {
return t.name
}
-func LoadData(c plugin.SubTaskContext) errors.Error {
- var db dal.Dal
+type DataConfigParams struct {
+ Ctx plugin.SubTaskContext
+ Config *StarRocksConfig
+ SrcDb dal.Dal
+ DestDb dal.Dal
+ SrcTableName string
+ DestTableName string
+}
+
+func ExportData(c plugin.SubTaskContext) errors.Error {
+ logger := c.GetLogger()
config := c.GetData().(*StarRocksConfig)
- if config.SourceDsn != "" && config.SourceType != "" {
- var o *gorm.DB
- var err error
- if config.SourceType == "mysql" {
- o, err = gorm.Open(mysql.Open(config.SourceDsn))
- if err != nil {
- return errors.Convert(err)
- }
- } else if config.SourceType == "postgres" {
- o, err = gorm.Open(postgres.Open(config.SourceDsn))
- if err != nil {
- return errors.Convert(err)
- }
- } else {
- return errors.NotFound.New(fmt.Sprintf("unsupported source type %s", config.SourceType))
- }
- db = dalgorm.NewDalgorm(o)
- sqlDB, err := o.DB()
- if err != nil {
- return errors.Convert(err)
- }
- defer sqlDB.Close()
- } else {
- db = c.GetDal()
+
+ // 1. Get db instance
+ db, err := getDbInstance(c)
+ if err != nil {
+ return errors.Convert(err)
}
- var starrocksTables []string
- if config.DomainLayer != "" {
- starrocksTables = utils.GetTablesByDomainLayer(config.DomainLayer)
- if starrocksTables == nil {
- return errors.NotFound.New(fmt.Sprintf("no table found by domain layer: %s", config.DomainLayer))
- }
- } else {
- tables := config.Tables
- allTables, err := db.AllTables()
- if err != nil {
- return err
- }
- if len(tables) == 0 {
- starrocksTables = allTables
- } else {
- for _, table := range allTables {
- for _, r := range tables {
- var ok bool
- ok, err = errors.Convert01(regexp.Match(r, []byte(table)))
- if err != nil {
- return err
- }
- if ok {
- starrocksTables = append(starrocksTables, table)
- }
- }
- }
- }
+ // 2. Filter out the tables to export
+ starrocksTables, err := getExportingTables(c, db)
+ if err != nil {
+ return errors.Convert(err)
}
-
- starrocks, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local", config.User, config.Password, config.Host, config.Port, config.Database))
+ // 3. copy devlake data to starrocks
+ sr, err := gorm.Open(mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local", config.User, config.Password, config.Host, config.Port, config.Database)))
if err != nil {
return errors.Convert(err)
}
- defer starrocks.Close()
+ starrocksDb := dalgorm.NewDalgorm(sr)
for _, table := range starrocksTables {
- starrocksTable := strings.TrimLeft(table, "_")
- starrocksTmpTable := fmt.Sprintf("%s_tmp", starrocksTable)
- var columnMap map[string]string
- var orderBy string
- var skip bool
- columnMap, orderBy, skip, err = createTmpTable(starrocks, db, starrocksTable, starrocksTmpTable, table, c, config)
+ select {
+ case <-c.GetContext().Done():
+ return errors.Convert(c.GetContext().Err())
+ default:
+ }
+
+ dc := DataConfigParams{
+ Ctx: c,
+ Config: config,
+ SrcDb: db,
+ DestDb: starrocksDb,
+ SrcTableName: table,
+ DestTableName: strings.TrimLeft(table, "_"),
+ }
+ columnMap, orderBy, skip, err := createTmpTableInStarrocks(&dc)
if skip {
- c.GetLogger().Info(fmt.Sprintf("table %s is up to date, so skip it", table))
+ logger.Info(fmt.Sprintf("table %s is up to date, so skip it", table))
continue
}
if err != nil {
- c.GetLogger().Error(err, "create table %s in starrocks error", table)
+ logger.Error(err, "create table %s in starrocks error", table)
return errors.Convert(err)
}
- if db.Dialect() == "postgres" {
- err = db.Exec("begin transaction isolation level repeatable read")
- if err != nil {
- return errors.Convert(err)
- }
- } else if db.Dialect() == "mysql" {
- err = db.Exec("set session transaction isolation level repeatable read")
- if err != nil {
- return errors.Convert(err)
- }
- err = errors.Convert(db.Exec("start transaction"))
- if err != nil {
- return errors.Convert(err)
- }
- } else {
- return errors.NotFound.New(fmt.Sprintf("unsupported dialect %s", db.Dialect()))
- }
- err = errors.Convert(loadData(starrocks, c, starrocksTable, starrocksTmpTable, table, columnMap, db, config, orderBy))
- if err != nil {
- return errors.Convert(err)
- }
- err = errors.Convert(db.Exec("commit"))
+ err = copyDataToDst(&dc, columnMap, orderBy)
if err != nil {
return errors.Convert(err)
}
@@ -156,26 +111,33 @@ func LoadData(c plugin.SubTaskContext) errors.Error {
return nil
}
-func createTmpTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, starrocksTmpTable string, table string, c plugin.SubTaskContext, config *StarRocksConfig) (map[string]string, string, bool, errors.Error) {
+// create temp table for dealing with some complex logic
+func createTmpTableInStarrocks(dc *DataConfigParams) (map[string]string, string, bool, error) {
+ logger := dc.Ctx.GetLogger()
+ config := dc.Config
+ db := dc.SrcDb
+ starrocksDb := dc.DestDb
+ table := dc.SrcTableName
+ starrocksTable := dc.DestTableName
+ starrocksTmpTable := fmt.Sprintf("%s_tmp", starrocksTable)
+
columnMetas, err := db.GetColumns(&Table{name: table}, nil)
updateColumn := config.UpdateColumn
columnMap := make(map[string]string)
if err != nil {
if strings.Contains(err.Error(), "cached plan must not change result type") {
- c.GetLogger().Warn(err, "skip err: cached plan must not change result type")
+ logger.Warn(err, "skip err: cached plan must not change result type")
columnMetas, err = db.GetColumns(&Table{name: table}, nil)
if err != nil {
- return nil, "", false, errors.Convert(err)
+ return nil, "", false, err
}
} else {
- return nil, "", false, errors.Convert(err)
+ return nil, "", false, err
}
}
- var pks []string
- var orders []string
- var columns []string
- var separator string
+ var pks, orders, columns []string
+ var separator, firstcm, firstcmName string
if db.Dialect() == "postgres" {
separator = "\""
} else if db.Dialect() == "mysql" {
@@ -183,57 +145,29 @@ func createTmpTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, starro
} else {
return nil, "", false, errors.NotFound.New(fmt.Sprintf("unsupported dialect %s", db.Dialect()))
}
- firstcm := ""
- firstcmName := ""
- var rowsInStarRocks *sql.Rows
- var rowsInPostgres dal.Rows
- defer func() {
- if rowsInStarRocks != nil {
- rowsInStarRocks.Close()
- }
- if rowsInPostgres != nil {
- rowsInPostgres.Close()
- }
- }()
for _, cm := range columnMetas {
name := cm.Name()
if name == updateColumn {
// check update column to detect skip or not
- rowsInPostgres, err = db.Cursor(
- dal.From(table),
- dal.Select(updateColumn),
- dal.Limit(1),
- dal.Orderby(fmt.Sprintf("%s desc", updateColumn)),
- )
+ var updatedFrom time.Time
+ err = db.All(&updatedFrom, dal.Select(updateColumn), dal.From(table), dal.Limit(1), dal.Orderby(fmt.Sprintf("%s desc", updateColumn)))
if err != nil {
return nil, "", false, err
}
- var updatedFrom time.Time
- if rowsInPostgres.Next() {
- err = errors.Convert(rowsInPostgres.Scan(&updatedFrom))
- if err != nil {
+
+ var updatedTo time.Time
+ err = starrocksDb.All(&updatedTo, dal.Select(updateColumn), dal.From(starrocksTable), dal.Limit(1), dal.Orderby(fmt.Sprintf("%s desc", updateColumn)))
+ if err != nil {
+ if !strings.Contains(err.Error(), "Unknown table") {
return nil, "", false, err
}
- }
- var starrocksErr error
- rowsInStarRocks, starrocksErr = starrocks.Query(fmt.Sprintf("select %s from %s order by %s desc limit 1", updateColumn, starrocksTable, updateColumn))
- if starrocksErr != nil {
- if !strings.Contains(starrocksErr.Error(), "Unknown table") {
- return nil, "", false, errors.Convert(starrocksErr)
- }
} else {
- var updatedTo time.Time
- if rowsInStarRocks.Next() {
- err = errors.Convert(rowsInStarRocks.Scan(&updatedTo))
- if err != nil {
- return nil, "", false, err
- }
- }
if updatedFrom.Equal(updatedTo) {
return nil, "", true, nil
}
}
}
+
columnDatatype, ok := cm.ColumnType()
if !ok {
return columnMap, "", false, errors.Default.New(fmt.Sprintf("Get [%s] ColumeType Failed", name))
@@ -271,85 +205,156 @@ func createTmpTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, starro
extra = v
}
}
- tableSql := fmt.Sprintf("drop table if exists %s; create table if not exists `%s` ( %s ) %s", starrocksTmpTable, starrocksTmpTable, strings.Join(columns, ","), extra)
- c.GetLogger().Debug(tableSql)
- _, err = errors.Convert01(starrocks.Exec(tableSql))
+ tableSql := fmt.Sprintf("DROP TABLE IF EXISTS %s; CREATE TABLE IF NOT EXISTS `%s` ( %s ) %s", starrocksTmpTable, starrocksTmpTable, strings.Join(columns, ","), extra)
+ logger.Debug(tableSql)
+ err = starrocksDb.Exec(tableSql)
return columnMap, orderBy, false, err
}
-func loadData(starrocks *sql.DB, c plugin.SubTaskContext, starrocksTable, starrocksTmpTable, table string, columnMap map[string]string, db dal.Dal, config *StarRocksConfig, orderBy string) error {
- offset := 0
+// put data to final dst database
+func copyDataToDst(dc *DataConfigParams, columnMap map[string]string, orderBy string) error {
+ c := dc.Ctx
+ logger := dc.Ctx.GetLogger()
+ config := dc.Config
+ db := dc.SrcDb
+ starrocksDb := dc.DestDb
+ table := dc.SrcTableName
+ starrocksTable := dc.DestTableName
+ starrocksTmpTable := fmt.Sprintf("%s_tmp", starrocksTable)
+
+ var offset int
var err error
- for {
- var data []map[string]interface{}
- // select data from db
- err = func() error {
- var rows dal.Rows
- rows, err = db.Cursor(
- dal.From(table),
- dal.Orderby(orderBy),
- dal.Limit(config.BatchSize),
- dal.Offset(offset),
- )
- if err != nil {
- return err
- }
- defer rows.Close()
- cols, err := rows.Columns()
- if err != nil {
- return err
- }
- for rows.Next() {
- row := make(map[string]interface{})
- columns := make([]interface{}, len(cols))
- columnPointers := make([]interface{}, len(cols))
- for i := range columns {
- dataType := columnMap[cols[i]]
- if strings.HasPrefix(dataType, "array") {
- var arr []string
- columns[i] = &arr
- columnPointers[i] = pq.Array(&arr)
- } else {
- columnPointers[i] = &columns[i]
- }
- }
- err = rows.Scan(columnPointers...)
- if err != nil {
- return err
- }
- for i, colName := range cols {
- row[colName] = columns[i]
- }
- data = append(data, row)
+ var rows dal.Rows
+ rows, err = db.Cursor(
+ dal.From(table),
+ dal.Orderby(orderBy),
+ )
+ if err != nil {
+ return err
+ }
+ defer rows.Close()
+
+ var data []map[string]interface{}
+ cols, err := (rows).Columns()
+ if err != nil {
+ return err
+ }
+
+ var batchCount int
+ for rows.Next() {
+ select {
+ case <-c.GetContext().Done():
+ return c.GetContext().Err()
+ default:
+ }
+ row := make(map[string]interface{})
+ columns := make([]interface{}, len(cols))
+ columnPointers := make([]interface{}, len(cols))
+ for i := range columns {
+ dataType := columnMap[cols[i]]
+ if strings.HasPrefix(dataType, "array") {
+ var arr []string
+ columns[i] = &arr
+ columnPointers[i] = pq.Array(&arr)
+ } else {
+ columnPointers[i] = &columns[i]
}
- return nil
- }()
+ }
+ err = rows.Scan(columnPointers...)
if err != nil {
return err
}
- if len(data) == 0 {
- c.GetLogger().Warn(nil, "no data found in table %s already, limit: %d, offset: %d, so break", table, config.BatchSize, offset)
- break
+ for i, colName := range cols {
+ row[colName] = columns[i]
}
- // insert data to tmp table
- loadURL := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", config.BeHost, config.BePort, config.Database, starrocksTmpTable)
- headers := map[string]string{
- "format": "json",
- "strip_outer_array": "true",
- "Expect": "100-continue",
- "ignore_json_size": "true",
- "Connection": "close",
+ data = append(data, row)
+ batchCount += 1
+ if batchCount == config.BatchSize {
+ err = putBatchData(c, starrocksTmpTable, table, data, config, offset)
+ if err != nil {
+ return err
+ }
+ batchCount = 0
+ data = nil
}
- jsonData, err := json.Marshal(data)
+ }
+ if batchCount != 0 {
+ err = putBatchData(c, starrocksTmpTable, table, data, config, offset)
if err != nil {
return err
}
- client := http.Client{
- CheckRedirect: func(req *http.Request, via []*http.Request) error {
- return http.ErrUseLastResponse
- },
+ }
+
+ // drop old table
+ err = starrocksDb.Exec("DROP TABLE IF EXISTS ?", clause.Table{Name: starrocksTable})
+ if err != nil {
+ return err
+ }
+ // rename tmp table to old table
+ err = starrocksDb.Exec("ALTER TABLE ? RENAME ?", clause.Table{Name: starrocksTmpTable}, clause.Table{Name: starrocksTable})
+ if err != nil {
+ return err
+ }
+
+ // check data count
+ sourceCount, err := db.Count(dal.From(table))
+ if err != nil {
+ return err
+ }
+ starrocksCount, err := starrocksDb.Count(dal.From(starrocksTable))
+ if err != nil {
+ return err
+ }
+ if sourceCount != starrocksCount {
+ logger.Warn(nil, "source count %d not equal to starrocks count %d", sourceCount, starrocksCount)
+ }
+ logger.Info("load %s to starrocks success", table)
+ return nil
+}
+
+// put batch size data to database
+func putBatchData(c plugin.SubTaskContext, starrocksTmpTable, table string, data []map[string]interface{}, config *StarRocksConfig, offset int) error {
+ logger := c.GetLogger()
+ // insert data to tmp table
+ loadURL := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", config.BeHost, config.BePort, config.Database, starrocksTmpTable)
+ headers := map[string]string{
+ "format": "json",
+ "strip_outer_array": "true",
+ "Expect": "100-continue",
+ "ignore_json_size": "true",
+ "Connection": "close",
+ }
+ jsonData, err := json.Marshal(data)
+ if err != nil {
+ return err
+ }
+ client := http.Client{
+ CheckRedirect: func(req *http.Request, via []*http.Request) error {
+ return http.ErrUseLastResponse
+ },
+ }
+ req, err := http.NewRequest(http.MethodPut, loadURL, bytes.NewBuffer(jsonData))
+ if err != nil {
+ return err
+ }
+ req.SetBasicAuth(config.User, config.Password)
+ for k, v := range headers {
+ req.Header.Set(k, v)
+ }
+ resp, err := client.Do(req)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+ var b []byte
+
+ if resp.StatusCode == 307 {
+ var location *url.URL
+ location, err = resp.Location()
+ if err != nil {
+ return err
}
- req, err := http.NewRequest(http.MethodPut, loadURL, bytes.NewBuffer(jsonData))
+ req, err = http.NewRequest(http.MethodPut, location.String(), bytes.NewBuffer(jsonData))
if err != nil {
return err
}
@@ -357,96 +362,103 @@ func loadData(starrocks *sql.DB, c plugin.SubTaskContext, starrocksTable, starro
for k, v := range headers {
req.Header.Set(k, v)
}
- resp, err := client.Do(req)
+ respRetry, err := client.Do(req)
if err != nil {
return err
}
- if resp.StatusCode == 307 {
- var location *url.URL
- location, err = resp.Location()
- if err != nil {
- return err
- }
- req, err = http.NewRequest(http.MethodPut, location.String(), bytes.NewBuffer(jsonData))
- if err != nil {
- return err
- }
- req.SetBasicAuth(config.User, config.Password)
- for k, v := range headers {
- req.Header.Set(k, v)
- }
- resp, err = client.Do(req)
- }
- if err != nil {
- return err
- }
- b, err := io.ReadAll(resp.Body)
+ defer respRetry.Body.Close()
+ b, err = io.ReadAll(respRetry.Body)
if err != nil {
return err
}
- var result map[string]interface{}
- err = json.Unmarshal(b, &result)
+ } else {
+ b, err = io.ReadAll(resp.Body)
if err != nil {
return err
}
- if resp.StatusCode != http.StatusOK {
- c.GetLogger().Error(nil, "[%s]: %s", resp.StatusCode, string(b))
- }
- if result["Status"] != "Success" {
- c.GetLogger().Error(nil, "load %s failed: %s", table, string(b))
- } else {
- c.GetLogger().Debug("load %s success: %s, limit: %d, offset: %d", table, b, config.BatchSize, offset)
- }
- offset += len(data)
}
- // drop old table
- _, err = starrocks.Exec(fmt.Sprintf("drop table if exists %s", starrocksTable))
+
+ var result map[string]interface{}
+ err = json.Unmarshal(b, &result)
if err != nil {
return err
}
- // rename tmp table to old table
- _, err = starrocks.Exec(fmt.Sprintf("alter table %s rename %s", starrocksTmpTable, starrocksTable))
- if err != nil {
- return err
+ if resp.StatusCode != http.StatusOK {
+ logger.Error(nil, "[%s]: %s", resp.StatusCode, string(b))
}
- // check data count
- rows, err := db.Cursor(
- dal.Select("count(*)"),
- dal.From(table),
- )
- if err != nil {
- return err
+ if result["Status"] != "Success" {
+ logger.Error(nil, "load %s failed: %s", table, string(b))
+ } else {
+ logger.Debug("load %s success: %s, limit: %d, offset: %d", table, b, config.BatchSize, offset)
}
- defer rows.Close()
- var sourceCount int
- for rows.Next() {
- err = rows.Scan(&sourceCount)
- if err != nil {
- return err
+ return nil
+}
+
+// get db instance
+func getDbInstance(c plugin.SubTaskContext) (db dal.Dal, err error) {
+ config := c.GetData().(*StarRocksConfig)
+ if config.SourceDsn != "" && config.SourceType != "" {
+ var o *gorm.DB
+ switch config.SourceType {
+ case "mysql":
+ o, err = gorm.Open(mysql.Open(config.SourceDsn))
+ if err != nil {
+ return nil, err
+ }
+ case "postgres":
+ o, err = gorm.Open(postgres.Open(config.SourceDsn))
+ if err != nil {
+ return nil, err
+ }
+ default:
+ return nil, errors.NotFound.New(fmt.Sprintf("unsupported source type %s", config.SourceType))
}
+ db = dalgorm.NewDalgorm(o)
+ } else {
+ db = c.GetDal()
}
- rowsStarRocks, err := starrocks.Query(fmt.Sprintf("select count(*) from %s", starrocksTable))
- if err != nil {
- return err
- }
- defer rowsStarRocks.Close()
- var starrocksCount int
- for rowsStarRocks.Next() {
- err = rowsStarRocks.Scan(&starrocksCount)
+
+ return db, nil
+
+}
+
+// get imported tables
+func getExportingTables(c plugin.SubTaskContext, db dal.Dal) (starrocksTables []string, err error) {
+ config := c.GetData().(*StarRocksConfig)
+ if config.DomainLayer != "" {
+ starrocksTables = utils.GetTablesByDomainLayer(config.DomainLayer)
+ if starrocksTables == nil {
+ return nil, errors.NotFound.New(fmt.Sprintf("no table found by domain layer: %s", config.DomainLayer))
+ }
+ } else {
+ tables := config.Tables
+ allTables, err := db.AllTables()
if err != nil {
- return err
+ return nil, err
+ }
+ if len(tables) == 0 {
+ starrocksTables = allTables
+ } else {
+ for _, table := range allTables {
+ for _, r := range tables {
+ var ok bool
+ ok, err := regexp.Match(r, []byte(table))
+ if err != nil {
+ return nil, err
+ }
+ if ok {
+ starrocksTables = append(starrocksTables, table)
+ }
+ }
+ }
}
}
- if sourceCount != starrocksCount {
- c.GetLogger().Warn(nil, "source count %d not equal to starrocks count %d", sourceCount, starrocksCount)
- }
- c.GetLogger().Info("load %s to starrocks success", table)
- return nil
+ return starrocksTables, nil
}
-var LoadDataTaskMeta = plugin.SubTaskMeta{
- Name: "LoadData",
- EntryPoint: LoadData,
+var ExportDataTaskMeta = plugin.SubTaskMeta{
+ Name: "ExportData",
+ EntryPoint: ExportData,
EnabledByDefault: true,
Description: "Load data to StarRocks",
}