You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by "abeizn (via GitHub)" <gi...@apache.org> on 2023/02/21 11:22:44 UTC

[GitHub] [incubator-devlake] abeizn opened a new pull request, #4475: refactor: refactor starrocks, improve performance, solve bugs

abeizn opened a new pull request, #4475:
URL: https://github.com/apache/incubator-devlake/pull/4475

   ### Summary
   Refactor the StarRocks plugin to improve performance and fix bugs.
   
   ### Does this close any open issues?
   Closes #4418 
   Closes #4376 
   Closes #4377 
   
   ### Screenshots
   export data:
   ![image](https://user-images.githubusercontent.com/101256042/220330683-24aa6414-7b52-447c-b373-02ab8f1c6bdf.png)
   
   cancel task:
   ![image](https://user-images.githubusercontent.com/101256042/220331600-2424b331-475c-43a3-a2f9-8e15b549589f.png)
   
   pipeline is blocked for 30+ mins:
   ![image](https://user-images.githubusercontent.com/101256042/220331822-30332cf1-dff5-4408-be5c-7123ea213c78.png)
   
   
   
   ### Other Information
   Any other information that is important to this PR.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@devlake.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-devlake] mindlesscloud commented on a diff in pull request #4475: refactor: refactor starrocks, improve performance, solve bugs

Posted by "mindlesscloud (via GitHub)" <gi...@apache.org>.
mindlesscloud commented on code in PR #4475:
URL: https://github.com/apache/incubator-devlake/pull/4475#discussion_r1115310568


##########
backend/plugins/starrocks/tasks/tasks.go:
##########
@@ -271,182 +181,243 @@ func createTmpTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, starro
 			extra = v
 		}
 	}
-	tableSql := fmt.Sprintf("drop table if exists %s; create table if not exists `%s` ( %s ) %s", starrocksTmpTable, starrocksTmpTable, strings.Join(columns, ","), extra)
-	c.GetLogger().Debug(tableSql)
-	_, err = errors.Convert01(starrocks.Exec(tableSql))
+	tableSql := fmt.Sprintf("DROP TABLE IF EXISTS %s; CREATE TABLE IF NOT EXISTS `%s` ( %s ) %s", starrocksTmpTable, starrocksTmpTable, strings.Join(columns, ","), extra)
+	logger.Debug(tableSql)
+	err = starrocksDb.Exec(tableSql)
 	return columnMap, orderBy, false, err
 }
 
