You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ab...@apache.org on 2023/01/10 03:01:51 UTC
[incubator-devlake] branch release-v0.14 updated: feat: skip sync data when table not changed (#4164)
This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch release-v0.14
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/release-v0.14 by this push:
new 80cc03473 feat: skip sync data when table not changed (#4164)
80cc03473 is described below
commit 80cc0347363d3c100580a5aec6ffa4776636b936
Author: long2ice <ji...@merico.dev>
AuthorDate: Tue Jan 10 11:01:47 2023 +0800
feat: skip sync data when table not changed (#4164)
* refactor: lower some log levels to debug to reduce log output (#3775)
* feat: skip syncing data when the source table has not changed
---
.../src/data/pipeline-config-samples/starrocks.js | 1 +
plugins/starrocks/api/connection.go | 29 +++++-----
plugins/starrocks/starrocks.go | 28 +++++-----
plugins/starrocks/task_data.go | 29 +++++-----
plugins/starrocks/tasks.go | 64 ++++++++++++++++++----
5 files changed, 100 insertions(+), 51 deletions(-)
diff --git a/config-ui/src/data/pipeline-config-samples/starrocks.js b/config-ui/src/data/pipeline-config-samples/starrocks.js
index a49321063..3fa0fb74b 100644
--- a/config-ui/src/data/pipeline-config-samples/starrocks.js
+++ b/config-ui/src/data/pipeline-config-samples/starrocks.js
@@ -22,6 +22,7 @@ const starRocksConfig = [
options: {
source_type: '', // mysql or postgres
source_dsn: '', // gorm dsn
+ update_column: '', // update column
host: '127.0.0.1',
port: 9030,
user: 'root',
diff --git a/plugins/starrocks/api/connection.go b/plugins/starrocks/api/connection.go
index 7ea0d2dec..2af239651 100644
--- a/plugins/starrocks/api/connection.go
+++ b/plugins/starrocks/api/connection.go
@@ -37,19 +37,20 @@ func PostStarRocksPipeline(input *core.ApiResourceInput) (*core.ApiResourceOutpu
type StarRocksPipelinePlan [][]struct {
Plugin string `json:"plugin"`
Options struct {
- SourceType string `json:"source_type"`
- SourceDsn string `json:"source_dsn"`
- Host string `json:"host"`
- Port int `json:"port"`
- User string `json:"user"`
- Password string `json:"password"`
- Database string `json:"database"`
- BeHost string `json:"be_host"`
- BePort int `json:"be_port"`
- Tables []string `json:"tables"`
- BatchSize int `json:"batch_size"`
- OrderBy map[string]string `json:"order_by"`
- Extra string `json:"extra"`
- DomainLayer string `json:"domain_layer"`
+ SourceType string `json:"source_type"`
+ SourceDsn string `json:"source_dsn"`
+ UpdateColumn string `json:"update_column"`
+ Host string `json:"host"`
+ Port int `json:"port"`
+ User string `json:"user"`
+ Password string `json:"password"`
+ Database string `json:"database"`
+ BeHost string `json:"be_host"`
+ BePort int `json:"be_port"`
+ Tables []string `json:"tables"`
+ BatchSize int `json:"batch_size"`
+ OrderBy map[string]string `json:"order_by"`
+ Extra string `json:"extra"`
+ DomainLayer string `json:"domain_layer"`
} `json:"options"`
}
diff --git a/plugins/starrocks/starrocks.go b/plugins/starrocks/starrocks.go
index 930bc7750..9e5cbc724 100644
--- a/plugins/starrocks/starrocks.go
+++ b/plugins/starrocks/starrocks.go
@@ -63,6 +63,7 @@ func main() {
cmd := &cobra.Command{Use: "StarRocks"}
sourceType := cmd.Flags().StringP("source_type", "st", "", "Source type")
sourceDsn := cmd.Flags().StringP("source_dsn", "sd", "", "Source dsn")
+ updateColumn := cmd.Flags().StringP("update_column", "uc", "", "Update column")
_ = cmd.MarkFlagRequired("host")
host := cmd.Flags().StringP("host", "h", "", "StarRocks host")
_ = cmd.MarkFlagRequired("port")
@@ -85,19 +86,20 @@ func main() {
orderBy := cmd.Flags().StringP("order_by", "o", "", "Source tables order by, default is primary key")
cmd.Run = func(cmd *cobra.Command, args []string) {
runner.DirectRun(cmd, args, PluginEntry, map[string]interface{}{
- "source_type": sourceType,
- "source_dsn": sourceDsn,
- "host": host,
- "port": port,
- "user": user,
- "password": password,
- "database": database,
- "be_host": beHost,
- "be_port": bePort,
- "tables": tables,
- "batch_size": batchSize,
- "extra": extra,
- "order_by": orderBy,
+ "source_type": sourceType,
+ "source_dsn": sourceDsn,
+ "update_column": updateColumn,
+ "host": host,
+ "port": port,
+ "user": user,
+ "password": password,
+ "database": database,
+ "be_host": beHost,
+ "be_port": bePort,
+ "tables": tables,
+ "batch_size": batchSize,
+ "extra": extra,
+ "order_by": orderBy,
})
}
runner.RunCmd(cmd)
diff --git a/plugins/starrocks/task_data.go b/plugins/starrocks/task_data.go
index 35f555442..8876ae56a 100644
--- a/plugins/starrocks/task_data.go
+++ b/plugins/starrocks/task_data.go
@@ -18,18 +18,19 @@ limitations under the License.
package main
type StarRocksConfig struct {
- SourceType string `mapstructure:"source_type"`
- SourceDsn string `mapstructure:"source_dsn"`
- Host string
- Port int
- User string
- Password string
- Database string
- BeHost string `mapstructure:"be_host"`
- BePort int `mapstructure:"be_port"`
- Tables []string
- BatchSize int `mapstructure:"batch_size"`
- OrderBy map[string]string `mapstructure:"order_by"`
- DomainLayer string `mapstructure:"domain_layer"`
- Extra map[string]string
+ SourceType string `mapstructure:"source_type"`
+ SourceDsn string `mapstructure:"source_dsn"`
+ UpdateColumn string `mapstructure:"update_column"`
+ Host string
+ Port int
+ User string
+ Password string
+ Database string
+ BeHost string `mapstructure:"be_host"`
+ BePort int `mapstructure:"be_port"`
+ Tables []string
+ BatchSize int `mapstructure:"batch_size"`
+ OrderBy map[string]string `mapstructure:"order_by"`
+ DomainLayer string `mapstructure:"domain_layer"`
+ Extra map[string]string
}
diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index 6705dbfc9..044854019 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -27,6 +27,7 @@ import (
"net/url"
"regexp"
"strings"
+ "time"
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/impl/dalgorm"
@@ -115,7 +116,12 @@ func LoadData(c core.SubTaskContext) errors.Error {
starrocksTmpTable := fmt.Sprintf("%s_tmp", starrocksTable)
var columnMap map[string]string
var orderBy string
- columnMap, orderBy, err = createTmpTable(starrocks, db, starrocksTmpTable, table, c, config)
+ var skip bool
+ columnMap, orderBy, skip, err = createTmpTable(starrocks, db, starrocksTable, starrocksTmpTable, table, c, config)
+ if skip {
+ c.GetLogger().Info(fmt.Sprintf("table %s is up to date, so skip it", table))
+ continue
+ }
if err != nil {
c.GetLogger().Error(err, "create table %s in starrocks error", table)
return errors.Convert(err)
@@ -149,18 +155,19 @@ func LoadData(c core.SubTaskContext) errors.Error {
return nil
}
-func createTmpTable(starrocks *sql.DB, db dal.Dal, starrocksTmpTable string, table string, c core.SubTaskContext, config *StarRocksConfig) (map[string]string, string, errors.Error) {
- columeMetas, err := db.GetColumns(&Table{name: table}, nil)
+func createTmpTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, starrocksTmpTable string, table string, c core.SubTaskContext, config *StarRocksConfig) (map[string]string, string, bool, errors.Error) {
+ columnMetas, err := db.GetColumns(&Table{name: table}, nil)
+ updateColumn := config.UpdateColumn
columnMap := make(map[string]string)
if err != nil {
if strings.Contains(err.Error(), "cached plan must not change result type") {
c.GetLogger().Warn(err, "skip err: cached plan must not change result type")
- columeMetas, err = db.GetColumns(&Table{name: table}, nil)
+ columnMetas, err = db.GetColumns(&Table{name: table}, nil)
if err != nil {
- return nil, "", errors.Convert(err)
+ return nil, "", false, errors.Convert(err)
}
} else {
- return nil, "", errors.Convert(err)
+ return nil, "", false, errors.Convert(err)
}
}
var pks []string
@@ -172,15 +179,52 @@ func createTmpTable(starrocks *sql.DB, db dal.Dal, starrocksTmpTable string, tab
} else if db.Dialect() == "mysql" {
separator = "`"
} else {
- return nil, "", errors.NotFound.New(fmt.Sprintf("unsupported dialect %s", db.Dialect()))
+ return nil, "", false, errors.NotFound.New(fmt.Sprintf("unsupported dialect %s", db.Dialect()))
}
firstcm := ""
firstcmName := ""
- for _, cm := range columeMetas {
+ for _, cm := range columnMetas {
name := cm.Name()
+ if name == updateColumn {
+ // check update column to detect skip or not
+ rows, err := db.Cursor(
+ dal.From(table),
+ dal.Select(updateColumn),
+ dal.Limit(1),
+ dal.Orderby(fmt.Sprintf("%s desc", updateColumn)),
+ )
+ if err != nil {
+ return nil, "", false, err
+ }
+ var updatedFrom time.Time
+ if rows.Next() {
+ err = errors.Convert(rows.Scan(&updatedFrom))
+ if err != nil {
+ return nil, "", false, err
+ }
+ }
+ var starrocksErr error
+ rowsInStarRocks, starrocksErr := starrocks.Query(fmt.Sprintf("select %s from %s order by %s desc limit 1", updateColumn, starrocksTable, updateColumn))
+ if starrocksErr != nil {
+ if !strings.Contains(starrocksErr.Error(), "Unknown table") {
+ return nil, "", false, errors.Convert(starrocksErr)
+ }
+ } else {
+ var updatedTo time.Time
+ if rowsInStarRocks.Next() {
+ err = errors.Convert(rowsInStarRocks.Scan(&updatedTo))
+ if err != nil {
+ return nil, "", false, err
+ }
+ }
+ if updatedFrom.Equal(updatedTo) {
+ return nil, "", true, nil
+ }
+ }
+ }
columnDatatype, ok := cm.ColumnType()
if !ok {
- return columnMap, "", errors.Default.New(fmt.Sprintf("Get [%s] ColumeType Failed", name))
+ return columnMap, "", false, errors.Default.New(fmt.Sprintf("Get [%s] ColumeType Failed", name))
}
dataType := getStarRocksDataType(columnDatatype)
columnMap[name] = dataType
@@ -218,7 +262,7 @@ func createTmpTable(starrocks *sql.DB, db dal.Dal, starrocksTmpTable string, tab
tableSql := fmt.Sprintf("drop table if exists %s; create table if not exists `%s` ( %s ) %s", starrocksTmpTable, starrocksTmpTable, strings.Join(columns, ","), extra)
c.GetLogger().Debug(tableSql)
_, err = errors.Convert01(starrocks.Exec(tableSql))
- return columnMap, orderBy, err
+ return columnMap, orderBy, false, err
}
func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable, starrocksTmpTable, table string, columnMap map[string]string, db dal.Dal, config *StarRocksConfig, orderBy string) error {