You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by "mindlesscloud (via GitHub)" <gi...@apache.org> on 2023/02/23 07:24:22 UTC

[GitHub] [incubator-devlake] mindlesscloud commented on a diff in pull request #4475: refactor: refactor starrocks, improve performance, solve bugs

mindlesscloud commented on code in PR #4475:
URL: https://github.com/apache/incubator-devlake/pull/4475#discussion_r1115310568


##########
backend/plugins/starrocks/tasks/tasks.go:
##########
@@ -271,182 +181,243 @@ func createTmpTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, starro
 			extra = v
 		}
 	}
-	tableSql := fmt.Sprintf("drop table if exists %s; create table if not exists `%s` ( %s ) %s", starrocksTmpTable, starrocksTmpTable, strings.Join(columns, ","), extra)
-	c.GetLogger().Debug(tableSql)
-	_, err = errors.Convert01(starrocks.Exec(tableSql))
+	tableSql := fmt.Sprintf("DROP TABLE IF EXISTS %s; CREATE TABLE IF NOT EXISTS `%s` ( %s ) %s", starrocksTmpTable, starrocksTmpTable, strings.Join(columns, ","), extra)
+	logger.Debug(tableSql)
+	err = starrocksDb.Exec(tableSql)
 	return columnMap, orderBy, false, err
 }
 
-func loadData(starrocks *sql.DB, c plugin.SubTaskContext, starrocksTable, starrocksTmpTable, table string, columnMap map[string]string, db dal.Dal, config *StarRocksConfig, orderBy string) error {
-	offset := 0
+// put data to final dst database
+func putDataToDst(c plugin.SubTaskContext, starrocksDb, db dal.Dal, starrocksTable, starrocksTmpTable, table string, columnMap map[string]string, orderBy string) error {
+	logger := c.GetLogger()
+	config := c.GetData().(*StarRocksConfig)
+	var offset int
 	var err error
-	for {
-		var data []map[string]interface{}
-		// select data from db
-		err = func() error {
-			var rows dal.Rows
-			rows, err = db.Cursor(
-				dal.From(table),
-				dal.Orderby(orderBy),
-				dal.Limit(config.BatchSize),
-				dal.Offset(offset),
-			)
-			if err != nil {
-				return err
-			}
-			defer rows.Close()
-			cols, err := rows.Columns()
-			if err != nil {
-				return err
-			}
-			for rows.Next() {
-				row := make(map[string]interface{})
-				columns := make([]interface{}, len(cols))
-				columnPointers := make([]interface{}, len(cols))
-				for i := range columns {
-					dataType := columnMap[cols[i]]
-					if strings.HasPrefix(dataType, "array") {
-						var arr []string
-						columns[i] = &arr
-						columnPointers[i] = pq.Array(&arr)
-					} else {
-						columnPointers[i] = &columns[i]
-					}
-				}
-				err = rows.Scan(columnPointers...)
-				if err != nil {
-					return err
-				}
-				for i, colName := range cols {
-					row[colName] = columns[i]
-				}
-				data = append(data, row)
+	var rows dal.Rows
+
+	rows, err = db.Cursor(
+		dal.From(table),
+		dal.Orderby(orderBy),
+	)
+	if err != nil {
+		return err
+	}
+	defer rows.Close()
+
+	var data []map[string]interface{}
+	cols, err := (rows).Columns()
+	if err != nil {
+		return err
+	}
+
+	var batchCount int
+	for rows.Next() {
+		select {
+		case <-c.GetContext().Done():
+			return c.GetContext().Err()
+		default:
+		}
+		row := make(map[string]interface{})
+		columns := make([]interface{}, len(cols))
+		columnPointers := make([]interface{}, len(cols))
+		for i := range columns {
+			dataType := columnMap[cols[i]]
+			if strings.HasPrefix(dataType, "array") {
+				var arr []string
+				columns[i] = &arr
+				columnPointers[i] = pq.Array(&arr)
+			} else {
+				columnPointers[i] = &columns[i]
 			}
-			return nil
-		}()
-		if err != nil {
-			return err
-		}
-		if len(data) == 0 {
-			c.GetLogger().Warn(nil, "no data found in table %s already, limit: %d, offset: %d, so break", table, config.BatchSize, offset)
-			break
-		}
-		// insert data to tmp table
-		loadURL := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", config.BeHost, config.BePort, config.Database, starrocksTmpTable)
-		headers := map[string]string{
-			"format":            "json",
-			"strip_outer_array": "true",
-			"Expect":            "100-continue",
-			"ignore_json_size":  "true",
-			"Connection":        "close",
-		}
-		jsonData, err := json.Marshal(data)
-		if err != nil {
-			return err
-		}
-		client := http.Client{
-			CheckRedirect: func(req *http.Request, via []*http.Request) error {
-				return http.ErrUseLastResponse
-			},
 		}
-		req, err := http.NewRequest(http.MethodPut, loadURL, bytes.NewBuffer(jsonData))
+		err = rows.Scan(columnPointers...)
 		if err != nil {
 			return err
 		}
-		req.SetBasicAuth(config.User, config.Password)
-		for k, v := range headers {
-			req.Header.Set(k, v)
-		}
-		resp, err := client.Do(req)
-		if err != nil {
-			return err
+		for i, colName := range cols {
+			row[colName] = columns[i]
 		}
-		if resp.StatusCode == 307 {
-			var location *url.URL
-			location, err = resp.Location()
+		data = append(data, row)
+		batchCount += 1
+		if batchCount == config.BatchSize {
+			err = putBatchData(c, starrocksTmpTable, table, data, config, offset)
 			if err != nil {
 				return err
 			}
-			req, err = http.NewRequest(http.MethodPut, location.String(), bytes.NewBuffer(jsonData))
-			if err != nil {
-				return err
-			}
-			req.SetBasicAuth(config.User, config.Password)
-			for k, v := range headers {
-				req.Header.Set(k, v)
-			}
-			resp, err = client.Do(req)
-		}
-		if err != nil {
-			return err
+			batchCount = 0
+			data = nil
 		}
-		b, err := io.ReadAll(resp.Body)
-		if err != nil {
-			return err
-		}
-		var result map[string]interface{}
-		err = json.Unmarshal(b, &result)
+	}
+	if batchCount != 0 {
+		err = putBatchData(c, starrocksTmpTable, table, data, config, offset)
 		if err != nil {
 			return err
 		}
-		if resp.StatusCode != http.StatusOK {
-			c.GetLogger().Error(nil, "[%s]: %s", resp.StatusCode, string(b))
-		}
-		if result["Status"] != "Success" {
-			c.GetLogger().Error(nil, "load %s failed: %s", table, string(b))
-		} else {
-			c.GetLogger().Debug("load %s success: %s, limit: %d, offset: %d", table, b, config.BatchSize, offset)
-		}
-		offset += len(data)
 	}
+
 	// drop old table
-	_, err = starrocks.Exec(fmt.Sprintf("drop table if exists %s", starrocksTable))
+	err = starrocksDb.Exec("DROP TABLE IF EXISTS ?", clause.Table{Name: starrocksTable})
 	if err != nil {
 		return err
 	}
 	// rename tmp table to old table
-	_, err = starrocks.Exec(fmt.Sprintf("alter table %s rename %s", starrocksTmpTable, starrocksTable))
+	err = starrocksDb.Exec("ALTER TABLE ? RENAME ?", clause.Table{Name: starrocksTmpTable}, clause.Table{Name: starrocksTable})
 	if err != nil {
 		return err
 	}
+
 	// check data count
-	rows, err := db.Cursor(
-		dal.Select("count(*)"),
-		dal.From(table),
-	)
+	sourceCount, err := db.Count(dal.From(table))
 	if err != nil {
 		return err
 	}
-	defer rows.Close()
-	var sourceCount int
-	for rows.Next() {
-		err = rows.Scan(&sourceCount)
-		if err != nil {
-			return err
-		}
+	starrocksCount, err := starrocksDb.Count(dal.From(starrocksTable))
+	if err != nil {
+		return err
+	}
+	if sourceCount != starrocksCount {
+		logger.Warn(nil, "source count %d not equal to starrocks count %d", sourceCount, starrocksCount)
+	}
+	logger.Info("load %s to starrocks success", table)
+	return nil
+}
+
+// put batch size data to database
+func putBatchData(c plugin.SubTaskContext, starrocksTmpTable, table string, data []map[string]interface{}, config *StarRocksConfig, offset int) error {
+	logger := c.GetLogger()
+	// insert data to tmp table
+	loadURL := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", config.BeHost, config.BePort, config.Database, starrocksTmpTable)
+	headers := map[string]string{
+		"format":            "json",
+		"strip_outer_array": "true",
+		"Expect":            "100-continue",
+		"ignore_json_size":  "true",
+		"Connection":        "close",
+	}
+	jsonData, err := json.Marshal(data)
+	if err != nil {
+		return err
+	}
+	client := http.Client{
+		CheckRedirect: func(req *http.Request, via []*http.Request) error {
+			return http.ErrUseLastResponse
+		},
 	}
-	rowsStarRocks, err := starrocks.Query(fmt.Sprintf("select count(*) from %s", starrocksTable))
+	req, err := http.NewRequest(http.MethodPut, loadURL, bytes.NewBuffer(jsonData))
 	if err != nil {
 		return err
 	}
-	defer rowsStarRocks.Close()
-	var starrocksCount int
-	for rowsStarRocks.Next() {
-		err = rowsStarRocks.Scan(&starrocksCount)
+	req.SetBasicAuth(config.User, config.Password)
+	for k, v := range headers {
+		req.Header.Set(k, v)
+	}
+	resp, err := client.Do(req)
+	if err != nil {
+		return err
+	}

Review Comment:
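   Don't forget to close `resp.Body`: add `defer resp.Body.Close()` right after the `client.Do` error check, so the connection is released on every return path. If the 307 redirect is still followed by a second request that reassigns `resp`, close the first response's body before reassigning it as well.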



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@devlake.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org