You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ab...@apache.org on 2023/01/10 03:01:51 UTC

[incubator-devlake] branch release-v0.14 updated: feat: skip sync data when table not changed (#4164)

This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch release-v0.14
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/release-v0.14 by this push:
     new 80cc03473 feat: skip sync data when table not changed (#4164)
80cc03473 is described below

commit 80cc0347363d3c100580a5aec6ffa4776636b936
Author: long2ice <ji...@merico.dev>
AuthorDate: Tue Jan 10 11:01:47 2023 +0800

    feat: skip sync data when table not changed (#4164)
    
    * refactor: make some log level to debug to reduce (#3775)
    
    * feat: skip sync data when table not changed
---
 .../src/data/pipeline-config-samples/starrocks.js  |  1 +
 plugins/starrocks/api/connection.go                | 29 +++++-----
 plugins/starrocks/starrocks.go                     | 28 +++++-----
 plugins/starrocks/task_data.go                     | 29 +++++-----
 plugins/starrocks/tasks.go                         | 64 ++++++++++++++++++----
 5 files changed, 100 insertions(+), 51 deletions(-)

diff --git a/config-ui/src/data/pipeline-config-samples/starrocks.js b/config-ui/src/data/pipeline-config-samples/starrocks.js
index a49321063..3fa0fb74b 100644
--- a/config-ui/src/data/pipeline-config-samples/starrocks.js
+++ b/config-ui/src/data/pipeline-config-samples/starrocks.js
@@ -22,6 +22,7 @@ const starRocksConfig = [
       options: {
         source_type: '', // mysql or postgres
         source_dsn: '', // gorm dsn
+        update_column: '', // column whose latest value is compared between source and StarRocks; a table is skipped when both match
         host: '127.0.0.1',
         port: 9030,
         user: 'root',
diff --git a/plugins/starrocks/api/connection.go b/plugins/starrocks/api/connection.go
index 7ea0d2dec..2af239651 100644
--- a/plugins/starrocks/api/connection.go
+++ b/plugins/starrocks/api/connection.go
@@ -37,19 +37,20 @@ func PostStarRocksPipeline(input *core.ApiResourceInput) (*core.ApiResourceOutpu
 type StarRocksPipelinePlan [][]struct {
 	Plugin  string `json:"plugin"`
 	Options struct {
-		SourceType  string            `json:"source_type"`
-		SourceDsn   string            `json:"source_dsn"`
-		Host        string            `json:"host"`
-		Port        int               `json:"port"`
-		User        string            `json:"user"`
-		Password    string            `json:"password"`
-		Database    string            `json:"database"`
-		BeHost      string            `json:"be_host"`
-		BePort      int               `json:"be_port"`
-		Tables      []string          `json:"tables"`
-		BatchSize   int               `json:"batch_size"`
-		OrderBy     map[string]string `json:"order_by"`
-		Extra       string            `json:"extra"`
-		DomainLayer string            `json:"domain_layer"`
+		SourceType   string            `json:"source_type"`
+		SourceDsn    string            `json:"source_dsn"`
+		UpdateColumn string            `json:"update_column"`
+		Host         string            `json:"host"`
+		Port         int               `json:"port"`
+		User         string            `json:"user"`
+		Password     string            `json:"password"`
+		Database     string            `json:"database"`
+		BeHost       string            `json:"be_host"`
+		BePort       int               `json:"be_port"`
+		Tables       []string          `json:"tables"`
+		BatchSize    int               `json:"batch_size"`
+		OrderBy      map[string]string `json:"order_by"`
+		Extra        string            `json:"extra"`
+		DomainLayer  string            `json:"domain_layer"`
 	} `json:"options"`
 }
diff --git a/plugins/starrocks/starrocks.go b/plugins/starrocks/starrocks.go
index 930bc7750..9e5cbc724 100644
--- a/plugins/starrocks/starrocks.go
+++ b/plugins/starrocks/starrocks.go
@@ -63,6 +63,7 @@ func main() {
 	cmd := &cobra.Command{Use: "StarRocks"}
 	sourceType := cmd.Flags().StringP("source_type", "st", "", "Source type")
 	sourceDsn := cmd.Flags().StringP("source_dsn", "sd", "", "Source dsn")
+	updateColumn := cmd.Flags().StringP("update_column", "uc", "", "Update column")
 	_ = cmd.MarkFlagRequired("host")
 	host := cmd.Flags().StringP("host", "h", "", "StarRocks host")
 	_ = cmd.MarkFlagRequired("port")
@@ -85,19 +86,20 @@ func main() {
 	orderBy := cmd.Flags().StringP("order_by", "o", "", "Source tables order by, default is primary key")
 	cmd.Run = func(cmd *cobra.Command, args []string) {
 		runner.DirectRun(cmd, args, PluginEntry, map[string]interface{}{
-			"source_type": sourceType,
-			"source_dsn":  sourceDsn,
-			"host":        host,
-			"port":        port,
-			"user":        user,
-			"password":    password,
-			"database":    database,
-			"be_host":     beHost,
-			"be_port":     bePort,
-			"tables":      tables,
-			"batch_size":  batchSize,
-			"extra":       extra,
-			"order_by":    orderBy,
+			"source_type":   sourceType,
+			"source_dsn":    sourceDsn,
+			"update_column": updateColumn,
+			"host":          host,
+			"port":          port,
+			"user":          user,
+			"password":      password,
+			"database":      database,
+			"be_host":       beHost,
+			"be_port":       bePort,
+			"tables":        tables,
+			"batch_size":    batchSize,
+			"extra":         extra,
+			"order_by":      orderBy,
 		})
 	}
 	runner.RunCmd(cmd)
diff --git a/plugins/starrocks/task_data.go b/plugins/starrocks/task_data.go
index 35f555442..8876ae56a 100644
--- a/plugins/starrocks/task_data.go
+++ b/plugins/starrocks/task_data.go
@@ -18,18 +18,19 @@ limitations under the License.
 package main
 
 type StarRocksConfig struct {
-	SourceType  string `mapstructure:"source_type"`
-	SourceDsn   string `mapstructure:"source_dsn"`
-	Host        string
-	Port        int
-	User        string
-	Password    string
-	Database    string
-	BeHost      string `mapstructure:"be_host"`
-	BePort      int    `mapstructure:"be_port"`
-	Tables      []string
-	BatchSize   int               `mapstructure:"batch_size"`
-	OrderBy     map[string]string `mapstructure:"order_by"`
-	DomainLayer string            `mapstructure:"domain_layer"`
-	Extra       map[string]string
+	SourceType   string `mapstructure:"source_type"`
+	SourceDsn    string `mapstructure:"source_dsn"`
+	UpdateColumn string `mapstructure:"update_column"`
+	Host         string
+	Port         int
+	User         string
+	Password     string
+	Database     string
+	BeHost       string `mapstructure:"be_host"`
+	BePort       int    `mapstructure:"be_port"`
+	Tables       []string
+	BatchSize    int               `mapstructure:"batch_size"`
+	OrderBy      map[string]string `mapstructure:"order_by"`
+	DomainLayer  string            `mapstructure:"domain_layer"`
+	Extra        map[string]string
 }
diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index 6705dbfc9..044854019 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -27,6 +27,7 @@ import (
 	"net/url"
 	"regexp"
 	"strings"
+	"time"
 
 	"github.com/apache/incubator-devlake/errors"
 	"github.com/apache/incubator-devlake/impl/dalgorm"
@@ -115,7 +116,12 @@ func LoadData(c core.SubTaskContext) errors.Error {
 		starrocksTmpTable := fmt.Sprintf("%s_tmp", starrocksTable)
 		var columnMap map[string]string
 		var orderBy string
-		columnMap, orderBy, err = createTmpTable(starrocks, db, starrocksTmpTable, table, c, config)
+		var skip bool
+		columnMap, orderBy, skip, err = createTmpTable(starrocks, db, starrocksTable, starrocksTmpTable, table, c, config)
+		if skip {
+			c.GetLogger().Info(fmt.Sprintf("table %s is up to date, so skip it", table))
+			continue
+		}
 		if err != nil {
 			c.GetLogger().Error(err, "create table %s in starrocks error", table)
 			return errors.Convert(err)
@@ -149,18 +155,19 @@ func LoadData(c core.SubTaskContext) errors.Error {
 	return nil
 }
 
-func createTmpTable(starrocks *sql.DB, db dal.Dal, starrocksTmpTable string, table string, c core.SubTaskContext, config *StarRocksConfig) (map[string]string, string, errors.Error) {
-	columeMetas, err := db.GetColumns(&Table{name: table}, nil)
+func createTmpTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, starrocksTmpTable string, table string, c core.SubTaskContext, config *StarRocksConfig) (map[string]string, string, bool, errors.Error) {
+	columnMetas, err := db.GetColumns(&Table{name: table}, nil)
+	updateColumn := config.UpdateColumn
 	columnMap := make(map[string]string)
 	if err != nil {
 		if strings.Contains(err.Error(), "cached plan must not change result type") {
 			c.GetLogger().Warn(err, "skip err: cached plan must not change result type")
-			columeMetas, err = db.GetColumns(&Table{name: table}, nil)
+			columnMetas, err = db.GetColumns(&Table{name: table}, nil)
 			if err != nil {
-				return nil, "", errors.Convert(err)
+				return nil, "", false, errors.Convert(err)
 			}
 		} else {
-			return nil, "", errors.Convert(err)
+			return nil, "", false, errors.Convert(err)
 		}
 	}
 	var pks []string
@@ -172,15 +179,52 @@ func createTmpTable(starrocks *sql.DB, db dal.Dal, starrocksTmpTable string, tab
 	} else if db.Dialect() == "mysql" {
 		separator = "`"
 	} else {
-		return nil, "", errors.NotFound.New(fmt.Sprintf("unsupported dialect %s", db.Dialect()))
+		return nil, "", false, errors.NotFound.New(fmt.Sprintf("unsupported dialect %s", db.Dialect()))
 	}
 	firstcm := ""
 	firstcmName := ""
-	for _, cm := range columeMetas {
+	for _, cm := range columnMetas {
 		name := cm.Name()
+		if name == updateColumn {
+			// compare the newest update-column value in the source table and in StarRocks; skip the table when they are equal
+			rows, err := db.Cursor(
+				dal.From(table),
+				dal.Select(updateColumn),
+				dal.Limit(1),
+				dal.Orderby(fmt.Sprintf("%s desc", updateColumn)),
+			)
+			if err != nil {
+				return nil, "", false, err
+			}
+			var updatedFrom time.Time
+			if rows.Next() {
+				err = errors.Convert(rows.Scan(&updatedFrom))
+				if err != nil {
+					return nil, "", false, err
+				}
+			}
+			var starrocksErr error
+			rowsInStarRocks, starrocksErr := starrocks.Query(fmt.Sprintf("select %s from %s order by %s desc limit 1", updateColumn, starrocksTable, updateColumn))
+			if starrocksErr != nil {
+				if !strings.Contains(starrocksErr.Error(), "Unknown table") {
+					return nil, "", false, errors.Convert(starrocksErr)
+				}
+			} else {
+				var updatedTo time.Time
+				if rowsInStarRocks.Next() {
+					err = errors.Convert(rowsInStarRocks.Scan(&updatedTo))
+					if err != nil {
+						return nil, "", false, err
+					}
+				}
+				if updatedFrom.Equal(updatedTo) {
+					return nil, "", true, nil
+				}
+			}
+		}
 		columnDatatype, ok := cm.ColumnType()
 		if !ok {
-			return columnMap, "", errors.Default.New(fmt.Sprintf("Get [%s] ColumeType Failed", name))
+			return columnMap, "", false, errors.Default.New(fmt.Sprintf("Get [%s] ColumeType Failed", name))
 		}
 		dataType := getStarRocksDataType(columnDatatype)
 		columnMap[name] = dataType
@@ -218,7 +262,7 @@ func createTmpTable(starrocks *sql.DB, db dal.Dal, starrocksTmpTable string, tab
 	tableSql := fmt.Sprintf("drop table if exists %s; create table if not exists `%s` ( %s ) %s", starrocksTmpTable, starrocksTmpTable, strings.Join(columns, ","), extra)
 	c.GetLogger().Debug(tableSql)
 	_, err = errors.Convert01(starrocks.Exec(tableSql))
-	return columnMap, orderBy, err
+	return columnMap, orderBy, false, err
 }
 
 func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable, starrocksTmpTable, table string, columnMap map[string]string, db dal.Dal, config *StarRocksConfig, orderBy string) error {