-func loadData(starrocks *sql.DB, c plugin.SubTaskContext, starrocksTable, starrocksTmpTable, table string, columnMap map[string]string, db dal.Dal, config *StarRocksConfig, orderBy string) error {
-	offset := 0
+// put data to final dst database
+func putDataToDst(c plugin.SubTaskContext, starrocksDb, db dal.Dal, starrocksTable, starrocksTmpTable, table string, columnMap map[string]string, orderBy string) error {
+	logger := c.GetLogger()
+	config := c.GetData().(*StarRocksConfig)
+	var offset int
 	var err error
-	for {
-		var data []map[string]interface{}
-		// select data from db
-		err = func() error {
-			var rows dal.Rows
-			rows, err = db.Cursor(
-				dal.From(table),
-				dal.Orderby(orderBy),
-				dal.Limit(config.BatchSize),
-				dal.Offset(offset),
-			)
-			if err != nil {
-				return err
-			}
-			defer rows.Close()
-			cols, err := rows.Columns()
-			if err != nil {
-				return err
-			}
-			for rows.Next() {
-				row := make(map[string]interface{})
-				columns := make([]interface{}, len(cols))
-				columnPointers := make([]interface{}, len(cols))
-				for i := range columns {
-					dataType := columnMap[cols[i]]
-					if strings.HasPrefix(dataType, "array") {
-						var arr []string
-						columns[i] = &arr
-						columnPointers[i] = pq.Array(&arr)
-					} else {
-						columnPointers[i] = &columns[i]
-					}
-				}
-				err = rows.Scan(columnPointers...)
-				if err != nil {
-					return err
-				}
-				for i, colName := range cols {
-					row[colName] = columns[i]
-				}
-				data = append(data, row)
+	var rows dal.Rows
+
+	rows, err = db.Cursor(
+		dal.From(table),
+		dal.Orderby(orderBy),
+	)
+	if err != nil {
+		return err
+	}
+	defer rows.Close()
+
+	var data []map[string]interface{}
+	cols, err := (rows).Columns()
+	if err != nil {
+		return err
+	}
+
+	var batchCount int
+	for rows.Next() {
+		select {
+		case <-c.GetContext().Done():
+			return c.GetContext().Err()
+		default:
+		}
+		row := make(map[string]interface{})
+		columns := make([]interface{}, len(cols))
+		columnPointers := make([]interface{}, len(cols))
+		for i := range columns {
+			dataType := columnMap[cols[i]]
+			if strings.HasPrefix(dataType, "array") {
+				var arr []string
+				columns[i] = &arr
+				columnPointers[i] = pq.Array(&arr)
+			} else {
+				columnPointers[i] = &columns[i]
 			}
-			return nil
-		}()
-		if err != nil {
-			return err
-		}
-		if len(data) == 0 {
-			c.GetLogger().Warn(nil, "no data found in table %s already, limit: %d, offset: %d, so break", table, config.BatchSize, offset)
-			break
-		}
-		// insert data to tmp table
-		loadURL := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", config.BeHost, config.BePort, config.Database, starrocksTmpTable)
-		headers := map[string]string{
-			"format":            "json",
-			"strip_outer_array": "true",
-			"Expect":            "100-continue",
-			"ignore_json_size":  "true",
-			"Connection":        "close",
-		}
-		jsonData, err := json.Marshal(data)
-		if err != nil {
-			return err
-		}
-		client := http.Client{
-			CheckRedirect: func(req *http.Request, via []*http.Request) error {
-				return http.ErrUseLastResponse
-			},
 		}
-		req, err := http.NewRequest(http.MethodPut, loadURL, bytes.NewBuffer(jsonData))
+		err = rows.Scan(columnPointers...)
 		if err != nil {
 			return err
 		}
-		req.SetBasicAuth(config.User, config.Password)
-		for k, v := range headers {
-			req.Header.Set(k, v)
-		}
-		resp, err := client.Do(req)
-		if err != nil {
-			return err
+		for i, colName := range cols {
+			row[colName] = columns[i]
 		}
-		if resp.StatusCode == 307 {
-			var location *url.URL
-			location, err = resp.Location()
+		data = append(data, row)
+		batchCount += 1
+		if batchCount == config.BatchSize {
+			err = putBatchData(c, starrocksTmpTable, table, data, config, offset)
 			if err != nil {
 				return err
 			}
-			req, err = http.NewRequest(http.MethodPut, location.String(), bytes.NewBuffer(jsonData))
-			if err != nil {
-				return err
-			}
-			req.SetBasicAuth(config.User, config.Password)
-			for k, v := range headers {
-				req.Header.Set(k, v)
-			}
-			resp, err = client.Do(req)
-		}
-		if err != nil {
-			return err
+			batchCount = 0
+			data = nil
 		}
-		b, err := io.ReadAll(resp.Body)
-		if err != nil {
-			return err
-		}
-		var result map[string]interface{}
-		err = json.Unmarshal(b, &result)
+	}
+	if batchCount != 0 {
+		err = putBatchData(c, starrocksTmpTable, table, data, config, offset)
 		if err != nil {
 			return err
 		}
-		if resp.StatusCode != http.StatusOK {
-			c.GetLogger().Error(nil, "[%s]: %s", resp.StatusCode, string(b))
-		}
-		if result["Status"] != "Success" {
-			c.GetLogger().Error(nil, "load %s failed: %s", table, string(b))
-		} else {
-			c.GetLogger().Debug("load %s success: %s, limit: %d, offset: %d", table, b, config.BatchSize, offset)
-		}
-		offset += len(data)
 	}
+
 	// drop old table
-	_, err = starrocks.Exec(fmt.Sprintf("drop table if exists %s", starrocksTable))
+	err = starrocksDb.Exec("DROP TABLE IF EXISTS ?", clause.Table{Name: starrocksTable})
 	if err != nil {
 		return err
 	}
 	// rename tmp table to old table
-	_, err = starrocks.Exec(fmt.Sprintf("alter table %s rename %s", starrocksTmpTable, starrocksTable))
+	err = starrocksDb.Exec("ALTER TABLE ? RENAME ?", clause.Table{Name: starrocksTmpTable}, clause.Table{Name: starrocksTable})
 	if err != nil {
 		return err
 	}
+
 	// check data count
-	rows, err := db.Cursor(
-		dal.Select("count(*)"),
-		dal.From(table),
-	)
+	sourceCount, err := db.Count(dal.From(table))
 	if err != nil {
 		return err
 	}
-	defer rows.Close()
-	var sourceCount int
-	for rows.Next() {
-		err = rows.Scan(&sourceCount)
-		if err != nil {
-			return err
-		}
+	starrocksCount, err := starrocksDb.Count(dal.From(starrocksTable))
+	if err != nil {
+		return err
+	}
+	if sourceCount != starrocksCount {
+		logger.Warn(nil, "source count %d not equal to starrocks count %d", sourceCount, starrocksCount)
+	}
+	logger.Info("load %s to starrocks success", table)
+	return nil
+}
+
+// put batch size data to database
+func putBatchData(c plugin.SubTaskContext, starrocksTmpTable, table string, data []map[string]interface{}, config *StarRocksConfig, offset int) error {
+	logger := c.GetLogger()
+	// insert data to tmp table
+	loadURL := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", config.BeHost, config.BePort, config.Database, starrocksTmpTable)
+	headers := map[string]string{
+		"format":            "json",
+		"strip_outer_array": "true",
+		"Expect":            "100-continue",
+		"ignore_json_size":  "true",
+		"Connection":        "close",
+	}
+	jsonData, err := json.Marshal(data)
+	if err != nil {
+		return err
+	}
+	client := http.Client{
+		CheckRedirect: func(req *http.Request, via []*http.Request) error {
+			return http.ErrUseLastResponse
+		},
 	}
-	rowsStarRocks, err := starrocks.Query(fmt.Sprintf("select count(*) from %s", starrocksTable))
+	req, err := http.NewRequest(http.MethodPut, loadURL, bytes.NewBuffer(jsonData))
 	if err != nil {
 		return err
 	}
-	defer rowsStarRocks.Close()
-	var starrocksCount int
-	for rowsStarRocks.Next() {
-		err = rowsStarRocks.Scan(&starrocksCount)
+	req.SetBasicAuth(config.User, config.Password)
+	for k, v := range headers {
+		req.Header.Set(k, v)
+	}
+	resp, err := client.Do(req)
+	if err != nil {
+		return err
+	}

Review Comment:
   Don't forget to close `resp.Body` (e.g. `defer resp.Body.Close()` right after the error check on `client.Do`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@devlake.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-devlake] abeizn merged pull request #4475: refactor: refactor starrocks, improve performance, solve bugs

Posted by "abeizn (via GitHub)" <gi...@apache.org>.
abeizn merged PR #4475:
URL: https://github.com/apache/incubator-devlake/pull/4475


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@devlake.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-devlake] klesh commented on a diff in pull request #4475: refactor: refactor starrocks, improve performance, solve bugs

Posted by "klesh (via GitHub)" <gi...@apache.org>.
klesh commented on code in PR #4475:
URL: https://github.com/apache/incubator-devlake/pull/4475#discussion_r1115177291


##########
backend/plugins/starrocks/tasks/tasks.go:
##########
@@ -48,187 +49,96 @@ func (t *Table) TableName() string {
 	return t.name
 }
 
-func LoadData(c plugin.SubTaskContext) errors.Error {
-	var db dal.Dal
+func ExportData(c plugin.SubTaskContext) errors.Error {
+	logger := c.GetLogger()
 	config := c.GetData().(*StarRocksConfig)
-	if config.SourceDsn != "" && config.SourceType != "" {
-		var o *gorm.DB
-		var err error
-		if config.SourceType == "mysql" {
-			o, err = gorm.Open(mysql.Open(config.SourceDsn))
-			if err != nil {
-				return errors.Convert(err)
-			}
-		} else if config.SourceType == "postgres" {
-			o, err = gorm.Open(postgres.Open(config.SourceDsn))
-			if err != nil {
-				return errors.Convert(err)
-			}
-		} else {
-			return errors.NotFound.New(fmt.Sprintf("unsupported source type %s", config.SourceType))
-		}
-		db = dalgorm.NewDalgorm(o)
-		sqlDB, err := o.DB()
-		if err != nil {
-			return errors.Convert(err)
-		}
-		defer sqlDB.Close()
-	} else {
-		db = c.GetDal()
+	// 1. Get db instance
+	db, err := getDbInstance(c)
+	if err != nil {
+		return errors.Convert(err)
 	}
-	var starrocksTables []string
-	if config.DomainLayer != "" {
-		starrocksTables = utils.GetTablesByDomainLayer(config.DomainLayer)
-		if starrocksTables == nil {
-			return errors.NotFound.New(fmt.Sprintf("no table found by domain layer: %s", config.DomainLayer))
-		}
-	} else {
-		tables := config.Tables
-		allTables, err := db.AllTables()
-		if err != nil {
-			return err
-		}
-		if len(tables) == 0 {
-			starrocksTables = allTables
-		} else {
-			for _, table := range allTables {
-				for _, r := range tables {
-					var ok bool
-					ok, err = errors.Convert01(regexp.Match(r, []byte(table)))
-					if err != nil {
-						return err
-					}
-					if ok {
-						starrocksTables = append(starrocksTables, table)
-					}
-				}
-			}
-		}
+	// 2. Filter out the tables to import
+	starrocksTables, err := getImportTables(c, db)

Review Comment:
   "import"? I think this should be "export".



##########
backend/plugins/starrocks/tasks/tasks.go:
##########
@@ -48,187 +49,96 @@ func (t *Table) TableName() string {
 	return t.name
 }
 
-func LoadData(c plugin.SubTaskContext) errors.Error {
-	var db dal.Dal
+func ExportData(c plugin.SubTaskContext) errors.Error {
+	logger := c.GetLogger()
 	config := c.GetData().(*StarRocksConfig)
-	if config.SourceDsn != "" && config.SourceType != "" {
-		var o *gorm.DB
-		var err error
-		if config.SourceType == "mysql" {
-			o, err = gorm.Open(mysql.Open(config.SourceDsn))
-			if err != nil {
-				return errors.Convert(err)
-			}
-		} else if config.SourceType == "postgres" {
-			o, err = gorm.Open(postgres.Open(config.SourceDsn))
-			if err != nil {
-				return errors.Convert(err)
-			}
-		} else {
-			return errors.NotFound.New(fmt.Sprintf("unsupported source type %s", config.SourceType))
-		}
-		db = dalgorm.NewDalgorm(o)
-		sqlDB, err := o.DB()
-		if err != nil {
-			return errors.Convert(err)
-		}
-		defer sqlDB.Close()
-	} else {
-		db = c.GetDal()
+	// 1. Get db instance
+	db, err := getDbInstance(c)
+	if err != nil {
+		return errors.Convert(err)
 	}
-	var starrocksTables []string
-	if config.DomainLayer != "" {
-		starrocksTables = utils.GetTablesByDomainLayer(config.DomainLayer)
-		if starrocksTables == nil {
-			return errors.NotFound.New(fmt.Sprintf("no table found by domain layer: %s", config.DomainLayer))
-		}
-	} else {
-		tables := config.Tables
-		allTables, err := db.AllTables()
-		if err != nil {
-			return err
-		}
-		if len(tables) == 0 {
-			starrocksTables = allTables
-		} else {
-			for _, table := range allTables {
-				for _, r := range tables {
-					var ok bool
-					ok, err = errors.Convert01(regexp.Match(r, []byte(table)))
-					if err != nil {
-						return err
-					}
-					if ok {
-						starrocksTables = append(starrocksTables, table)
-					}
-				}
-			}
-		}
+	// 2. Filter out the tables to import
+	starrocksTables, err := getImportTables(c, db)

Review Comment:
   `getExportingTables` maybe



##########
backend/plugins/starrocks/tasks/tasks.go:
##########
@@ -48,187 +49,96 @@ func (t *Table) TableName() string {
 	return t.name
 }
 
-func LoadData(c plugin.SubTaskContext) errors.Error {
-	var db dal.Dal
+func ExportData(c plugin.SubTaskContext) errors.Error {
+	logger := c.GetLogger()
 	config := c.GetData().(*StarRocksConfig)
-	if config.SourceDsn != "" && config.SourceType != "" {
-		var o *gorm.DB
-		var err error
-		if config.SourceType == "mysql" {
-			o, err = gorm.Open(mysql.Open(config.SourceDsn))
-			if err != nil {
-				return errors.Convert(err)
-			}
-		} else if config.SourceType == "postgres" {
-			o, err = gorm.Open(postgres.Open(config.SourceDsn))
-			if err != nil {
-				return errors.Convert(err)
-			}
-		} else {
-			return errors.NotFound.New(fmt.Sprintf("unsupported source type %s", config.SourceType))
-		}
-		db = dalgorm.NewDalgorm(o)
-		sqlDB, err := o.DB()
-		if err != nil {
-			return errors.Convert(err)
-		}
-		defer sqlDB.Close()
-	} else {
-		db = c.GetDal()
+	// 1. Get db instance
+	db, err := getDbInstance(c)
+	if err != nil {
+		return errors.Convert(err)
 	}
-	var starrocksTables []string
-	if config.DomainLayer != "" {
-		starrocksTables = utils.GetTablesByDomainLayer(config.DomainLayer)
-		if starrocksTables == nil {
-			return errors.NotFound.New(fmt.Sprintf("no table found by domain layer: %s", config.DomainLayer))
-		}
-	} else {
-		tables := config.Tables
-		allTables, err := db.AllTables()
-		if err != nil {
-			return err
-		}
-		if len(tables) == 0 {
-			starrocksTables = allTables
-		} else {
-			for _, table := range allTables {
-				for _, r := range tables {
-					var ok bool
-					ok, err = errors.Convert01(regexp.Match(r, []byte(table)))
-					if err != nil {
-						return err
-					}
-					if ok {
-						starrocksTables = append(starrocksTables, table)
-					}
-				}
-			}
-		}
+	// 2. Filter out the tables to import
+	starrocksTables, err := getImportTables(c, db)
+	if err != nil {
+		return errors.Convert(err)
 	}
-
-	starrocks, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local", config.User, config.Password, config.Host, config.Port, config.Database))
+	// 3. put devlake data to starrocks
+	sr, err := gorm.Open(mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local", config.User, config.Password, config.Host, config.Port, config.Database)))
 	if err != nil {
 		return errors.Convert(err)
 	}
-	defer starrocks.Close()
+	starrocksDb := dalgorm.NewDalgorm(sr)
 
 	for _, table := range starrocksTables {
+		select {
+		case <-c.GetContext().Done():
+			return errors.Convert(c.GetContext().Err())
+		default:
+		}
 		starrocksTable := strings.TrimLeft(table, "_")
 		starrocksTmpTable := fmt.Sprintf("%s_tmp", starrocksTable)
-		var columnMap map[string]string
-		var orderBy string
-		var skip bool
-		columnMap, orderBy, skip, err = createTmpTable(starrocks, db, starrocksTable, starrocksTmpTable, table, c, config)
+		columnMap, orderBy, skip, err := createTmpTable(c, starrocksDb, db, starrocksTable, starrocksTmpTable, table)

Review Comment:
   How about `createTmpTableInStarrocks`?



##########
backend/plugins/starrocks/tasks/tasks.go:
##########
@@ -48,187 +49,96 @@ func (t *Table) TableName() string {
 	return t.name
 }
 
-func LoadData(c plugin.SubTaskContext) errors.Error {
-	var db dal.Dal
+func ExportData(c plugin.SubTaskContext) errors.Error {
+	logger := c.GetLogger()
 	config := c.GetData().(*StarRocksConfig)
-	if config.SourceDsn != "" && config.SourceType != "" {
-		var o *gorm.DB
-		var err error
-		if config.SourceType == "mysql" {
-			o, err = gorm.Open(mysql.Open(config.SourceDsn))
-			if err != nil {
-				return errors.Convert(err)
-			}
-		} else if config.SourceType == "postgres" {
-			o, err = gorm.Open(postgres.Open(config.SourceDsn))
-			if err != nil {
-				return errors.Convert(err)
-			}
-		} else {
-			return errors.NotFound.New(fmt.Sprintf("unsupported source type %s", config.SourceType))
-		}
-		db = dalgorm.NewDalgorm(o)
-		sqlDB, err := o.DB()
-		if err != nil {
-			return errors.Convert(err)
-		}
-		defer sqlDB.Close()
-	} else {
-		db = c.GetDal()
+	// 1. Get db instance
+	db, err := getDbInstance(c)
+	if err != nil {
+		return errors.Convert(err)
 	}
-	var starrocksTables []string
-	if config.DomainLayer != "" {
-		starrocksTables = utils.GetTablesByDomainLayer(config.DomainLayer)
-		if starrocksTables == nil {
-			return errors.NotFound.New(fmt.Sprintf("no table found by domain layer: %s", config.DomainLayer))
-		}
-	} else {
-		tables := config.Tables
-		allTables, err := db.AllTables()
-		if err != nil {
-			return err
-		}
-		if len(tables) == 0 {
-			starrocksTables = allTables
-		} else {
-			for _, table := range allTables {
-				for _, r := range tables {
-					var ok bool
-					ok, err = errors.Convert01(regexp.Match(r, []byte(table)))
-					if err != nil {
-						return err
-					}
-					if ok {
-						starrocksTables = append(starrocksTables, table)
-					}
-				}
-			}
-		}
+	// 2. Filter out the tables to import
+	starrocksTables, err := getImportTables(c, db)
+	if err != nil {
+		return errors.Convert(err)
 	}
-
-	starrocks, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local", config.User, config.Password, config.Host, config.Port, config.Database))
+	// 3. put devlake data to starrocks
+	sr, err := gorm.Open(mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local", config.User, config.Password, config.Host, config.Port, config.Database)))
 	if err != nil {
 		return errors.Convert(err)
 	}
-	defer starrocks.Close()
+	starrocksDb := dalgorm.NewDalgorm(sr)
 
 	for _, table := range starrocksTables {
+		select {
+		case <-c.GetContext().Done():
+			return errors.Convert(c.GetContext().Err())
+		default:
+		}
 		starrocksTable := strings.TrimLeft(table, "_")
 		starrocksTmpTable := fmt.Sprintf("%s_tmp", starrocksTable)
-		var columnMap map[string]string
-		var orderBy string
-		var skip bool
-		columnMap, orderBy, skip, err = createTmpTable(starrocks, db, starrocksTable, starrocksTmpTable, table, c, config)
+		columnMap, orderBy, skip, err := createTmpTable(c, starrocksDb, db, starrocksTable, starrocksTmpTable, table)
 		if skip {
-			c.GetLogger().Info(fmt.Sprintf("table %s is up to date, so skip it", table))
+			logger.Info(fmt.Sprintf("table %s is up to date, so skip it", table))
 			continue
 		}
 		if err != nil {
-			c.GetLogger().Error(err, "create table %s in starrocks error", table)
+			logger.Error(err, "create table %s in starrocks error", table)
 			return errors.Convert(err)
 		}
-		if db.Dialect() == "postgres" {
-			err = db.Exec("begin transaction isolation level repeatable read")
-			if err != nil {
-				return errors.Convert(err)
-			}
-		} else if db.Dialect() == "mysql" {
-			err = db.Exec("set session transaction isolation level repeatable read")
-			if err != nil {
-				return errors.Convert(err)
-			}
-			err = errors.Convert(db.Exec("start transaction"))
-			if err != nil {
-				return errors.Convert(err)
-			}
-		} else {
-			return errors.NotFound.New(fmt.Sprintf("unsupported dialect %s", db.Dialect()))
-		}
-		err = errors.Convert(loadData(starrocks, c, starrocksTable, starrocksTmpTable, table, columnMap, db, config, orderBy))
-		if err != nil {
-			return errors.Convert(err)
-		}
-		err = errors.Convert(db.Exec("commit"))
+		err = putDataToDst(c, starrocksDb, db, starrocksTable, starrocksTmpTable, table, columnMap, orderBy)

Review Comment:
   `copyDataToDst`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@devlake.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org