You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ab...@apache.org on 2022/10/21 04:01:24 UTC

[incubator-devlake] branch main updated: fix: StarRocks plugin does not sync the table structure when the source table's structure changes (#3526)

This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d4daa62a fix: StarRocks plugin does not sync the table structure when the source table's structure changes (#3526)
7d4daa62a is described below

commit 7d4daa62aef107104e016d6ee2ee0b65b51314b6
Author: long2ice <lo...@gmail.com>
AuthorDate: Fri Oct 21 12:01:19 2022 +0800

    fix: StarRocks plugin does not sync the table structure when the source table's structure changes (#3526)
---
 plugins/starrocks/tasks.go | 17 ++++++-----------
 1 file changed, 6 insertions(+), 11 deletions(-)

diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index 89d8fa1b0..6a90ca9a8 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -106,9 +106,10 @@ func LoadData(c core.SubTaskContext) errors.Error {
 
 	for _, table := range starrocksTables {
 		starrocksTable := strings.TrimLeft(table, "_")
+		starrocksTmpTable := fmt.Sprintf("%s_tmp", starrocksTable)
 		var columnMap map[string]string
 		var orderBy string
-		columnMap, orderBy, err = createTable(starrocks, db, starrocksTable, table, c, config)
+		columnMap, orderBy, err = createTmpTable(starrocks, db, starrocksTmpTable, table, c, config)
 		if err != nil {
 			c.GetLogger().Error(err, "create table %s in starrocks error", table)
 			return errors.Convert(err)
@@ -127,7 +128,7 @@ func LoadData(c core.SubTaskContext) errors.Error {
 			}
 
 		}
-		err = errors.Convert(loadData(starrocks, c, starrocksTable, table, columnMap, db, config, orderBy))
+		err = errors.Convert(loadData(starrocks, c, starrocksTable, starrocksTmpTable, table, columnMap, db, config, orderBy))
 		if err != nil {
 			return errors.Convert(err)
 		}
@@ -139,7 +140,7 @@ func LoadData(c core.SubTaskContext) errors.Error {
 	return nil
 }
 
-func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table string, c core.SubTaskContext, config *StarRocksConfig) (map[string]string, string, errors.Error) {
+func createTmpTable(starrocks *sql.DB, db dal.Dal, starrocksTmpTable string, table string, c core.SubTaskContext, config *StarRocksConfig) (map[string]string, string, errors.Error) {
 	columeMetas, err := db.GetColumns(&Table{name: table}, nil)
 	columnMap := make(map[string]string)
 	if err != nil {
@@ -198,20 +199,14 @@ func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table str
 			extra = v
 		}
 	}
-	tableSql := fmt.Sprintf("create table if not exists `%s` ( %s ) %s", starrocksTable, strings.Join(columns, ","), extra)
+	tableSql := fmt.Sprintf("drop table if exists %s; create table if not exists `%s` ( %s ) %s", starrocksTmpTable, starrocksTmpTable, strings.Join(columns, ","), extra)
 	c.GetLogger().Info(tableSql)
 	_, err = errors.Convert01(starrocks.Exec(tableSql))
 	return columnMap, orderBy, err
 }
 
-func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string, table string, columnMap map[string]string, db dal.Dal, config *StarRocksConfig, orderBy string) error {
+func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable, starrocksTmpTable, table string, columnMap map[string]string, db dal.Dal, config *StarRocksConfig, orderBy string) error {
 	offset := 0
-	starrocksTmpTable := starrocksTable + "_tmp"
-	// create tmp table in starrocks
-	_, execErr := starrocks.Exec(fmt.Sprintf("drop table if exists %s; create table %s like %s", starrocksTmpTable, starrocksTmpTable, starrocksTable))
-	if execErr != nil {
-		return execErr
-	}
 	var err error
 	for {
 		var rows *sql.Rows