You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by kl...@apache.org on 2022/06/08 06:50:32 UTC

[incubator-devlake] branch main updated: Refactor `ApiCollector` to simplify deadlock/canceling logic PoC (#2053)

This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 1be33a5b Refactor `ApiCollector` to simplify deadlock/canceling logic PoC (#2053)
1be33a5b is described below

commit 1be33a5bc600771005390f17a4bc39c360eb8cf5
Author: Klesh Wong <zh...@merico.dev>
AuthorDate: Wed Jun 8 14:50:28 2022 +0800

    Refactor `ApiCollector` to simplify deadlock/canceling logic PoC (#2053)
    
    * refactor: simplify api collector deadlock logic
    
    * refactor: improve dal sql expression
    
    * fix: jira users collection
    
    * feat: setup mockery
    
     1. remove mock files from repo
     2. generate mock files before unit-test
    
    * fix: avoid `<nil>` appearing in `.env`
    
    * refactor: added debug logging to print `since`
    
    * refactor: backup old `gomonkey` unit-tests
    
    * refactor: relocate helpers to avoid cycle import
    
    * fix: unit test
    
    * feat: auto build lake-builder on tags `builder-v*`
    
    * feat: front-lint run only for config-ui folder and pr
    
    * fix: update tester image
    
    * fix: remove invalid escaping
    
    * fix: config unit test
    
    * fix: rebase
    
    * refactor: connection helper
    
    * fix: make mock error messages
    
    * fix: linting
    
    * fix: linting
---
 .github/workflows/build-builder.yml                |   24 +
 .github/workflows/build.yml                        |    2 +-
 .github/workflows/front-end-lint.yml               |    6 +-
 .github/workflows/golangci-lint.yml                |    2 +
 .github/workflows/test-e2e.yml                     |    2 +-
 .github/workflows/test.yml                         |    2 +-
 .gitignore                                         |    5 +-
 Makefile                                           |    8 +-
 config/config.go                                   |    7 +-
 config/config_test.go                              |    4 +-
 devops/lake-builder/Dockerfile                     |    6 +-
 .../e2ehelper}/data_flow_tester.go                 |    7 +-
 .../e2ehelper}/data_flow_tester_test.go            |    2 +-
 .../pluginhelper}/csv_file_iterator.go             |    2 +-
 .../pluginhelper}/csv_file_iterator_test.go        |    2 +-
 main.go => helpers/unithelper/dummy_logger.go      |   18 +-
 .../unithelper/dummy_subtaskcontext.go             |   17 +-
 impl/dalgorm/dalgorm.go                            |  117 ++
 main.go                                            |   13 +
 models/migrationscripts/register.go                |    6 +-
 main.go => plugins/core/config.go                  |   13 +-
 plugins/core/dal/dal.go                            |  107 ++
 plugins/core/logger.go                             |    4 +
 plugins/core/plugin_task.go                        |   13 +-
 plugins/gitlab/e2e/project_test.go                 |    4 +-
 plugins/helper/api_async_client.go                 |   86 +-
 ...client_test.go => api_async_client_test.go.old} |    0
 plugins/helper/api_client_test.go                  |  122 --
 plugins/helper/api_collector.go                    |  492 +++-----
 .../helper/api_collector_func.go                   |   52 +-
 plugins/helper/api_collector_test.go               | 1185 +-------------------
 plugins/helper/api_extractor.go                    |    5 +-
 ...extractor_test.go => api_extractor_test.go.old} |    0
 ...ider_test.go => batch_save_divider_test.go.old} |    0
 .../{batch_save_test.go => batch_save_test.go.old} |    0
 .../callbacks.go}                                  |    6 +-
 plugins/helper/connection.go                       |  244 ++--
 plugins/helper/connection_test.go                  |  183 ++-
 ...onvertor_test.go => data_convertor_test.go.old} |    0
 plugins/helper/default_task_context.go             |   64 +-
 plugins/helper/iterator.go                         |   38 +
 plugins/helper/worker_scheduler.go                 |  200 ++--
 plugins/helper/worker_scheduler_test.go            |   56 +-
 plugins/jira/api/connection.go                     |   48 +-
 plugins/jira/api/init.go                           |   13 +-
 plugins/jira/api/proxy.go                          |   16 +-
 plugins/jira/jira.go                               |   18 +-
 .../migrationscripts/updateSchemas20220601.go      |   55 +-
 plugins/jira/tasks/api_client.go                   |    6 -
 plugins/jira/tasks/changelog_collector.go          |   28 +-
 plugins/jira/tasks/remotelink_collector.go         |   32 +-
 plugins/jira/tasks/user_collector.go               |   12 +-
 plugins/jira/tasks/worklog_collector.go            |   31 +-
 runner/directrun.go                                |    2 +-
 runner/loader.go                                   |    3 +-
 plugins/jira/api/init.go => runner/migration.go    |   19 +-
 runner/run_task.go                                 |    2 +-
 services/init.go                                   |    6 +-
 utils/callframes.go                                |    4 +-
 59 files changed, 1214 insertions(+), 2207 deletions(-)

diff --git a/.github/workflows/build-builder.yml b/.github/workflows/build-builder.yml
new file mode 100644
index 00000000..27b7861b
--- /dev/null
+++ b/.github/workflows/build-builder.yml
@@ -0,0 +1,24 @@
+name: Build-BuilderImage-Push-Docker
+on:
+  push:
+    tags:
+    - 'builder-*'
+jobs:
+  build-lake:
+    name: Build lake-builder image
+    runs-on: ubuntu-20.04
+    steps:
+    - uses: actions/checkout@v2
+    - uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9
+      with:
+        username: ${{ secrets.DOCKER_REGISTRY_USERNAME }}
+        password: ${{ secrets.DOCKER_REGISTRY_PASSWORD }}
+    - name: Build lake image
+      run: |
+        cd devops/lake-builder/
+        export IMAGE_LAKE=mericodev/lake-builder
+        export IMAGE_VER=${GITHUB_REF:18}
+        docker build -t $IMAGE_LAKE:latest --file ./Dockerfile .
+        docker tag $IMAGE_LAKE:latest $IMAGE_LAKE:$IMAGE_VER
+        docker push $IMAGE_LAKE:$IMAGE_VER
+        docker push $IMAGE_LAKE:latest
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 5544bfa5..f7395585 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -2,7 +2,7 @@ name: Build-Images-Push-Docker
 on:
   push:
     tags:
-    - '*'
+    - 'v*'
 jobs:
   build-lake:
     name: Build lake image
diff --git a/.github/workflows/front-end-lint.yml b/.github/workflows/front-end-lint.yml
index 5bc8fd62..81debb42 100644
--- a/.github/workflows/front-end-lint.yml
+++ b/.github/workflows/front-end-lint.yml
@@ -1,5 +1,9 @@
 name: Frontend-Lint
-on: push
+on:
+  pull_request:
+    branches: [ main ]
+    paths:
+      - 'config-ui/**'
 jobs:
   build:
     runs-on: ubuntu-latest
diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml
index 856344b7..1fe0ba2b 100644
--- a/.github/workflows/golangci-lint.yml
+++ b/.github/workflows/golangci-lint.yml
@@ -19,6 +19,8 @@ jobs:
         with:
           path: golangci-lint
           key: ${{ runner.os }}-golangci-lint
+      - name: generate mock
+        run: make mock
       - name: golangci-lint
         uses: golangci/golangci-lint-action@v3
         with:
diff --git a/.github/workflows/test-e2e.yml b/.github/workflows/test-e2e.yml
index bf4c4d8c..1a083811 100644
--- a/.github/workflows/test-e2e.yml
+++ b/.github/workflows/test-e2e.yml
@@ -24,7 +24,7 @@ jobs:
           MYSQL_USER: merico
           MYSQL_PASSWORD: merico
           MYSQL_ROOT_PASSWORD: root
-    container: mericodev/lake-builder:0.0.5
+    container: mericodev/lake-builder:v0.0.5
     steps:
       - uses: actions/checkout@v3
       - name: Cache test-e2e
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 7fa02e0f..2c6f5e51 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -17,7 +17,7 @@ jobs:
         os: [ubuntu-latest]
     runs-on: ${{ matrix.os }}
     container:
-      image: mericodev/lake-builder:0.0.5
+      image: mericodev/lake-builder:v0.0.5
     steps:
     - name: Checkout code
       uses: actions/checkout@v3
diff --git a/.gitignore b/.gitignore
index 8c3f60d0..408e6e1a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -148,6 +148,9 @@ bin
 libgit2
 .air.toml
 
-#swagger .json and .yaml
+# swagger .json and .yaml
 api/docs/swagger.json
 api/docs/swagger.yaml
+
+# mocks
+mocks/
diff --git a/Makefile b/Makefile
index 186ef726..17ff5d67 100644
--- a/Makefile
+++ b/Makefile
@@ -51,9 +51,15 @@ configure-dev:
 commit:
 	git cz
 
+mock:
+	rm -rf mocks
+	mockery --dir=./plugins/core --unroll-variadic=false --name='.*'
+	mockery --dir=./plugins/core/dal --unroll-variadic=false --name='.*'
+	mockery --dir=./plugins/helper --unroll-variadic=false --name='.*'
+
 test: unit-test e2e-test
 
-unit-test: build
+unit-test: mock build
 	set -e; for m in $$(go list ./... | egrep -v 'test|models|e2e'); do echo $$m; go test -timeout 60s -gcflags=all=-l -v $$m; done
 
 e2e-test: build
diff --git a/config/config.go b/config/config.go
index 39126e4a..4ecaedb6 100644
--- a/config/config.go
+++ b/config/config.go
@@ -70,11 +70,14 @@ func replaceNewEnvItemInOldContent(v *viper.Viper, envFileContent string) (error
 			switch ret := val.(type) {
 			case string:
 				ret = strings.Replace(ret, `\`, `\\`, -1)
-				ret = strings.Replace(ret, `=`, `\=`, -1)
-				ret = strings.Replace(ret, `'`, `\'`, -1)
+				//ret = strings.Replace(ret, `=`, `\=`, -1)
+				//ret = strings.Replace(ret, `'`, `\'`, -1)
 				ret = strings.Replace(ret, `"`, `\"`, -1)
 				return fmt.Sprintf(`%v="%v"`, envName, ret)
 			default:
+				if val == nil {
+					return fmt.Sprintf(`%v=`, envName)
+				}
 				return fmt.Sprintf(`%v="%v"`, envName, ret)
 			}
 		})
diff --git a/config/config_test.go b/config/config_test.go
index f46bb341..41f63be8 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -108,11 +108,11 @@ some unuseful message
 a blank
 AA="aaaa"
 BB="1#1"
-CC="1\"\'1"
+CC="1\"'1"
 DD="1\\\"1"
 
 # some comment
-EE="\="
+EE="="
 FF="1.01"
 GGG="gggg"
 H.278="278"
diff --git a/devops/lake-builder/Dockerfile b/devops/lake-builder/Dockerfile
index 05ec5f78..e81a8cc2 100644
--- a/devops/lake-builder/Dockerfile
+++ b/devops/lake-builder/Dockerfile
@@ -16,8 +16,6 @@
 
 # current tag: mericodev/lake-builder:0.0.5
 FROM golang:1.17-alpine3.15 as builder
-#RUN set -eux && sed -i 's/dl-cdn.alpinelinux.org/mirrors.ustc.edu.cn/g' /etc/apk/repositories
-RUN apk update
-RUN apk upgrade
 #RUN apk add --update gcc=130.2.1_pre1-r3 g++=10.2.1_pre1-r3
-RUN apk add --no-cache tzdata libgit2-dev gcc g++ make tar
+RUN apk update && apk upgrade && apk add --no-cache tzdata libgit2-dev gcc g++ make
+RUN go install github.com/vektra/mockery/v2@latest
diff --git a/testhelper/data_flow_tester.go b/helpers/e2ehelper/data_flow_tester.go
similarity index 96%
rename from testhelper/data_flow_tester.go
rename to helpers/e2ehelper/data_flow_tester.go
index 555073ba..df9950b4 100644
--- a/testhelper/data_flow_tester.go
+++ b/helpers/e2ehelper/data_flow_tester.go
@@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package testhelper
+package e2ehelper
 
 import (
 	"context"
@@ -24,6 +24,7 @@ import (
 	"time"
 
 	"github.com/apache/incubator-devlake/config"
+	"github.com/apache/incubator-devlake/helpers/pluginhelper"
 	"github.com/apache/incubator-devlake/logger"
 	"github.com/apache/incubator-devlake/plugins/core"
 	"github.com/apache/incubator-devlake/plugins/helper"
@@ -86,7 +87,7 @@ func NewDataFlowTester(t *testing.T, pluginName string, pluginMeta core.PluginMe
 
 // ImportCsv imports records from specified csv file into target table, note that existing data would be deleted first.
 func (t *DataFlowTester) ImportCsv(csvRelPath string, tableName string) {
-	csvIter := NewCsvFileIterator(csvRelPath)
+	csvIter := pluginhelper.NewCsvFileIterator(csvRelPath)
 	defer csvIter.Close()
 	// create table if not exists
 	err := t.Db.Table(tableName).AutoMigrate(&helper.RawData{})
@@ -127,7 +128,7 @@ func (t *DataFlowTester) Subtask(subtaskMeta core.SubTaskMeta, taskData interfac
 // Primary Key Fields with `pkfields` so DataFlowTester could select the exact record from database, as well as which
 // fields to compare with by specifying `targetfields` parameter.
 func (t *DataFlowTester) VerifyTable(tableName string, csvRelPath string, pkfields []string, targetfields []string) {
-	csvIter := NewCsvFileIterator(csvRelPath)
+	csvIter := pluginhelper.NewCsvFileIterator(csvRelPath)
 	defer csvIter.Close()
 
 	var expectedTotal int64
diff --git a/testhelper/data_flow_tester_test.go b/helpers/e2ehelper/data_flow_tester_test.go
similarity index 99%
rename from testhelper/data_flow_tester_test.go
rename to helpers/e2ehelper/data_flow_tester_test.go
index db1776f5..4348debf 100644
--- a/testhelper/data_flow_tester_test.go
+++ b/helpers/e2ehelper/data_flow_tester_test.go
@@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package testhelper
+package e2ehelper
 
 import (
 	"testing"
diff --git a/testhelper/csv_file_iterator.go b/helpers/pluginhelper/csv_file_iterator.go
similarity index 99%
rename from testhelper/csv_file_iterator.go
rename to helpers/pluginhelper/csv_file_iterator.go
index 55b228b8..2de777e0 100644
--- a/testhelper/csv_file_iterator.go
+++ b/helpers/pluginhelper/csv_file_iterator.go
@@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package testhelper
+package pluginhelper
 
 import (
 	"encoding/csv"
diff --git a/testhelper/csv_file_iterator_test.go b/helpers/pluginhelper/csv_file_iterator_test.go
similarity index 97%
rename from testhelper/csv_file_iterator_test.go
rename to helpers/pluginhelper/csv_file_iterator_test.go
index 9db0c587..9c6c6394 100644
--- a/testhelper/csv_file_iterator_test.go
+++ b/helpers/pluginhelper/csv_file_iterator_test.go
@@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package testhelper
+package pluginhelper
 
 func ExampleCsvFileIterator() {
 	iter := NewCsvFileIterator("/path/to/foobar.csv")
diff --git a/main.go b/helpers/unithelper/dummy_logger.go
similarity index 56%
copy from main.go
copy to helpers/unithelper/dummy_logger.go
index 918b738c..9ce45984 100644
--- a/main.go
+++ b/helpers/unithelper/dummy_logger.go
@@ -15,13 +15,21 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package main
+package unithelper
 
 import (
-	"github.com/apache/incubator-devlake/api"
-	_ "github.com/apache/incubator-devlake/version"
+	"github.com/apache/incubator-devlake/mocks"
+	"github.com/stretchr/testify/mock"
 )
 
-func main() {
-	api.CreateApiService()
+func DummyLogger() *mocks.Logger {
+	logger := new(mocks.Logger)
+	logger.On("IsLevelEnabled", mock.Anything).Return(false).Maybe()
+	logger.On("Printf", mock.Anything, mock.Anything).Maybe()
+	logger.On("Log", mock.Anything, mock.Anything, mock.Anything).Maybe()
+	logger.On("Debug", mock.Anything, mock.Anything).Maybe()
+	logger.On("Info", mock.Anything, mock.Anything).Maybe()
+	logger.On("Warn", mock.Anything, mock.Anything).Maybe()
+	logger.On("Error", mock.Anything, mock.Anything).Maybe()
+	return logger
 }
diff --git a/main.go b/helpers/unithelper/dummy_subtaskcontext.go
similarity index 60%
copy from main.go
copy to helpers/unithelper/dummy_subtaskcontext.go
index 918b738c..10a9fe3e 100644
--- a/main.go
+++ b/helpers/unithelper/dummy_subtaskcontext.go
@@ -15,13 +15,20 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package main
+package unithelper
 
 import (
-	"github.com/apache/incubator-devlake/api"
-	_ "github.com/apache/incubator-devlake/version"
+	"github.com/apache/incubator-devlake/mocks"
+	"github.com/apache/incubator-devlake/plugins/core/dal"
+	"github.com/stretchr/testify/mock"
 )
 
-func main() {
-	api.CreateApiService()
+func DummySubTaskContext(db dal.Dal) *mocks.SubTaskContext {
+	mockCtx := new(mocks.SubTaskContext)
+	mockCtx.On("GetDal").Return(db)
+	mockCtx.On("GetLogger").Return(DummyLogger())
+	mockCtx.On("SetProgress", mock.Anything, mock.Anything)
+	mockCtx.On("IncProgress", mock.Anything, mock.Anything)
+	mockCtx.On("GetName").Return("test")
+	return mockCtx
 }
diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
new file mode 100644
index 00000000..ce6e60bf
--- /dev/null
+++ b/impl/dalgorm/dalgorm.go
@@ -0,0 +1,117 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package dalgorm
+
+import (
+	"database/sql"
+
+	"github.com/apache/incubator-devlake/plugins/core/dal"
+	"gorm.io/gorm"
+	"gorm.io/gorm/clause"
+)
+
+type Dalgorm struct {
+	db *gorm.DB
+}
+
+// To accommodate gorm
+//type stubTable struct {
+//name string
+//}
+
+//func (s *stubTable) TableName() string {
+//return s.name
+//}
+
+func buildTx(tx *gorm.DB, clauses []interface{}) *gorm.DB {
+	for _, clause := range clauses {
+		switch clause := clause.(type) {
+		case *dal.JoinClause:
+			tx = tx.Joins(clause.Expr, clause.Params...)
+		case *dal.WhereClause:
+			tx = tx.Where(clause.Expr, clause.Params...)
+		case dal.OrderbyClause:
+			tx = tx.Order(string(clause))
+		case dal.LimitClause:
+			tx = tx.Limit(int(clause))
+		case dal.OffsetClause:
+			tx = tx.Offset(int(clause))
+		case dal.FromClause:
+			tx = tx.Table(string(clause))
+		case dal.SelectClause:
+			tx = tx.Select(string(clause))
+		}
+	}
+	return tx
+}
+
+var _ dal.Dal = (*Dalgorm)(nil)
+
+// Exec executes raw sql query
+func (d *Dalgorm) Exec(query string, params ...interface{}) error {
+	return d.db.Exec(query, params...).Error
+}
+
+// CreateTable creates a table with gorm definition from `entity`
+func (d *Dalgorm) AutoMigrate(entity interface{}, clauses ...interface{}) error {
+	return buildTx(d.db, clauses).AutoMigrate(entity)
+}
+
+// Cursor returns a database cursor, cursor is especially useful when handling big amount of rows of data
+func (d *Dalgorm) Cursor(clauses ...interface{}) (*sql.Rows, error) {
+	return buildTx(d.db, clauses).Rows()
+}
+
+// Fetch loads row data from `cursor` into `dst`
+func (d *Dalgorm) Fetch(cursor *sql.Rows, dst interface{}) error {
+	return d.db.ScanRows(cursor, dst)
+}
+
+// All loads matched rows from database to `dst`, USE IT WITH COUTIOUS!!
+func (d *Dalgorm) All(dst interface{}, clauses ...interface{}) error {
+	return buildTx(d.db, clauses).Find(dst).Error
+}
+
+// First loads first matched row from database to `dst`, error will be returned if no records were found
+func (d *Dalgorm) First(dst interface{}, clauses ...interface{}) error {
+	return buildTx(d.db, clauses).First(dst).Error
+}
+
+// Create insert record to database
+func (d *Dalgorm) Create(entity interface{}, clauses ...interface{}) error {
+	return buildTx(d.db, clauses).Create(entity).Error
+}
+
+// Update updates record
+func (d *Dalgorm) Update(entity interface{}, clauses ...interface{}) error {
+	return buildTx(d.db, clauses).Save(entity).Error
+}
+
+// CreateOrUpdate tries to create the record, or fallback to update all if failed
+func (d *Dalgorm) CreateOrUpdate(entity interface{}, clauses ...interface{}) error {
+	return buildTx(d.db, clauses).Clauses(clause.OnConflict{UpdateAll: true}).Create(entity).Error
+}
+
+// Delete records from database
+func (d *Dalgorm) Delete(entity interface{}, clauses ...interface{}) error {
+	return buildTx(d.db, clauses).Delete(entity).Error
+}
+
+func NewDalgorm(db *gorm.DB) *Dalgorm {
+	return &Dalgorm{db}
+}
diff --git a/main.go b/main.go
index 918b738c..7c3001f7 100644
--- a/main.go
+++ b/main.go
@@ -19,9 +19,22 @@ package main
 
 import (
 	"github.com/apache/incubator-devlake/api"
+	"github.com/apache/incubator-devlake/config"
+	"github.com/apache/incubator-devlake/plugins/core"
 	_ "github.com/apache/incubator-devlake/version"
 )
 
 func main() {
+	v := config.GetConfig()
+	encKey := v.GetString(core.EncodeKeyEnvStr)
+	if encKey == "" {
+		// Randomly generate a bunch of encryption keys and set them to config
+		encKey = core.RandomEncKey()
+		v.Set(core.EncodeKeyEnvStr, encKey)
+		err := config.WriteConfig(v)
+		if err != nil {
+			panic(err)
+		}
+	}
 	api.CreateApiService()
 }
diff --git a/models/migrationscripts/register.go b/models/migrationscripts/register.go
index ebbfcbd7..bb0896d7 100644
--- a/models/migrationscripts/register.go
+++ b/models/migrationscripts/register.go
@@ -20,12 +20,12 @@ package migrationscripts
 import "github.com/apache/incubator-devlake/migration"
 
 // RegisterAll register all the migration scripts of framework
-func RegisterAll() {
-	migration.Register([]migration.Script{
+func All() []migration.Script {
+	return []migration.Script{
 		new(initSchemas),
 		new(updateSchemas20220505), new(updateSchemas20220507), new(updateSchemas20220510),
 		new(updateSchemas20220513), new(updateSchemas20220524), new(updateSchemas20220526),
 		new(updateSchemas20220527), new(updateSchemas20220528), new(updateSchemas20220601),
 		new(updateSchemas20220602),
-	}, "Framework")
+	}
 }
diff --git a/main.go b/plugins/core/config.go
similarity index 83%
copy from main.go
copy to plugins/core/config.go
index 918b738c..97bf3ac1 100644
--- a/main.go
+++ b/plugins/core/config.go
@@ -15,13 +15,12 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package main
+package core
 
-import (
-	"github.com/apache/incubator-devlake/api"
-	_ "github.com/apache/incubator-devlake/version"
-)
+type ConfigGetter interface {
+	GetString(name string) string
+}
 
-func main() {
-	api.CreateApiService()
+type InjectConfigGetter interface {
+	SetConfigGetter(getter ConfigGetter)
 }
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
new file mode 100644
index 00000000..5318b6ca
--- /dev/null
+++ b/plugins/core/dal/dal.go
@@ -0,0 +1,107 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package dal
+
+import "database/sql"
+
+// Dal aims to facilitate an isolation of Database Access Layer by defining a set of operations should a
+// Database Access Layer provide
+// This is inroduced by the fact that mocking *gorm.DB is hard, and `gomonkey` is not working on macOS
+type Dal interface {
+	// Exec executes raw sql query
+	Exec(query string, params ...interface{}) error
+	// CreateTable creates a table with gorm definition from `entity`
+	AutoMigrate(entity interface{}, clauses ...interface{}) error
+	// Cursor returns a database cursor, cursor is especially useful when handling big amount of rows of data
+	Cursor(clauses ...interface{}) (*sql.Rows, error)
+	// Fetch loads row data from `cursor` into `dst`
+	Fetch(cursor *sql.Rows, dst interface{}) error
+	// All loads matched rows from database to `dst`, USE IT WITH COUTIOUS!!
+	All(dst interface{}, clauses ...interface{}) error
+	// First loads first matched row from database to `dst`, error will be returned if no records were found
+	First(dst interface{}, clauses ...interface{}) error
+	// Create insert record to database
+	Create(entity interface{}, clauses ...interface{}) error
+	// Update updates record
+	Update(entity interface{}, clauses ...interface{}) error
+	// CreateOrUpdate tries to create the record, or fallback to update all if failed
+	CreateOrUpdate(entity interface{}, clauses ...interface{}) error
+	// Delete records from database
+	Delete(entity interface{}, clauses ...interface{}) error
+}
+
+type dalClause struct {
+	Expr   string
+	Params []interface{}
+}
+
+// JoinClause represents a SQL `JOIN` clause
+type JoinClause dalClause
+
+// Join creates a new JoinClause
+func Join(clause string, params ...interface{}) *JoinClause {
+	return &JoinClause{clause, params}
+}
+
+// WhereClause represents a SQL `WHERE` clause
+type WhereClause dalClause
+
+// Where creates a new WhereClause
+func Where(clause string, params ...interface{}) *WhereClause {
+	return &WhereClause{clause, params}
+}
+
+// LimitClause represents a SQL `LIMIT` clause
+type LimitClause int
+
+// Limit creates a new LimitClause
+func Limit(limit int) LimitClause {
+	return LimitClause(limit)
+}
+
+// OffsetClause represents a SQL `OFFSET` clause
+type OffsetClause int
+
+// Offset creates a new OffsetClause
+func Offset(offset int) OffsetClause {
+	return OffsetClause(offset)
+}
+
+// FromClause represents a SQL `OFFSET` clause
+type FromClause string
+
+// From creates a new TableClause
+func From(table string) FromClause {
+	return FromClause(table)
+}
+
+// SelectClause represents a SQL `OFFSET` clause
+type SelectClause string
+
+// Select creates a new TableClause
+func Select(fields string) SelectClause {
+	return SelectClause(fields)
+}
+
+// OrderbyClause represents a SQL `ORDER BY` clause
+type OrderbyClause string
+
+// Orderby creates a new Orderby
+func Orderby(expr string) OrderbyClause {
+	return OrderbyClause(expr)
+}
diff --git a/plugins/core/logger.go b/plugins/core/logger.go
index 11155bb9..0b345426 100644
--- a/plugins/core/logger.go
+++ b/plugins/core/logger.go
@@ -40,3 +40,7 @@ type Logger interface {
 	// return a new logger which output nested log
 	Nested(name string) Logger
 }
+
+type InjectLogger interface {
+	SetLogger(logger Logger)
+}
diff --git a/plugins/core/plugin_task.go b/plugins/core/plugin_task.go
index bc1d9451..9ca9aec8 100644
--- a/plugins/core/plugin_task.go
+++ b/plugins/core/plugin_task.go
@@ -20,6 +20,7 @@ package core
 import (
 	"context"
 
+	"github.com/apache/incubator-devlake/plugins/core/dal"
 	"gorm.io/gorm"
 )
 
@@ -41,13 +42,19 @@ type RunningProgress struct {
 	SubTaskNumber int
 }
 
+type BasicRes interface {
+	GetConfig(name string) string
+	GetLogger() Logger
+	// Deprecated: use dal instead
+	GetDb() *gorm.DB
+	GetDal() dal.Dal
+}
+
 // This interface define all resources that needed for task/subtask execution
 type ExecContext interface {
+	BasicRes
 	GetName() string
-	GetConfig(name string) string
-	GetDb() *gorm.DB
 	GetContext() context.Context
-	GetLogger() Logger
 	GetData() interface{}
 	SetProgress(current int, total int)
 	IncProgress(quantity int)
diff --git a/plugins/gitlab/e2e/project_test.go b/plugins/gitlab/e2e/project_test.go
index cccc01fa..32a7f02b 100644
--- a/plugins/gitlab/e2e/project_test.go
+++ b/plugins/gitlab/e2e/project_test.go
@@ -20,15 +20,15 @@ package e2e
 import (
 	"testing"
 
+	"github.com/apache/incubator-devlake/helpers/e2ehelper"
 	"github.com/apache/incubator-devlake/plugins/gitlab/impl"
 	"github.com/apache/incubator-devlake/plugins/gitlab/tasks"
-	"github.com/apache/incubator-devlake/testhelper"
 )
 
 func TestGitlabDataFlow(t *testing.T) {
 
 	var gitlab impl.Gitlab
-	dataflowTester := testhelper.NewDataFlowTester(t, "gitlab", gitlab)
+	dataflowTester := e2ehelper.NewDataFlowTester(t, "gitlab", gitlab)
 
 	taskData := &tasks.GitlabTaskData{
 		Options: &tasks.GitlabOptions{
diff --git a/plugins/helper/api_async_client.go b/plugins/helper/api_async_client.go
index 20d4f894..4add3dcf 100644
--- a/plugins/helper/api_async_client.go
+++ b/plugins/helper/api_async_client.go
@@ -28,11 +28,10 @@ import (
 	"time"
 
 	"github.com/apache/incubator-devlake/plugins/core"
+	"github.com/apache/incubator-devlake/plugins/helper/common"
 	"github.com/apache/incubator-devlake/utils"
 )
 
-type ApiAsyncCallback func(*http.Response, error) error
-
 var HttpMinStatusRetryCode = http.StatusBadRequest
 
 // ApiAsyncClient is built on top of ApiClient, to provide a asynchronous semantic
@@ -40,11 +39,13 @@ var HttpMinStatusRetryCode = http.StatusBadRequest
 // will be performed in parallel with rate-limit support
 type ApiAsyncClient struct {
 	*ApiClient
-	maxRetry  int
-	scheduler *WorkerScheduler
-	qps       float64
+	maxRetry     int
+	scheduler    *WorkerScheduler
+	hasError     bool
+	numOfWorkers int
 }
 
+// CreateAsyncApiClient creates a new ApiAsyncClient
 func CreateAsyncApiClient(
 	taskCtx core.TaskContext,
 	apiClient *ApiClient,
@@ -85,55 +86,69 @@ func CreateAsyncApiClient(
 	d := duration / RESPONSE_TIME
 	numOfWorkers := requests / int(d)
 
-	taskCtx.GetLogger().Info(
+	logger := taskCtx.GetLogger()
+	logger.Info(
 		"scheduler for api %s worker: %d, request: %d, duration: %v",
 		apiClient.GetEndpoint(),
 		numOfWorkers,
 		requests,
 		duration,
 	)
-	scheduler, err := NewWorkerScheduler(numOfWorkers, requests, duration, taskCtx.GetContext(), retry)
+	scheduler, err := NewWorkerScheduler(
+		numOfWorkers,
+		requests,
+		duration,
+		taskCtx.GetContext(),
+		retry,
+		logger,
+	)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create scheduler: %w", err)
 	}
-	qps := float64(requests) / duration.Seconds()
 
 	// finally, wrap around api client with async sematic
 	return &ApiAsyncClient{
 		apiClient,
 		retry,
 		scheduler,
-		qps,
+		false,
+		numOfWorkers,
 	}, nil
 }
 
+// GetMaxRetry returns the maximum retry attempts for a request
 func (apiClient *ApiAsyncClient) GetMaxRetry() int {
 	return apiClient.maxRetry
 }
 
+// SetMaxRetry sets the maximum retry attempts for a request
 func (apiClient *ApiAsyncClient) SetMaxRetry(
 	maxRetry int,
 ) {
 	apiClient.maxRetry = maxRetry
 }
 
+// DoAsync would carry out a asynchronous request
 func (apiClient *ApiAsyncClient) DoAsync(
 	method string,
 	path string,
 	query url.Values,
 	body interface{},
 	header http.Header,
-	handler ApiAsyncCallback,
+	handler common.ApiAsyncCallback,
 	retry int,
-) error {
-	var subFunc func() error
-	subFunc = func() error {
+) {
+	var request func() error
+	request = func() error {
 		var err error
 		var res *http.Response
 		var body []byte
 		res, err = apiClient.Do(method, path, query, body, header)
+		// make sure response body is read successfully, or we might have to retry
 		if err == nil {
+			// make sure response.Body stream will be closed to avoid running out of file handle
 			defer func(body io.ReadCloser) { body.Close() }(res.Body)
+			// replace NetworkStream with MemoryBuffer
 			body, err = ioutil.ReadAll(res.Body)
 			if err == nil {
 				res.Body = io.NopCloser(bytes.NewBuffer(body))
@@ -153,18 +168,23 @@ func (apiClient *ApiAsyncClient) DoAsync(
 		if needRetry {
 			// check weather we still have retry times and not error from handler and canceled error
 			if retry < apiClient.maxRetry && err != context.Canceled {
-				apiClient.logError("retry #%d for %s", retry, err.Error())
+				apiClient.logger.Warn("retry #%d for %s", retry, err.Error())
 				retry++
-				return apiClient.scheduler.Submit(subFunc, apiClient.scheduler.subPool)
+				apiClient.scheduler.NextTick(request)
+				return nil
 			}
+		}
+
+		if err != nil {
+			apiClient.hasError = true
 			return err
 		}
 
 		// it is important to let handler have a chance to handle error, or it can hang indefinitely
 		// when error occurs
-		return handler(res, err)
+		return handler(res)
 	}
-	return apiClient.scheduler.Submit(subFunc)
+	apiClient.scheduler.SubmitBlocking(request)
 }
 
 // Enqueue an api get request, the request may be sent sometime in future in parallel with other api requests
@@ -172,32 +192,34 @@ func (apiClient *ApiAsyncClient) GetAsync(
 	path string,
 	query url.Values,
 	header http.Header,
-	handler ApiAsyncCallback,
-) error {
-	return apiClient.DoAsync(http.MethodGet, path, query, nil, header, handler, 0)
+	handler common.ApiAsyncCallback,
+) {
+	apiClient.DoAsync(http.MethodGet, path, query, nil, header, handler, 0)
 }
 
-// Wait until all async requests were done
+// WaitAsync blocks until all async requests were done
 func (apiClient *ApiAsyncClient) WaitAsync() error {
-	return apiClient.scheduler.WaitUntilFinish()
+	return apiClient.scheduler.Wait()
 }
 
-func (apiClient *ApiAsyncClient) GetQps() float64 {
-	return apiClient.qps
+func (apiClient *ApiAsyncClient) HasError() bool {
+	return apiClient.hasError
 }
-func (apiClient *ApiAsyncClient) Add(delta int) {
-	apiClient.scheduler.Add(delta)
+
+func (apiClient *ApiAsyncClient) NextTick(task func() error) {
+	apiClient.scheduler.NextTick(task)
 }
-func (apiClient *ApiAsyncClient) Done() {
-	apiClient.scheduler.Done()
+
+func (apiClient *ApiAsyncClient) GetNumOfWorkers() int {
+	return apiClient.numOfWorkers
 }
 
 type RateLimitedApiClient interface {
-	GetAsync(path string, query url.Values, header http.Header, handler ApiAsyncCallback) error
+	GetAsync(path string, query url.Values, header http.Header, handler common.ApiAsyncCallback)
 	WaitAsync() error
-	GetQps() float64
-	Add(delta int)
-	Done()
+	HasError() bool
+	NextTick(task func() error)
+	GetNumOfWorkers() int
 }
 
 var _ RateLimitedApiClient = (*ApiAsyncClient)(nil)
diff --git a/plugins/helper/api_async_client_test.go b/plugins/helper/api_async_client_test.go.old
similarity index 100%
rename from plugins/helper/api_async_client_test.go
rename to plugins/helper/api_async_client_test.go.old
diff --git a/plugins/helper/api_client_test.go b/plugins/helper/api_client_test.go
deleted file mode 100644
index 251179c1..00000000
--- a/plugins/helper/api_client_test.go
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package helper
-
-import (
-	"net/url"
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-)
-
-func TestGetURIStringPointer_WithSlash(t *testing.T) {
-	baseUrl := "http://my-site.com/"
-	relativePath := "/api/stuff"
-	queryParams := url.Values{}
-	queryParams.Set("id", "1")
-	expected := "http://my-site.com/api/stuff?id=1"
-	actual, err := GetURIStringPointer(baseUrl, relativePath, queryParams)
-	assert.Equal(t, err == nil, true)
-	assert.Equal(t, expected, *actual)
-
-}
-func TestGetURIStringPointer_WithNoSlash(t *testing.T) {
-	baseUrl := "http://my-site.com"
-	relativePath := "api/stuff"
-	queryParams := url.Values{}
-	queryParams.Set("id", "1")
-	expected := "http://my-site.com/api/stuff?id=1"
-	actual, err := GetURIStringPointer(baseUrl, relativePath, queryParams)
-	assert.Equal(t, err == nil, true)
-	assert.Equal(t, expected, *actual)
-}
-func TestGetURIStringPointer_WithRelativePath(t *testing.T) {
-	baseUrl := "http://my-site.com/rest"
-	relativePath := "api/stuff"
-	queryParams := url.Values{}
-	queryParams.Set("id", "1")
-	expected := "http://my-site.com/rest/api/stuff?id=1"
-	actual, err := GetURIStringPointer(baseUrl, relativePath, queryParams)
-	assert.Equal(t, err == nil, true)
-	assert.Equal(t, expected, *actual)
-}
-func TestGetURIStringPointer_WithRelativePath2(t *testing.T) {
-	baseUrl := "https://my-site.com/api/v4/"
-	relativePath := "projects/stuff"
-	queryParams := url.Values{}
-	queryParams.Set("id", "1")
-	expected := "https://my-site.com/api/v4/projects/stuff?id=1"
-	actual, err := GetURIStringPointer(baseUrl, relativePath, queryParams)
-	assert.Equal(t, err == nil, true)
-	assert.Equal(t, expected, *actual)
-}
-
-func TestGetURIStringPointer_HandlesRelativePathStartingWithSlash(t *testing.T) {
-	baseUrl := "https://my-site.com/api/v4/"
-	relativePath := "/user"
-	expected := "https://my-site.com/api/v4/user"
-	actual, err := GetURIStringPointer(baseUrl, relativePath, nil)
-	assert.Equal(t, err == nil, true)
-	assert.Equal(t, expected, *actual)
-}
-
-func TestGetURIStringPointer_HandlesRelativePathStartingWithSlashWithParams(t *testing.T) {
-	baseUrl := "https://my-site.com/api/v4/"
-	relativePath := "/user"
-	queryParams := url.Values{}
-	queryParams.Set("id", "1")
-	expected := "https://my-site.com/api/v4/user?id=1"
-	actual, err := GetURIStringPointer(baseUrl, relativePath, queryParams)
-	assert.Equal(t, err == nil, true)
-	assert.Equal(t, expected, *actual)
-}
-
-func TestAddMissingSlashToURL_NoSlash(t *testing.T) {
-	baseUrl := "http://my-site.com/rest"
-	expected := "http://my-site.com/rest/"
-	AddMissingSlashToURL(&baseUrl)
-	assert.Equal(t, expected, baseUrl)
-}
-
-func TestAddMissingSlashToURL_WithSlash(t *testing.T) {
-	baseUrl := "http://my-site.com/rest/"
-	expected := "http://my-site.com/rest/"
-	AddMissingSlashToURL(&baseUrl)
-	assert.Equal(t, expected, baseUrl)
-}
-
-func TestRemoveStartingSlashFromPath(t *testing.T) {
-	testString := "/user/api"
-	expected := "user/api"
-	actual := RemoveStartingSlashFromPath(testString)
-	assert.Equal(t, expected, actual)
-}
-
-func TestRemoveStartingSlashFromPath_EmptyString(t *testing.T) {
-	testString := ""
-	expected := ""
-	actual := RemoveStartingSlashFromPath(testString)
-	assert.Equal(t, expected, actual)
-}
-
-func TestRemoveStartingSlashFromPath_NoStartingSlash(t *testing.T) {
-	testString := "user/api"
-	expected := "user/api"
-	actual := RemoveStartingSlashFromPath(testString)
-	assert.Equal(t, expected, actual)
-}
diff --git a/plugins/helper/api_collector.go b/plugins/helper/api_collector.go
index 23cbb099..a185b1a6 100644
--- a/plugins/helper/api_collector.go
+++ b/plugins/helper/api_collector.go
@@ -19,66 +19,58 @@ package helper
 
 import (
 	"bytes"
-	"context"
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
-	"math"
 	"net/http"
 	"net/url"
-	"sync"
 	"text/template"
 
 	"github.com/apache/incubator-devlake/plugins/core"
+	"github.com/apache/incubator-devlake/plugins/core/dal"
 )
 
+// Pager contains pagination information for a api request
 type Pager struct {
 	Page int
 	Skip int
 	Size int
 }
 
+// RequestData is the input of `UrlTemplate` `Query` and `Header`, so we can generate them dynamically
 type RequestData struct {
-	Pager  *Pager
-	Params interface{}
-	Input  interface{}
+	Pager     *Pager
+	Params    interface{}
+	Input     interface{}
+	InputJSON []byte
 }
 
 type AsyncResponseHandler func(res *http.Response) error
 
 type ApiCollectorArgs struct {
 	RawDataSubTaskArgs
-	/*
-		url may use arbitrary variables from different source in any order, we need GoTemplate to allow more
-		flexible for all kinds of possibility.
-		Pager contains information for a particular page, calculated by ApiCollector, and will be passed into
-		GoTemplate to generate a url for that page.
-		We want to do page-fetching in ApiCollector, because the logic are highly similar, by doing so, we can
-		avoid duplicate logic for every tasks, and when we have a better idea like improving performance, we can
-		do it in one place
-	*/
+	// UrlTemplate is used to generate the final URL for Api Collector to request
+	// i.e. `api/3/issue/{{ .Input.IssueId }}/changelog`
+	// For detail of what variables can be used, please check `RequestData`
 	UrlTemplate string `comment:"GoTemplate for API url"`
-	// (Optional) Return query string for request, or you can plug them into UrlTemplate directly
-	Query func(reqData *RequestData) (url.Values, error) `comment:"Extra query string when requesting API, like 'Since' option for jira issues collection"`
-	// Some api might do pagination by http headers
-	Header      func(reqData *RequestData) (http.Header, error)
-	PageSize    int
-	Incremental bool `comment:"Indicate this is a incremental collection, so the existing data won't get flushed"`
-	ApiClient   RateLimitedApiClient
-	/*
-		Sometimes, we need to collect data based on previous collected data, like jira changelog, it requires
-		issue_id as part of the url.
-		We can mimic `stdin` design, to accept a `Input` function which produces a `Iterator`, collector
-		should iterate all records, and do data-fetching for each on, either in parallel or sequential order
-		UrlTemplate: "api/3/issue/{{ Input.ID }}/changelog"
-	*/
-	Input          Iterator
-	InputRateLimit int
-	/*
-		For api endpoint that returns number of total pages, ApiCollector can collect pages in parallel with ease,
-		or other techniques are required if this information was missing.
-	*/
-	GetTotalPages  func(res *http.Response, args *ApiCollectorArgs) (int, error)
+	// Query would be sent out as part of the request URL
+	Query func(reqData *RequestData) (url.Values, error) ``
+	// Header would be sent out along with request
+	Header func(reqData *RequestData) (http.Header, error)
+	// PageSize tells ApiCollector the page size
+	PageSize int
+	// Incremental indicate if this is a incremental collection, the existing data won't get deleted if it was true
+	Incremental bool `comment:""`
+	// ApiClient is a asynchronize api request client with qps
+	ApiClient RateLimitedApiClient
+	// Input helps us collect data based on previous collected data, like collecting changelogs based on jira
+	// issue ids
+	Input Iterator
+	// GetTotalPages is to tell `ApiCollector` total number of pages based on response of the first page.
+	// so `ApiCollector` could collect those pages in parallel for us
+	GetTotalPages func(res *http.Response, args *ApiCollectorArgs) (int, error)
+	// Concurrency specify qps for api that doesn't return total number of pages/records
+	// NORMALLY, DO NOT SPECIFY THIS PARAMETER, unless you know what it means
 	Concurrency    int
 	ResponseParser func(res *http.Response) ([]json.RawMessage, error)
 }
@@ -89,9 +81,9 @@ type ApiCollector struct {
 	urlTemplate *template.Template
 }
 
-// NewApiCollector allocates a new ApiCollector  with the given args.
-// ApiCollector can help you collecting data from some api with ease, pass in a AsyncApiClient and tell it which part
-// of response you want to save, ApiCollector will collect them from remote server and store them into database.
+// NewApiCollector allocates a new ApiCollector with the given args.
+// ApiCollector can help us collecting data from api with ease, pass in a AsyncApiClient and tell it which part
+// of response we want to save, ApiCollector will collect them from remote server and store them into database.
 func NewApiCollector(args ApiCollectorArgs) (*ApiCollector, error) {
 	// process args
 	rawDataSubTask, err := newRawDataSubTask(args.RawDataSubTaskArgs)
@@ -112,12 +104,6 @@ func NewApiCollector(args ApiCollectorArgs) (*ApiCollector, error) {
 	if args.ResponseParser == nil {
 		return nil, fmt.Errorf("ResponseParser is required")
 	}
-	if args.InputRateLimit == 0 {
-		args.InputRateLimit = 50
-	}
-	if args.Concurrency < 1 {
-		args.Concurrency = 1
-	}
 	return &ApiCollector{
 		RawDataSubTask: rawDataSubTask,
 		args:           &args,
@@ -131,81 +117,35 @@ func (collector *ApiCollector) Execute() error {
 	logger.Info("start api collection")
 
 	// make sure table is created
-	db := collector.args.Ctx.GetDb()
-	err := db.Table(collector.table).AutoMigrate(&RawData{})
+	db := collector.args.Ctx.GetDal()
+	err := db.AutoMigrate(&RawData{}, dal.From(collector.table))
 	if err != nil {
 		return err
 	}
 
 	// flush data if not incremental collection
 	if !collector.args.Incremental {
-		err = db.Table(collector.table).Delete(&RawData{}, "params = ?", collector.params).Error
+		err = db.Delete(&RawData{}, dal.From(collector.table), dal.Where("params = ?", collector.params))
 		if err != nil {
 			return err
 		}
 	}
 
+	collector.args.Ctx.SetProgress(0, -1)
 	if collector.args.Input != nil {
-		collector.args.Ctx.SetProgress(0, -1)
-		// load all rows from iterator, and do multiple `exec` accordingly
-		// TODO: this loads all records into memory, we need lazy-load
 		iterator := collector.args.Input
+		apiClient := collector.args.ApiClient
 		defer iterator.Close()
-		// throttle input process speed so it can be canceled, create a channel to represent available slots
-		slots := int(math.Ceil(collector.args.ApiClient.GetQps())) * 2
-		if slots <= 0 {
-			return fmt.Errorf("RateLimit can't use the 0 Qps")
-		}
-		slotsChan := make(chan bool, slots)
-		defer close(slotsChan)
-		for i := 0; i < slots; i++ {
-			slotsChan <- true
-		}
-
-		errors := make(chan error, slots)
-		defer close(errors)
-
-		var wg sync.WaitGroup
-		ctx := collector.args.Ctx.GetContext()
-
-	out:
-		for iterator.HasNext() {
-			select {
-			// canceled by user, stop
-			case <-ctx.Done():
-				err = ctx.Err()
-				break out
-			// obtain a slot
-			case <-slotsChan:
-				input, err := iterator.Fetch()
-				if err != nil {
-					break out
-				}
-				wg.Add(1)
-				go func() {
-					defer func() {
-						wg.Done()
-						recover() //nolint TODO: check the return and do log if not nil
-					}()
-					e := collector.exec(input)
-					// propagate error
-					if e != nil {
-						errors <- e
-					} else {
-						// release 1 slot
-						slotsChan <- true
-					}
-				}()
-			case err = <-errors:
-				break out
+		for iterator.HasNext() && !apiClient.HasError() {
+			input, err := iterator.Fetch()
+			if err != nil {
+				break
 			}
-		}
-		if err == nil {
-			wg.Wait()
+			collector.exec(input)
 		}
 	} else {
 		// or we just did it once
-		err = collector.exec(nil)
+		collector.exec(nil)
 	}
 
 	if err != nil {
@@ -213,62 +153,104 @@ func (collector *ApiCollector) Execute() error {
 	}
 	logger.Debug("wait for all async api to finished")
 	err = collector.args.ApiClient.WaitAsync()
-	logger.Info("end api collection")
+	logger.Info("end api collection error: %w", err)
 	return err
 }
 
-func (collector *ApiCollector) exec(input interface{}) error {
+func (collector *ApiCollector) exec(input interface{}) {
+	inputJson, err := json.Marshal(input)
+	if err != nil {
+		panic(err)
+	}
 	reqData := new(RequestData)
 	reqData.Input = input
-	if collector.args.PageSize <= 0 {
-		// collect detail of a record
-		return collector.fetchAsync(reqData, collector.handleResponse(reqData))
+	reqData.InputJSON = inputJson
+	reqData.Pager = &Pager{
+		Page: 1,
+		Size: collector.args.PageSize,
 	}
-	// collect multiple pages
-	var err error
-	if collector.args.GetTotalPages != nil {
-		/* when total pages is available from api*/
-		// fetch the very first page
-		err = collector.fetchAsync(reqData, collector.handleResponseWithPages(reqData))
+	if collector.args.PageSize <= 0 {
+		collector.fetchAsync(reqData, nil)
+	} else if collector.args.GetTotalPages != nil {
+		collector.fetchPagesDetermined(reqData)
 	} else {
-		// if api doesn't return total number of pages, employ a step concurrent technique
-		// when `Concurrency` was set to 3:
-		// goroutine #1 fetches pages 1/4/7..
-		// goroutine #2 fetches pages 2/5/8...
-		// goroutine #3 fetches pages 3/6/9...
-		errs := make(chan error, collector.args.Concurrency)
-		var errCount int
-		// cancel can only be called when error occurs, because we are doomed anyway.
-		ctx, cancel := context.WithCancel(collector.args.Ctx.GetContext())
-		defer cancel()
-		for i := 0; i < collector.args.Concurrency; i++ {
-			reqDataTemp := RequestData{
-				Pager: &Pager{
-					Page: i + 1,
-					Size: collector.args.PageSize,
-					Skip: collector.args.PageSize * (i),
-				},
-				Input: reqData.Input,
-			}
-			go func() {
-				errs <- collector.stepFetch(ctx, cancel, reqDataTemp)
-			}()
+		collector.fetchPagesUndetermined(reqData)
+	}
+}
+
+// fetchPagesDetermined fetches data of all pages for APIs that return paging information
+func (collector *ApiCollector) fetchPagesDetermined(reqData *RequestData) {
+	// fetch first page
+	collector.fetchAsync(reqData, func(count int, body []byte, res *http.Response) error {
+		totalPages, err := collector.args.GetTotalPages(res, collector.args)
+		if err != nil {
+			return fmt.Errorf("fetchPagesDetermined get totalPages faileds: %s", err.Error())
 		}
-		for e := range errs {
-			errCount++
-			if err != nil || errCount == collector.args.Concurrency {
-				err = e
-				break
+		// spawn a none blocking go routine to fetch other pages
+		collector.args.ApiClient.NextTick(func() error {
+			for page := 2; page <= totalPages; page++ {
+				reqDataTemp := &RequestData{
+					Pager: &Pager{
+						Page: page,
+						Skip: collector.args.PageSize * (page - 1),
+						Size: collector.args.PageSize,
+					},
+					Input: reqData.Input,
+				}
+				collector.fetchAsync(reqDataTemp, nil)
 			}
+			return nil
+		})
+		return nil
+	})
+}
+
+// fetchPagesUndetermined fetches data of all pages for APIs that do NOT return paging information
+func (collector *ApiCollector) fetchPagesUndetermined(reqData *RequestData) {
+	//logger := collector.args.Ctx.GetLogger()
+	//logger.Debug("fetch all pages in parallel with specified concurrency: %d", collector.args.Concurrency)
+	// if api doesn't return total number of pages, employ a step concurrent technique
+	// when `Concurrency` was set to 3:
+	// goroutine #1 fetches pages 1/4/7..
+	// goroutine #2 fetches pages 2/5/8...
+	// goroutine #3 fetches pages 3/6/9...
+	apiClient := collector.args.ApiClient
+	concurrency := collector.args.Concurrency
+	if concurrency == 0 {
+		// normally when a multi-pages api depends on a another resource, like jira changelogs depend on issue ids
+		// it tend to have less page, like 1 or 2 pages in total
+		if collector.args.Input != nil {
+			concurrency = 2
+		} else {
+			concurrency = apiClient.GetNumOfWorkers()
 		}
 	}
-	if err != nil {
-		return err
-	}
-	if collector.args.Input != nil {
-		collector.args.Ctx.IncProgress(1)
+	for i := 0; i < concurrency; i++ {
+		reqDataCopy := RequestData{
+			Pager: &Pager{
+				Page: i + 1,
+				Size: collector.args.PageSize,
+				Skip: collector.args.PageSize * (i),
+			},
+			Input: reqData.Input,
+		}
+		var collect func() error
+		collect = func() error {
+			collector.fetchAsync(&reqDataCopy, func(count int, body []byte, res *http.Response) error {
+				if count < collector.args.PageSize {
+					return nil
+				}
+				apiClient.NextTick(func() error {
+					reqDataCopy.Pager.Skip += collector.args.PageSize
+					reqDataCopy.Pager.Page += concurrency
+					return collect()
+				})
+				return nil
+			})
+			return nil
+		}
+		apiClient.NextTick(collect)
 	}
-	return nil
 }
 
 func (collector *ApiCollector) generateUrl(pager *Pager, input interface{}) (string, error) {
@@ -284,84 +266,23 @@ func (collector *ApiCollector) generateUrl(pager *Pager, input interface{}) (str
 	return buf.String(), nil
 }
 
-// stepFetch collect pages synchronously. In practice, several stepFetch running concurrently, we could stop all of them by calling `cancel`.
-func (collector *ApiCollector) stepFetch(ctx context.Context, cancel func(), reqData RequestData) error {
-	// channel `c` is used to make sure fetchAsync is called serially
-	c := make(chan struct{})
-	var err1 error
-	handler := func(res *http.Response, err error) error {
-		select {
-		case <-ctx.Done():
-			err = ctx.Err()
-		default:
-
-		}
-		if err != nil {
-			err1 = err
-			close(c)
-			return err
-		}
-		count, err := collector.saveRawData(res, reqData.Input)
-		if err != nil {
-			err1 = err
-			close(c)
-			cancel()
-			return err
-		}
-		if count < collector.args.PageSize {
-			close(c)
-			return nil
-		}
-		reqData.Pager.Skip += collector.args.PageSize
-		reqData.Pager.Page += collector.args.Concurrency
-		c <- struct{}{}
-		return nil
-	}
-	// kick off
-	go func() { c <- struct{}{} }()
-	for {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		case _, ok := <-c:
-			if !ok || err1 != nil {
-				return err1
-			} else {
-				err := collector.fetchAsync(&reqData, handler)
-				if err != nil {
-					close(c)
-					cancel()
-					return err
-				}
-			}
-		}
-	}
-}
-
-func (collector *ApiCollector) fetchAsync(reqData *RequestData, handler ApiAsyncCallback) error {
+func (collector *ApiCollector) fetchAsync(reqData *RequestData, handler func(int, []byte, *http.Response) error) {
 	if reqData.Pager == nil {
 		reqData.Pager = &Pager{
 			Page: 1,
 			Size: 100,
 			Skip: 0,
 		}
-	}
-	ctx := collector.args.Ctx.GetContext()
-	select {
-	case <-ctx.Done():
-		return ctx.Err()
-	default:
-
 	}
 	apiUrl, err := collector.generateUrl(reqData.Pager, reqData.Input)
 	if err != nil {
-		return err
+		panic(err)
 	}
 	var apiQuery url.Values
 	if collector.args.Query != nil {
 		apiQuery, err = collector.args.Query(reqData)
 		if err != nil {
-			return err
+			panic(err)
 		}
 	}
 
@@ -369,129 +290,56 @@ func (collector *ApiCollector) fetchAsync(reqData *RequestData, handler ApiAsync
 	if collector.args.Header != nil {
 		apiHeader, err = collector.args.Header(reqData)
 		if err != nil {
-			return err
+			panic(err)
 		}
 	}
-	return collector.args.ApiClient.GetAsync(apiUrl, apiQuery, apiHeader, handler)
-}
-
-func (collector *ApiCollector) handleResponse(reqData *RequestData) ApiAsyncCallback {
-	return func(res *http.Response, err error) error {
+	logger := collector.args.Ctx.GetLogger()
+	logger.Debug("fetchAsync <<< enqueueing for %s %v", apiUrl, apiQuery)
+	collector.args.ApiClient.GetAsync(apiUrl, apiQuery, apiHeader, func(res *http.Response) error {
+		defer logger.Debug("fetchAsync >>> done for %s %v", apiUrl, apiQuery)
+		logger := collector.args.Ctx.GetLogger()
+		// read body to buffer
+		body, err := ioutil.ReadAll(res.Body)
 		if err != nil {
 			return err
 		}
-		_, err = collector.saveRawData(res, reqData.Input)
-		collector.args.Ctx.IncProgress(1)
-		return err
-	}
-}
-
-func (collector *ApiCollector) handleResponseWithPages(reqData *RequestData) ApiAsyncCallback {
-	return func(res *http.Response, e error) error {
-		if e != nil {
-			return e
-		}
-		// gather total pages
-		body, e := ioutil.ReadAll(res.Body)
-		if e != nil {
-			return e
-		}
 		res.Body.Close()
 		res.Body = ioutil.NopCloser(bytes.NewBuffer(body))
-		totalPages, e := collector.args.GetTotalPages(res, collector.args)
-		if e != nil {
-			return e
-		}
-		// save response body of first page
-		res.Body = ioutil.NopCloser(bytes.NewBuffer(body))
-		_, e = collector.saveRawData(res, reqData.Input)
-		if e != nil {
-			return e
+		// convert body to array of RawJSON
+		items, err := collector.args.ResponseParser(res)
+		if err != nil {
+			return err
 		}
-		if collector.args.Input == nil {
-			collector.args.Ctx.SetProgress(1, totalPages)
+		// save to db
+		count := len(items)
+		if count == 0 {
+			return nil
 		}
-		// fetch other pages in parallel
-		collector.args.ApiClient.Add(1)
-		go func() {
-			defer func() {
-				collector.args.ApiClient.Done()
-				recover() //nolint TODO: check the return and do log if not nil
-			}()
-			for page := 2; page <= totalPages; page++ {
-				reqDataTemp := &RequestData{
-					Pager: &Pager{
-						Page: page,
-						Size: collector.args.PageSize,
-						Skip: collector.args.PageSize * (page - 1),
-					},
-					Input: reqData.Input,
-				}
-				_ = collector.fetchAsync(reqDataTemp, collector.handleResponse(reqDataTemp))
+		db := collector.args.Ctx.GetDal()
+		url := res.Request.URL.String()
+		rows := make([]*RawData, count)
+		for i, msg := range items {
+			rows[i] = &RawData{
+				Params: collector.params,
+				Data:   msg,
+				Url:    url,
+				Input:  reqData.InputJSON,
 			}
-		}()
-		return nil
-	}
-}
-
-func (collector *ApiCollector) saveRawData(res *http.Response, input interface{}) (int, error) {
-	items, err := collector.args.ResponseParser(res)
-	logger := collector.args.Ctx.GetLogger()
-	if err != nil {
-		return 0, err
-	}
-	res.Body.Close()
-
-	inputJson, _ := json.Marshal(input)
-
-	if len(items) == 0 {
-		return 0, nil
-	}
-	db := collector.args.Ctx.GetDb()
-	u := res.Request.URL.String()
-	dd := make([]*RawData, len(items))
-	for i, msg := range items {
-		dd[i] = &RawData{
-			Params: collector.params,
-			Data:   msg,
-			Url:    u,
-			Input:  inputJson,
 		}
-	}
-	err = db.Table(collector.table).Create(dd).Error
-	if err != nil {
-		logger.Error("failed to save raw data: %s", err)
-	}
-	return len(dd), err
-}
-
-func GetRawMessageDirectFromResponse(res *http.Response) ([]json.RawMessage, error) {
-	body, err := ioutil.ReadAll(res.Body)
-	res.Body.Close()
-	if err != nil {
-		return nil, err
-	}
-	return []json.RawMessage{body}, nil
-}
-
-func GetRawMessageArrayFromResponse(res *http.Response) ([]json.RawMessage, error) {
-	rawMessages := []json.RawMessage{}
-
-	if res == nil {
-		return nil, fmt.Errorf("res is nil")
-	}
-	defer res.Body.Close()
-	resBody, err := ioutil.ReadAll(res.Body)
-	if err != nil {
-		return nil, fmt.Errorf("%w %s", err, res.Request.URL.String())
-	}
-
-	err = json.Unmarshal(resBody, &rawMessages)
-	if err != nil {
-		return nil, fmt.Errorf("%w %s %s", err, res.Request.URL.String(), string(resBody))
-	}
-
-	return rawMessages, nil
+		err = db.Create(rows, dal.From(collector.table))
+		if err != nil {
+			return err
+		}
+		logger.Debug("fetchAsync === total %d rows were saved into database", count)
+		// increase progress only when it was not nested
+		collector.args.Ctx.IncProgress(1)
+		if handler != nil {
+			res.Body = ioutil.NopCloser(bytes.NewBuffer(body))
+			return handler(count, body, res)
+		}
+		return nil
+	})
+	logger.Debug("fetchAsync === enqueued for %s %v", apiUrl, apiQuery)
 }
 
 var _ core.SubTask = (*ApiCollector)(nil)
diff --git a/utils/callframes.go b/plugins/helper/api_collector_func.go
similarity index 50%
copy from utils/callframes.go
copy to plugins/helper/api_collector_func.go
index 44cda2d6..690e7a65 100644
--- a/utils/callframes.go
+++ b/plugins/helper/api_collector_func.go
@@ -15,38 +15,40 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package utils
+package helper
 
 import (
+	"encoding/json"
 	"fmt"
-	"runtime"
-	"strings"
+	"io/ioutil"
+	"net/http"
 )
 
-func GatherCallFrames() string {
-	var name, file string
-	var line int
-	var pc [16]uintptr
-
-	n := runtime.Callers(3, pc[:])
-	for _, pc := range pc[:n] {
-		fn := runtime.FuncForPC(pc)
-		if fn == nil {
-			continue
-		}
-		file, line = fn.FileLine(pc)
-		name = fn.Name()
-		if !strings.HasPrefix(name, "runtime.") {
-			break
-		}
+func GetRawMessageDirectFromResponse(res *http.Response) ([]json.RawMessage, error) {
+	body, err := ioutil.ReadAll(res.Body)
+	res.Body.Close()
+	if err != nil {
+		return nil, err
+	}
+	return []json.RawMessage{body}, nil
+}
+
+func GetRawMessageArrayFromResponse(res *http.Response) ([]json.RawMessage, error) {
+	rawMessages := []json.RawMessage{}
+
+	if res == nil {
+		return nil, fmt.Errorf("res is nil")
+	}
+	defer res.Body.Close()
+	resBody, err := ioutil.ReadAll(res.Body)
+	if err != nil {
+		return nil, fmt.Errorf("%w %s", err, res.Request.URL.String())
 	}
 
-	switch {
-	case name != "":
-		return fmt.Sprintf("%v:%v", name, line)
-	case file != "":
-		return fmt.Sprintf("%v:%v", file, line)
+	err = json.Unmarshal(resBody, &rawMessages)
+	if err != nil {
+		return nil, fmt.Errorf("%w %s %s", err, res.Request.URL.String(), string(resBody))
 	}
 
-	return fmt.Sprintf("pc:%x", pc)
+	return rawMessages, nil
 }
diff --git a/plugins/helper/api_collector_test.go b/plugins/helper/api_collector_test.go
index c8f89286..aa8343c2 100644
--- a/plugins/helper/api_collector_test.go
+++ b/plugins/helper/api_collector_test.go
@@ -1,1142 +1,81 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
 package helper
 
 import (
 	"bytes"
-	"context"
-	"database/sql"
-	"encoding/json"
-	"fmt"
 	"io/ioutil"
 	"net/http"
 	"net/url"
-	"reflect"
-	"runtime/debug"
-	"sync/atomic"
 	"testing"
-	"time"
 
-	"github.com/agiledragon/gomonkey/v2"
-	"github.com/apache/incubator-devlake/logger"
-	"github.com/apache/incubator-devlake/models/common"
-	"github.com/sirupsen/logrus"
+	"github.com/apache/incubator-devlake/helpers/unithelper"
+	"github.com/apache/incubator-devlake/mocks"
+	"github.com/apache/incubator-devlake/plugins/helper/common"
 	"github.com/stretchr/testify/assert"
-	"gorm.io/gorm"
+	"github.com/stretchr/testify/mock"
 )
 
-// go test -gcflags=all=-l
-
-type TestTable struct {
-	Email string `gorm:"primaryKey;type:varchar(255)"`
-	Name  string `gorm:"type:varchar(255)"`
-	common.NoPKModel
-}
-
-type TestTable2 struct {
-	Email string `gorm:"primaryKey;type:varchar(255)"`
-	Name  string `gorm:"type:varchar(255)"`
-	common.NoPKModel
-}
-
-var TestTableData *TestTable = &TestTable{
-	Email: "test@test.com",
-	Name:  "test",
-}
-
-type TestParam struct {
-	Test string
-}
-
-func (TestTable) TableName() string {
-	return "_tool_test"
-}
-
-var TestError error = fmt.Errorf("Error For Test")
-
-var gt *gomonkey.Patches
-var gc *gomonkey.Patches
-var gd *gomonkey.Patches
-var ga *gomonkey.Patches
-var gs *gomonkey.Patches
-var god *gomonkey.Patches
-var gw *gomonkey.Patches
-var gr *gomonkey.Patches
-
-var TestUrlBefor string = "test1"
-var TestUrlParam string = "test2"
-var TestUrlAfter string = "test3"
-var TestUrl string = "https://" + TestUrlBefor + TestUrlParam + TestUrlAfter
-
-var TestRawMessage string = "{\"message\":\"TestRawMessage\"}"
-var TestUrlValueKey string = "TestKey"
-var TestUrlValueValue string = "TestValue"
-var TestNoRunHere string = "should not run to this line of code"
-
-var TestDataCount int = 100
-var TestTotalPage int = 100
-var TestDataCountNotFull int = 50
-var TestPage int = 110
-var TestSkip int = 100100
-var TestSize int = 116102
-var TestTimeOut time.Duration = time.Duration(10) * time.Second
-
-var Cancel context.CancelFunc
-
-var TestHttpResponse_Suc http.Response = http.Response{
-	Status:     "200 OK",
-	StatusCode: http.StatusOK,
-	Proto:      "HTTP/1.0",
-	ProtoMajor: 1,
-	ProtoMinor: 0,
-}
-
-var TestHttpResponse_404 http.Response = http.Response{
-	Status:     "404 Not Found",
-	StatusCode: http.StatusNotFound,
-	Proto:      "HTTP/1.0",
-	ProtoMajor: 1,
-	ProtoMinor: 0,
-}
-
-// Assert http.Response base test data
-func AssertBaseResponse(t *testing.T, A *http.Response, B *http.Response) {
-	assert.Equal(t, A.Status, B.Status)
-	assert.Equal(t, A.StatusCode, B.StatusCode)
-	assert.Equal(t, A.Proto, B.Proto)
-	assert.Equal(t, A.ProtoMajor, B.ProtoMajor)
-	assert.Equal(t, A.ProtoMinor, B.ProtoMinor)
-}
-
-func SetTimeOut(timeout time.Duration, handleer func()) error {
-	stack := string(debug.Stack())
-	t := time.After(timeout)
-	done := make(chan bool)
-	go func() {
-		defer func() {
-			if r := recover(); r != nil {
-				fmt.Printf("%s\r\n", stack)
-				fmt.Printf("%v\r\n", r)
-			}
-		}()
-
-		if handleer != nil {
-			handleer()
-		}
-		done <- true
-	}()
-
-	select {
-	case <-t:
-		return fmt.Errorf("[time:%s]\r\n[Time limit for %f seconed]\r\n[stack]\r\n%s\r\n",
-			time.Now().String(),
-			float64(timeout)/float64(time.Second),
-			stack)
-	case <-done:
-		return nil
-	}
-}
-
-func AddBodyData(res *http.Response, count int) {
-	data := "["
-	for i := 0; i < count; i++ {
-		data += TestRawMessage
-		if i != count-1 {
-			data += ","
-		}
-	}
-	data += "]"
-	res.Body = ioutil.NopCloser(bytes.NewReader([]byte(data)))
-}
-
-func SetUrl(res *http.Response, rawURL string) {
-	u, _ := url.Parse(rawURL)
-	res.Request = &http.Request{
-		URL: u,
-	}
-}
-
-// Mock the DB api
-// Need be released by UnMockDB
-func MockDB(t *testing.T) {
-	gt = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Table", func(db *gorm.DB, name string, args ...interface{}) *gorm.DB {
-		assert.Equal(t, name, TestTableData.TableName())
-		return db
-	})
-
-	gc = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Create", func(db *gorm.DB, value interface{}) *gorm.DB {
-		assert.Equal(t, TestTableData, value.(*TestTable))
-		return db
-	})
-
-	gd = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Delete", func(db *gorm.DB, value interface{}, conds ...interface{}) (tx *gorm.DB) {
-		return db
-	})
-
-	ga = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "AutoMigrate", func(db *gorm.DB, dst ...interface{}) error {
-		return nil
-	})
-
-	god = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Order", func(db *gorm.DB, value interface{}) (tx *gorm.DB) {
-		return db
-	})
-
-	gw = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Where", func(db *gorm.DB, query interface{}, args ...interface{}) (tx *gorm.DB) {
-		return db
-	})
-
-	gr = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Count", func(db *gorm.DB, count *int64) (tx *gorm.DB) {
-		return db
-	})
-
-	gr = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Rows", func(db *gorm.DB) (*sql.Rows, error) {
-		return &sql.Rows{}, nil
-	})
-
-	gs = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "ScanRows", func(db *gorm.DB, rows *sql.Rows, dest interface{}) error {
-		dest = TestRawMessage
-		return nil
-	})
-
-}
-
-// released MockDB
-func UnMockDB() {
-	gt.Reset()
-	gc.Reset()
-	gd.Reset()
-	ga.Reset()
-	god.Reset()
-	gw.Reset()
-	gr.Reset()
-	gs.Reset()
-}
-
-type TestIterator struct {
-	data         TestTable
-	count        int
-	hasNextTimes int
-	fetchTimes   int
-	closeTimes   int
-	unlimit      bool
-}
-
-func (it *TestIterator) HasNext() bool {
-	it.hasNextTimes++
-	return it.count > 0
-}
-
-func (it *TestIterator) Fetch() (interface{}, error) {
-	it.fetchTimes++
-	if it.count > 0 {
-		if it.unlimit == false {
-			it.count--
-		}
-		ret := it.data
-		return &ret, nil
-	}
-	return nil, TestError
-}
-
-func (it *TestIterator) Close() error {
-	it.closeTimes++
-	return nil
-}
-
-func CreateTestApiCollector() (*ApiCollector, error) {
-	db := &gorm.DB{}
-	var ctx context.Context
-	ctx, Cancel = context.WithCancel(context.Background())
-	return NewApiCollector(ApiCollectorArgs{
-		RawDataSubTaskArgs: RawDataSubTaskArgs{
-			Ctx: &DefaultSubTaskContext{
-				defaultExecContext: newDefaultExecContext(GetConfigForTest("../../"), logger.NewDefaultLogger(logrus.New(), "Test", make(map[string]*logrus.Logger)), db, ctx, "Test", nil, nil),
-			},
-			Table: TestTable{}.TableName(),
-			Params: &TestParam{
-				Test: TestUrlParam,
+func TestFetchPageUndetermined(t *testing.T) {
+	mockDal := new(mocks.Dal)
+	mockDal.On("AutoMigrate", mock.Anything, mock.Anything).Return(nil).Once()
+	mockDal.On("Delete", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
+	mockDal.On("Create", mock.Anything, mock.Anything).Return(nil).Once()
+
+	mockCtx := unithelper.DummySubTaskContext(mockDal)
+
+	mockInput := new(mocks.Iterator)
+	mockInput.On("HasNext").Return(true).Once()
+	mockInput.On("HasNext").Return(false).Once()
+	mockInput.On("Fetch").Return(nil, nil).Once()
+	mockInput.On("Close").Return(nil)
+
+	// simulate fetching all pages of jira changelogs for 1 issue id with 1 concurrency,
+	// assuming api doesn't return total number of pages.
+	// then, we are expecting 2 calls for GetAsync and NextTick each, otherwise, deadlock happens
+	getAsyncCounter := 0
+	mockApi := new(mocks.RateLimitedApiClient)
+	mockApi.On("GetAsync", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
+		// fake records for first page, no records for second page
+		body := "[1,2,3]"
+		if getAsyncCounter > 0 {
+			body = "[]"
+		}
+		getAsyncCounter += 1
+		res := &http.Response{
+			Request: &http.Request{
+				URL: &url.URL{},
 			},
+			Body: ioutil.NopCloser(bytes.NewBufferString(body)),
+		}
+		handler := args.Get(3).(common.ApiAsyncCallback)
+		handler(res)
+	}).Twice()
+	mockApi.On("NextTick", mock.Anything).Run(func(args mock.Arguments) {
+		handler := args.Get(0).(func() error)
+		assert.Nil(t, handler())
+	}).Twice()
+	mockApi.On("HasError").Return(false)
+	mockApi.On("WaitAsync").Return(nil)
+
+	params := struct {
+		Name string
+	}{Name: "testparams"}
+
+	collector, err := NewApiCollector(ApiCollectorArgs{
+		RawDataSubTaskArgs: RawDataSubTaskArgs{
+			Ctx:    mockCtx,
+			Table:  "whatever rawtable",
+			Params: params,
 		},
-		ApiClient:   &ApiAsyncClient{qps: 10},
-		PageSize:    100,
-		Incremental: false,
-		UrlTemplate: TestUrlBefor + "{{ .Params.Test }}" + TestUrlAfter,
-		Query: func(reqData *RequestData) (url.Values, error) {
-			u := url.Values{}
-			json, err := json.Marshal(reqData.Input)
-			u.Add("Vjson", string(json))
-			if err != nil {
-				u.Add("Verr", err.Error())
-			} else {
-				u.Add("Verr", "")
-			}
-			return u, nil
-		},
-		Header: func(reqData *RequestData) (http.Header, error) {
-			h := http.Header{}
-			json, err := json.Marshal(reqData.Input)
-			h.Add("Hjson", string(json))
-			if err != nil {
-				h.Add("Herr", err.Error())
-			} else {
-				h.Add("Herr", "")
-			}
-			return h, nil
-		},
-		GetTotalPages:  func(res *http.Response, args *ApiCollectorArgs) (int, error) { return TestTotalPage, nil },
+		ApiClient:      mockApi,
+		Input:          mockInput,
+		UrlTemplate:    "whatever url",
+		Concurrency:    1,
+		PageSize:       3,
 		ResponseParser: GetRawMessageArrayFromResponse,
 	})
-}
-
-func TestGormDB(t *testing.T) {
-	ts := &TestTable{
-		Email: "test@test.com",
-		Name:  "test",
-	}
-
-	db := &gorm.DB{}
-	MockDB(t)
-	defer UnMockDB()
-
-	db.Table(ts.TableName()).Order(ts).Where(ts).Create(ts).Delete(ts).AutoMigrate()
-	db.Table(ts.TableName()).Order(ts).Where(ts).Create(ts).Delete(ts).Rows()
-	db.Table(ts.TableName()).Order(ts).Where(ts).Create(ts).Delete(ts).ScanRows(nil, nil)
-}
-
-func TestSaveRawData(t *testing.T) {
-	gt = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Table", func(db *gorm.DB, name string, args ...interface{}) *gorm.DB {
-		assert.Equal(t, name, "_raw_"+TestTableData.TableName())
-		return db
-	},
-	)
-	defer gt.Reset()
-
-	gc = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Create", func(db *gorm.DB, value interface{}) *gorm.DB {
-		rd := value.([]*RawData)
-		params, _ := json.Marshal(&TestParam{
-			Test: TestUrlParam,
-		})
-		input, _ := json.Marshal(TestTableData)
-		for _, v := range rd {
-			// check data and url
-			assert.Equal(t, v.Params, string(params))
-			assert.Equal(t, string(v.Data), TestRawMessage)
-			assert.Equal(t, v.Url, TestUrl)
-			assert.Equal(t, v.Input.String(), string(input))
-		}
-		return db
-	},
-	)
-	defer gc.Reset()
-
-	apiCollector, _ := CreateTestApiCollector()
-
-	resBase := TestHttpResponse_Suc
-	res := &resBase
-
-	// build data and url
-	AddBodyData(res, TestDataCount)
-	SetUrl(res, TestUrl)
-
-	i, err := apiCollector.saveRawData(res, TestTableData)
-	assert.Equal(t, i, TestDataCount)
-	assert.Equal(t, err, nil)
-}
-
-func TestSaveRawData_Fail(t *testing.T) {
-	gt = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Table", func(db *gorm.DB, name string, args ...interface{}) *gorm.DB {
-		assert.Empty(t, TestNoRunHere)
-		return db
-	},
-	)
-	defer gt.Reset()
-
-	gc = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Create", func(db *gorm.DB, value interface{}) *gorm.DB {
-		assert.Empty(t, TestNoRunHere)
-		return db
-	},
-	)
-	defer gc.Reset()
-
-	apiCollector, _ := CreateTestApiCollector()
-
-	// build data and url
-	resBase := TestHttpResponse_404
-	res := &resBase
-
-	AddBodyData(res, 0)
-	SetUrl(res, TestUrl)
-
-	//run testing
-	i, err := apiCollector.saveRawData(res, TestTableData)
-	assert.Equal(t, i, 0)
-	assert.Equal(t, err, nil)
-}
-
-func TestHandleResponse(t *testing.T) {
-	apiCollector, _ := CreateTestApiCollector()
-
-	gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "saveRawData", func(collector *ApiCollector, res *http.Response, input interface{}) (int, error) {
-		items, err := collector.args.ResponseParser(res)
-		assert.Equal(t, err, nil)
-		// check items data
-		for _, v := range items {
-			jsondata, err := json.Marshal(v)
-			assert.Equal(t, err, nil)
-			assert.Equal(t, string(jsondata), TestRawMessage)
-		}
-		assert.Equal(t, input, TestTableData)
-		AssertBaseResponse(t, res, &TestHttpResponse_Suc)
-		return len(items), nil
-	})
-	defer gs.Reset()
-
-	// build requeset input
-	reqData := new(RequestData)
-	reqData.Input = TestTableData
-	handle := apiCollector.handleResponse(reqData)
-
-	resBase := TestHttpResponse_Suc
-	res := &resBase
-
-	// build data and url
-	AddBodyData(res, TestDataCount)
-	SetUrl(res, TestUrl)
-
-	// run testing
-	err := handle(res, nil)
-	assert.Equal(t, err, nil)
-}
-
-func TestHandleResponse_Fail(t *testing.T) {
-	apiCollector, _ := CreateTestApiCollector()
-
-	gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "saveRawData", func(collector *ApiCollector, res *http.Response, input interface{}) (int, error) {
-		items, err := collector.args.ResponseParser(res)
-		assert.Equal(t, err, nil)
-		for _, v := range items {
-			jsondata, err := json.Marshal(v)
-			assert.Equal(t, err, nil)
-			assert.Equal(t, string(jsondata), TestRawMessage)
-		}
-		assert.Equal(t, input, TestTableData)
-		AssertBaseResponse(t, res, &TestHttpResponse_404)
-		return len(items), TestError
-	})
-	defer gs.Reset()
-
-	// build requeset input
-	reqData := new(RequestData)
-	reqData.Input = TestTableData
-	handle := apiCollector.handleResponse(reqData)
-
-	// build data and url
-	resBase := TestHttpResponse_404
-	res := &resBase
-
-	AddBodyData(res, 0)
-	SetUrl(res, TestUrl)
-
-	//  run testing
-	err := handle(res, nil)
-	assert.Equal(t, err, TestError)
-}
-
-func TestFetchAsync(t *testing.T) {
-	apiCollector, _ := CreateTestApiCollector()
-
-	gg := gomonkey.ApplyMethod(reflect.TypeOf(&ApiAsyncClient{}), "GetAsync", func(apiAsyncClient *ApiAsyncClient, path string, query url.Values, header http.Header, handler ApiAsyncCallback) error {
-		assert.Equal(t, path, TestUrlBefor+TestUrlParam+TestUrlAfter)
-
-		json, err := json.Marshal(TestTableData)
-		assert.Equal(t, query.Get("Vjson"), string(json))
-		assert.Equal(t, header.Get("Hjson"), string(json))
-		if err != nil {
-			assert.Equal(t, query.Get("Verr"), err.Error())
-			assert.Equal(t, header.Get("Herr"), err.Error())
-		} else {
-			assert.Equal(t, query.Get("Verr"), "")
-			assert.Equal(t, header.Get("Herr"), "")
-		}
-
-		res := TestHttpResponse_Suc
-		handler(&res, TestError)
-		return nil
-	})
-	defer gg.Reset()
-
-	// build request Input
-	reqData := new(RequestData)
-	reqData.Input = TestTableData
-
-	// run testing
-	err := apiCollector.fetchAsync(reqData, func(r *http.Response, err error) error {
-		AssertBaseResponse(t, r, &TestHttpResponse_Suc)
-		assert.Equal(t, err, TestError)
-		return err
-	})
-
-	assert.Equal(t, err, nil)
-}
-
-func TestFetchAsync_Fail(t *testing.T) {
-	apiCollector, _ := CreateTestApiCollector()
-
-	gg := gomonkey.ApplyMethod(reflect.TypeOf(&ApiAsyncClient{}), "GetAsync", func(apiAsyncClient *ApiAsyncClient, path string, query url.Values, header http.Header, handler ApiAsyncCallback) error {
-		assert.Equal(t, path, TestUrlBefor+TestUrlParam+TestUrlAfter)
-
-		json, err := json.Marshal(TestTableData)
-		assert.Equal(t, query.Get("Vjson"), string(json))
-		assert.Equal(t, header.Get("Hjson"), string(json))
-		if err != nil {
-			assert.Equal(t, query.Get("Verr"), err.Error())
-			assert.Equal(t, header.Get("Herr"), err.Error())
-		} else {
-			assert.Equal(t, query.Get("Verr"), "")
-			assert.Equal(t, header.Get("Herr"), "")
-		}
-
-		res := TestHttpResponse_404
-		handler(&res, TestError)
-		return TestError
-	})
-	defer gg.Reset()
-
-	// build request Input
-	reqData := new(RequestData)
-	reqData.Input = TestTableData
-
-	// run testing
-	err := apiCollector.fetchAsync(reqData, func(r *http.Response, err error) error {
-		AssertBaseResponse(t, r, &TestHttpResponse_404)
-		assert.Equal(t, err, TestError)
-		return err
-	})
-
-	assert.Equal(t, err, TestError)
-}
-
-func TestHandleResponseWithPages(t *testing.T) {
-	apiCollector, _ := CreateTestApiCollector()
-	pages := make([]bool, TestTotalPage+1)
-	for i := 1; i <= TestTotalPage; i++ {
-		pages[i] = false
-	}
-
-	gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
-		page := reqData.Pager.Page
-		pages[page] = true
-		return nil
-	})
-	defer gf.Reset()
-
-	gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "saveRawData", func(collector *ApiCollector, res *http.Response, input interface{}) (int, error) {
-		items, err := collector.args.ResponseParser(res)
-		assert.Equal(t, err, nil)
-		for _, v := range items {
-			jsondata, err := json.Marshal(v)
-			assert.Equal(t, err, nil)
-			assert.Equal(t, string(jsondata), TestRawMessage)
-		}
-		assert.Equal(t, input, TestTableData)
-		AssertBaseResponse(t, res, &TestHttpResponse_Suc)
-		return len(items), nil
-	})
-	defer gs.Reset()
-
-	NeedWait := int64(0)
-	gad := gomonkey.ApplyMethod(reflect.TypeOf(&ApiAsyncClient{}), "Add", func(apiClient *ApiAsyncClient, delta int) {
-		atomic.AddInt64(&NeedWait, int64(delta))
-	})
-	defer gad.Reset()
-
-	gdo := gomonkey.ApplyMethod(reflect.TypeOf(&ApiAsyncClient{}), "Done", func(apiClient *ApiAsyncClient) {
-		atomic.AddInt64(&NeedWait, -1)
-	})
-	defer gdo.Reset()
 
-	// build request Input
-	reqData := new(RequestData)
-	reqData.Input = TestTableData
-	handle := apiCollector.handleResponseWithPages(reqData)
+	assert.Nil(t, err)
+	assert.Nil(t, collector.Execute())
 
-	// build data and url
-	resBase := TestHttpResponse_Suc
-	res := &resBase
-
-	AddBodyData(res, TestDataCount)
-	SetUrl(res, TestUrl)
-
-	// run testing
-	err := handle(res, nil)
-
-	// wait run finished
-	for atomic.LoadInt64(&NeedWait) > 0 {
-		time.Sleep(time.Millisecond)
-	}
-
-	assert.Equal(t, err, nil)
-	for i := 2; i <= TestTotalPage; i++ {
-		assert.True(t, pages[i], i)
-	}
-}
-
-func TestHandleResponseWithPages_Fail(t *testing.T) {
-	apiCollector, _ := CreateTestApiCollector()
-
-	gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
-		return TestError
-	})
-	defer gf.Reset()
-
-	gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "saveRawData", func(collector *ApiCollector, res *http.Response, input interface{}) (int, error) {
-		items, err := collector.args.ResponseParser(res)
-		assert.Equal(t, err, nil)
-		for _, v := range items {
-			jsondata, err := json.Marshal(v)
-			assert.Equal(t, err, nil)
-			assert.Equal(t, string(jsondata), TestRawMessage)
-		}
-		assert.Equal(t, input, TestTableData)
-		AssertBaseResponse(t, res, &TestHttpResponse_404)
-		return len(items), TestError
-	})
-	defer gs.Reset()
-
-	// build request Input
-	reqData := new(RequestData)
-	reqData.Input = TestTableData
-	handle := apiCollector.handleResponseWithPages(reqData)
-
-	// build data and url
-	resBase := TestHttpResponse_404
-	res := &resBase
-
-	AddBodyData(res, 0)
-	SetUrl(res, TestUrl)
-
-	// run testing
-	err := handle(res, nil)
-	assert.Equal(t, err, TestError)
-}
-
-func TestStepFetch(t *testing.T) {
-	apiCollector, _ := CreateTestApiCollector()
-
-	init := false
-	noFullTimes := 0
-
-	gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "saveRawData", func(collector *ApiCollector, res *http.Response, input interface{}) (int, error) {
-		items, err := collector.args.ResponseParser(res)
-		assert.Equal(t, err, nil)
-		for _, v := range items {
-			jsondata, err := json.Marshal(v)
-			assert.Equal(t, err, nil)
-			assert.Equal(t, string(jsondata), TestRawMessage)
-		}
-		// full page
-		assert.Equal(t, input, TestTableData)
-		AssertBaseResponse(t, res, &TestHttpResponse_Suc)
-		return len(items), nil
-	})
-	defer gs.Reset()
-
-	gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
-		resBase := TestHttpResponse_Suc
-		res := &resBase
-		SetUrl(res, TestUrl)
-
-		// full page for continue
-		if reqData.Pager.Page == TestPage {
-			init = true
-			assert.Equal(t, reqData.Pager.Skip, TestSkip)
-			assert.Equal(t, reqData.Pager.Size, TestSize)
-			AddBodyData(res, TestDataCount)
-		} else {
-			// not full page for stop
-			AddBodyData(res, TestDataCountNotFull)
-			noFullTimes++
-		}
-
-		go handler(res, nil)
-		return nil
-	})
-	defer gf.Reset()
-
-	// build request Input
-	reqData := new(RequestData)
-	reqData.Input = TestTableData
-	reqData.Pager = &Pager{
-		Page: TestPage,
-		Skip: TestSkip,
-		Size: TestSize,
-	}
-
-	// cancel can only be called when error occurs, because we are doomed anyway.
-	ctx, cancel := context.WithCancel(apiCollector.args.Ctx.GetContext())
-
-	// run testing
-	err := apiCollector.stepFetch(ctx, cancel, *reqData)
-
-	assert.Equal(t, noFullTimes, 1)
-	assert.Equal(t, init, true)
-	assert.Equal(t, err, nil)
-}
-
-func TestStepFetch_Fail(t *testing.T) {
-	apiCollector, _ := CreateTestApiCollector()
-
-	init := false
-	noFullTimes := 0
-
-	gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "saveRawData", func(collector *ApiCollector, res *http.Response, input interface{}) (int, error) {
-		items, err := collector.args.ResponseParser(res)
-		assert.Equal(t, err, nil)
-		for _, v := range items {
-			jsondata, err := json.Marshal(v)
-			assert.Equal(t, err, nil)
-			assert.Equal(t, string(jsondata), TestRawMessage)
-		}
-		// full page
-		assert.Equal(t, input, TestTableData)
-		AssertBaseResponse(t, res, &TestHttpResponse_404)
-		return len(items), nil
-	})
-	defer gs.Reset()
-
-	gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
-		resBase := TestHttpResponse_404
-		res := &resBase
-		SetUrl(res, TestUrl)
-		// full page for continue
-		if reqData.Pager.Page == TestPage {
-			init = true
-			assert.Equal(t, reqData.Pager.Skip, TestSkip)
-			assert.Equal(t, reqData.Pager.Size, TestSize)
-			AddBodyData(res, TestDataCount)
-			go handler(res, nil)
-		} else {
-			// not full page for stop
-			AddBodyData(res, TestDataCountNotFull)
-			noFullTimes++
-			return TestError
-		}
-		return nil
-	})
-	defer gf.Reset()
-
-	// build request Input
-	reqData := new(RequestData)
-	reqData.Input = TestTableData
-	reqData.Pager = &Pager{
-		Page: TestPage,
-		Skip: TestSkip,
-		Size: TestSize,
-	}
-
-	// cancel can only be called when error occurs, because we are doomed anyway.
-	ctx, cancel := context.WithCancel(apiCollector.args.Ctx.GetContext())
-
-	err := apiCollector.stepFetch(ctx, cancel, *reqData)
-
-	assert.Equal(t, noFullTimes, 1)
-	assert.Equal(t, init, true)
-	assert.Equal(t, err, TestError)
-}
-
-func TestStepFetch_Cancel(t *testing.T) {
-	apiCollector, _ := CreateTestApiCollector()
-
-	gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "saveRawData", func(collector *ApiCollector, res *http.Response, input interface{}) (int, error) {
-		items, err := collector.args.ResponseParser(res)
-		assert.Equal(t, err, nil)
-		for _, v := range items {
-			jsondata, err := json.Marshal(v)
-			assert.Equal(t, err, nil)
-			assert.Equal(t, string(jsondata), TestRawMessage)
-		}
-		// full page
-		assert.Equal(t, input, TestTableData)
-		AssertBaseResponse(t, res, &TestHttpResponse_Suc)
-		return len(items), nil
-	})
-	defer gs.Reset()
-
-	gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
-		resBase := TestHttpResponse_Suc
-		res := &resBase
-		SetUrl(res, TestUrl)
-		// always to continue
-		assert.Equal(t, reqData.Pager.Size, TestSize)
-		AddBodyData(res, TestDataCount)
-		go handler(res, nil)
-
-		return nil
-	})
-	defer gf.Reset()
-
-	// build request Input
-	reqData := new(RequestData)
-	reqData.Input = TestTableData
-	reqData.Pager = &Pager{
-		Page: TestPage,
-		Skip: TestSkip,
-		Size: TestSize,
-	}
-
-	// cancel can only be called when error occurs, because we are doomed anyway.
-
-	ctx, cancel := context.WithCancel(apiCollector.args.Ctx.GetContext())
-
-	go func() {
-		time.Sleep(time.Duration(500) * time.Microsecond)
-		Cancel()
-	}()
-
-	err := SetTimeOut(TestTimeOut, func() {
-		err := apiCollector.stepFetch(ctx, cancel, *reqData)
-		assert.Equal(t, err, fmt.Errorf("context canceled"))
-	})
-	assert.Equal(t, err, nil)
-
-}
-
-func TestExecWithOutPageSize(t *testing.T) {
-	apiCollector, _ := CreateTestApiCollector()
-	apiCollector.args.PageSize = 0
-
-	gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
-		assert.Equal(t, reqData.Input, TestTableData)
-		return nil
-	})
-	defer gf.Reset()
-
-	gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "stepFetch", func(collector *ApiCollector, ctx context.Context, cancel func(), reqData RequestData) error {
-		assert.Empty(t, TestNoRunHere)
-		return TestError
-	})
-	defer gs.Reset()
-
-	// run testing
-	err := apiCollector.exec(TestTableData)
-	assert.Equal(t, err, nil)
-}
-
-func TestExecWithGetTotalPages(t *testing.T) {
-	apiCollector, _ := CreateTestApiCollector()
-
-	gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
-		assert.Equal(t, reqData.Input, TestTableData)
-		return nil
-	})
-	defer gf.Reset()
-
-	gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "stepFetch", func(collector *ApiCollector, ctx context.Context, cancel func(), reqData RequestData) error {
-		assert.Empty(t, TestNoRunHere)
-		return TestError
-	})
-	defer gs.Reset()
-
-	// run testing
-	err := apiCollector.exec(TestTableData)
-	assert.Equal(t, err, nil)
-}
-
-func TestExecWithOutGetTotalPages(t *testing.T) {
-	apiCollector, _ := CreateTestApiCollector()
-	apiCollector.args.GetTotalPages = nil
-	apiCollector.args.Concurrency = TestTotalPage
-
-	pages := make([]bool, TestTotalPage+1)
-	for i := 1; i <= TestTotalPage; i++ {
-		pages[i] = false
-	}
-
-	gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
-		assert.Equal(t, reqData.Input, TestTableData)
-		return nil
-	})
-	defer gf.Reset()
-
-	gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "stepFetch", func(collector *ApiCollector, ctx context.Context, cancel func(), reqData RequestData) error {
-		assert.Equal(t, reqData.Input, TestTableData)
-		page := reqData.Pager.Page
-		pages[page] = true
-		assert.Equal(t, reqData.Pager.Size, apiCollector.args.PageSize)
-		assert.Equal(t, reqData.Pager.Skip, apiCollector.args.PageSize*(page-1))
-		return nil
-	})
-	defer gs.Reset()
-
-	// run testing
-	err := apiCollector.exec(TestTableData)
-	assert.Equal(t, err, nil)
-
-	for i := 2; i <= TestTotalPage; i++ {
-		assert.True(t, pages[i], i)
-	}
-}
-
-func TestExec_Cancel(t *testing.T) {
-	apiCollector, _ := CreateTestApiCollector()
-	apiCollector.args.GetTotalPages = nil
-	apiCollector.args.Concurrency = TestTotalPage
-
-	pages := make([]bool, TestTotalPage+1)
-	for i := 1; i <= TestTotalPage; i++ {
-		pages[i] = false
-	}
-
-	gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
-		assert.Equal(t, reqData.Input, TestTableData)
-		return nil
-	})
-	defer gf.Reset()
-
-	gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "stepFetch", func(collector *ApiCollector, ctx context.Context, cancel func(), reqData RequestData) error {
-		assert.Equal(t, reqData.Input, TestTableData)
-		page := reqData.Pager.Page
-		pages[page] = true
-		assert.Equal(t, reqData.Pager.Size, apiCollector.args.PageSize)
-		assert.Equal(t, reqData.Pager.Skip, apiCollector.args.PageSize*(page-1))
-
-		// check if it can get cancel command
-		for range ctx.Done() {
-		}
-
-		return nil
-	})
-	defer gs.Reset()
-
-	go func() {
-		time.Sleep(time.Duration(500) * time.Microsecond)
-		Cancel()
-	}()
-
-	err := SetTimeOut(TestTimeOut, func() {
-		// run testing
-		err := apiCollector.exec(TestTableData)
-		assert.Equal(t, err, nil)
-		for i := 2; i <= TestTotalPage; i++ {
-			assert.True(t, pages[i], i)
-		}
-	})
-	assert.Equal(t, err, nil)
-}
-
-func TestExecute(t *testing.T) {
-	MockDB(t)
-	defer UnMockDB()
-	apiCollector, _ := CreateTestApiCollector()
-
-	apiCollector.args.Input = &TestIterator{
-		data:         *TestTableData,
-		count:        TestDataCount,
-		hasNextTimes: 0,
-		fetchTimes:   0,
-		closeTimes:   0,
-		unlimit:      false,
-	}
-
-	gt.Reset()
-	gt = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Table", func(db *gorm.DB, name string, args ...interface{}) *gorm.DB {
-		assert.Equal(t, name, "_raw_"+TestTableData.TableName())
-		return db
-	},
-	)
-
-	NeedWait := int64(0)
-	execTimes := 0
-
-	ge := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "exec", func(collector *ApiCollector, input interface{}) error {
-		atomic.AddInt64(&NeedWait, 1)
-		execTimes++
-		assert.Equal(t, input.(*TestTable).Email, TestTableData.Email)
-		assert.Equal(t, input.(*TestTable).Name, TestTableData.Name)
-		atomic.AddInt64(&NeedWait, -1)
-		return nil
-	})
-	defer ge.Reset()
-
-	gw := gomonkey.ApplyMethod(reflect.TypeOf(&ApiAsyncClient{}), "WaitAsync", func(apiClient *ApiAsyncClient) error {
-		for atomic.LoadInt64(&NeedWait) > 0 {
-			time.Sleep(time.Millisecond)
-		}
-		return nil
-	})
-	defer gw.Reset()
-
-	// run testing
-	err := apiCollector.Execute()
-	assert.Equal(t, err, nil)
-	assert.Equal(t, execTimes, TestDataCount)
-
-	input := apiCollector.args.Input.(*TestIterator)
-	assert.Equal(t, input.fetchTimes, TestDataCount)
-	assert.Equal(t, input.hasNextTimes >= input.fetchTimes, true)
-	assert.Equal(t, input.closeTimes > 0, true)
-}
-
-func TestExecute_Cancel(t *testing.T) {
-	MockDB(t)
-	defer UnMockDB()
-	apiCollector, _ := CreateTestApiCollector()
-
-	apiCollector.args.Input = &TestIterator{
-		data:         *TestTableData,
-		count:        TestDataCount,
-		hasNextTimes: 0,
-		fetchTimes:   0,
-		closeTimes:   0,
-		unlimit:      true,
-	}
-
-	apiCollector.args.Input.HasNext()
-
-	gt.Reset()
-	gt = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Table", func(db *gorm.DB, name string, args ...interface{}) *gorm.DB {
-		assert.Equal(t, name, "_raw_"+TestTableData.TableName())
-		return db
-	},
-	)
-
-	NeedWait := int64(0)
-	execTimes := 0
-
-	ge := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "exec", func(collector *ApiCollector, input interface{}) error {
-		atomic.AddInt64(&NeedWait, 1)
-		execTimes++
-		assert.Equal(t, input.(*TestTable).Email, TestTableData.Email)
-		assert.Equal(t, input.(*TestTable).Name, TestTableData.Name)
-		atomic.AddInt64(&NeedWait, -1)
-		return nil
-	})
-	defer ge.Reset()
-
-	gw := gomonkey.ApplyMethod(reflect.TypeOf(&ApiAsyncClient{}), "WaitAsync", func(apiClient *ApiAsyncClient) error {
-		for atomic.LoadInt64(&NeedWait) > 0 {
-			time.Sleep(time.Millisecond)
-		}
-		return nil
-	})
-	defer gw.Reset()
-
-	go func() {
-		time.Sleep(time.Duration(500) * time.Microsecond)
-		Cancel()
-	}()
-
-	err := SetTimeOut(TestTimeOut, func() {
-		// run testing
-		err := apiCollector.Execute()
-		assert.Equal(t, err, fmt.Errorf("context canceled"))
-
-		input := apiCollector.args.Input.(*TestIterator)
-		assert.Equal(t, input.hasNextTimes >= input.fetchTimes, true)
-		assert.Equal(t, input.closeTimes > 0, true)
-	})
-	assert.Equal(t, err, nil)
-}
-
-func TestExecute_Total(t *testing.T) {
-	MockDB(t)
-	defer UnMockDB()
-	apiCollector, _ := CreateTestApiCollector()
-	// less count for more quick test
-	TestDataCount = 10
-	// ReLimit the workNum to test the block
-	reWorkNum := 1
-
-	apiCollector.args.Input = &TestIterator{
-		data:         *TestTableData,
-		count:        TestDataCount,
-		hasNextTimes: 0,
-		fetchTimes:   0,
-		closeTimes:   0,
-		unlimit:      false,
-	}
-
-	gt.Reset()
-	gt = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Table", func(db *gorm.DB, name string, args ...interface{}) *gorm.DB {
-		assert.Equal(t, name, "_raw_"+TestTableData.TableName())
-		return db
-	},
-	)
-	defer gw.Reset()
-
-	gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "saveRawData", func(collector *ApiCollector, res *http.Response, input interface{}) (int, error) {
-		items, err := collector.args.ResponseParser(res)
-		assert.Equal(t, err, nil)
-		for _, v := range items {
-			jsondata, err := json.Marshal(v)
-			assert.Equal(t, err, nil)
-			assert.Equal(t, string(jsondata), TestRawMessage)
-		}
-		assert.Equal(t, input, TestTableData)
-		AssertBaseResponse(t, res, &TestHttpResponse_Suc)
-		return len(items), nil
-	})
-	defer gs.Reset()
-
-	gin := gomonkey.ApplyMethod(reflect.TypeOf(&logger.DefaultLogger{}), "Info", func(_ *logger.DefaultLogger, _ string, _ ...interface{}) {
-	})
-	defer gin.Reset()
-
-	gdo := gomonkey.ApplyMethod(reflect.TypeOf(&ApiClient{}), "Do", func(
-		apiClient *ApiClient,
-		method string,
-		path string,
-		query url.Values,
-		body interface{},
-		headers http.Header,
-	) (*http.Response, error) {
-		res := TestHttpResponse_Suc
-
-		AddBodyData(&res, TestDataCount)
-		SetUrl(&res, TestUrl)
-
-		return &res, nil
-	})
-	defer gdo.Reset()
-
-	var gse *gomonkey.Patches
-	gse = gomonkey.ApplyFunc(NewWorkerScheduler, func(workerNum int, maxWork int, maxWorkDuration time.Duration, ctx context.Context, maxRetry int) (*WorkerScheduler, error) {
-		gse.Reset()
-		workerNum = reWorkNum
-		return NewWorkerScheduler(workerNum, maxWork, maxWorkDuration, ctx, maxRetry)
-	})
-	defer gse.Reset()
-
-	// create rate limit calculator
-	rateLimiter := &ApiRateLimitCalculator{
-		UserRateLimitPerHour: 360000000, // 100000 times each seconed
-	}
-
-	apiCollector.args.ApiClient, _ = CreateTestAsyncApiClientWithRateLimitAndCtx(t, rateLimiter, apiCollector.args.Ctx.GetContext())
-
-	err := SetTimeOut(TestTimeOut, func() {
-		err := apiCollector.Execute()
-		assert.Equal(t, err, nil)
-
-		input := apiCollector.args.Input.(*TestIterator)
-		assert.Equal(t, input.fetchTimes, TestDataCount)
-		assert.Equal(t, input.hasNextTimes >= input.fetchTimes, true)
-		assert.Equal(t, input.closeTimes > 0, true)
-	})
-	assert.Equal(t, err, nil)
+	mockDal.AssertExpectations(t)
 }
diff --git a/plugins/helper/api_extractor.go b/plugins/helper/api_extractor.go
index c96e33ed..fc09c27e 100644
--- a/plugins/helper/api_extractor.go
+++ b/plugins/helper/api_extractor.go
@@ -26,13 +26,10 @@ import (
 	"github.com/apache/incubator-devlake/plugins/core"
 )
 
-// Accept raw json body and params, return list of entities that need to be stored
-type RawDataExtractor func(row *RawData) ([]interface{}, error)
-
 type ApiExtractorArgs struct {
 	RawDataSubTaskArgs
 	Params    interface{}
-	Extract   RawDataExtractor
+	Extract   func(row *RawData) ([]interface{}, error)
 	BatchSize int
 }
 
diff --git a/plugins/helper/api_extractor_test.go b/plugins/helper/api_extractor_test.go.old
similarity index 100%
rename from plugins/helper/api_extractor_test.go
rename to plugins/helper/api_extractor_test.go.old
diff --git a/plugins/helper/batch_save_divider_test.go b/plugins/helper/batch_save_divider_test.go.old
similarity index 100%
rename from plugins/helper/batch_save_divider_test.go
rename to plugins/helper/batch_save_divider_test.go.old
diff --git a/plugins/helper/batch_save_test.go b/plugins/helper/batch_save_test.go.old
similarity index 100%
rename from plugins/helper/batch_save_test.go
rename to plugins/helper/batch_save_test.go.old
diff --git a/plugins/helper/subtask_flow_test_helper.go b/plugins/helper/common/callbacks.go
similarity index 90%
rename from plugins/helper/subtask_flow_test_helper.go
rename to plugins/helper/common/callbacks.go
index 4723e764..e7dd5633 100644
--- a/plugins/helper/subtask_flow_test_helper.go
+++ b/plugins/helper/common/callbacks.go
@@ -15,4 +15,8 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package helper
+package common
+
+import "net/http"
+
+type ApiAsyncCallback func(*http.Response) error
diff --git a/plugins/helper/connection.go b/plugins/helper/connection.go
index 30e9ab0d..26379be5 100644
--- a/plugins/helper/connection.go
+++ b/plugins/helper/connection.go
@@ -20,15 +20,15 @@ package helper
 import (
 	"encoding/base64"
 	"fmt"
-	"github.com/apache/incubator-devlake/config"
+	"reflect"
+	"strconv"
+
 	"github.com/apache/incubator-devlake/models/common"
 	"github.com/apache/incubator-devlake/plugins/core"
 	"github.com/go-playground/validator/v10"
 	"github.com/mitchellh/mapstructure"
 	"gorm.io/gorm"
 	"gorm.io/gorm/clause"
-	"reflect"
-	"strconv"
 )
 
 type BaseConnection struct {
@@ -56,183 +56,173 @@ type RestConnection struct {
 	RateLimit      int    `comment:"api request rate limt per hour" json:"rateLimit"`
 }
 
-// CreateConnection populate from request input into connection which come from REST functions to connection struct and save to DB
-// and only change value which `data` has
-// mergeFieldsToConnection merges fields from data
-// `connection` is the pointer of a plugin connection
-// `data` is http request input param
-func CreateConnection(data map[string]interface{}, connection interface{}, db *gorm.DB) error {
-	var err error
-	// update fields from request body
-	err = mergeFieldsToConnection(connection, data)
-	if err != nil {
-		return err
+type ConnectionApiHelper struct {
+	encKey    string
+	log       core.Logger
+	db        *gorm.DB
+	validator *validator.Validate
+}
+
+func NewConnectionHelper(
+	basicRes core.BasicRes,
+	vld *validator.Validate,
+) *ConnectionApiHelper {
+	if vld == nil {
+		vld = validator.New()
+	}
+	return &ConnectionApiHelper{
+		encKey:    basicRes.GetConfig(core.EncodeKeyEnvStr),
+		log:       basicRes.GetLogger(),
+		db:        basicRes.GetDb(),
+		validator: vld,
 	}
-	err = saveToDb(connection, db)
+}
+
+// Create a connection record based on request body
+func (c *ConnectionApiHelper) Create(connection interface{}, input *core.ApiResourceInput) error {
+	// update fields from request body
+	err := c.merge(connection, input.Body)
 	if err != nil {
 		return err
 	}
-	return nil
+	return c.save(connection)
 }
 
-func PatchConnection(input *core.ApiResourceInput, connection interface{}, db *gorm.DB) error {
-	err := GetConnection(input.Params, connection, db)
+func (c *ConnectionApiHelper) Patch(connection interface{}, input *core.ApiResourceInput) error {
+	err := c.First(connection, input.Params)
 	if err != nil {
 		return err
 	}
 
-	err = CreateConnection(input.Body, connection, db)
+	err = c.merge(connection, input.Body)
 	if err != nil {
 		return err
 	}
-
-	return nil
+	return c.save(connection)
 }
 
-func saveToDb(connection interface{}, db *gorm.DB) error {
-	err := EncryptConnection(connection)
-	if err != nil {
-		return err
+// First finds connection from db  by parsing request input and decrypt it
+func (c *ConnectionApiHelper) First(connection interface{}, params map[string]string) error {
+	connectionId := params["connectionId"]
+	if connectionId == "" {
+		return fmt.Errorf("missing connectionId")
+	}
+	id, err := strconv.ParseUint(connectionId, 10, 64)
+	if err != nil || id < 1 {
+		return fmt.Errorf("invalid connectionId")
 	}
-	err = db.Clauses(clause.OnConflict{UpdateAll: true}).Save(connection).Error
+	return c.FirstById(connection, id)
+}
+
+func (c *ConnectionApiHelper) FirstById(connection interface{}, id uint64) error {
+	err := c.db.First(connection, "id = ?", id).Error
 	if err != nil {
 		return err
 	}
-
-	return DecryptConnection(connection)
+	c.decrypt(connection)
+	return nil
 }
 
-// mergeFieldsToConnection will populate all value in map to connection struct and validate the struct
-func mergeFieldsToConnection(specificConnection interface{}, connections ...map[string]interface{}) error {
-	// decode
-	for _, connection := range connections {
-		err := mapstructure.Decode(connection, specificConnection)
-		if err != nil {
-			return err
-		}
-	}
-	// validate
-	vld := validator.New()
-	err := vld.Struct(specificConnection)
+// List returns all connections with password/token decrypted
+func (c *ConnectionApiHelper) List(connections interface{}) error {
+	err := c.db.Find(connections).Error
 	if err != nil {
 		return err
 	}
-
+	conns := reflect.ValueOf(connections).Elem()
+	for i := 0; i < conns.Len(); i++ {
+		c.decrypt(conns.Index(i).Addr().Interface())
+	}
 	return nil
 }
 
-func getEncKey() (string, error) {
-	v := config.GetConfig()
-	encKey := v.GetString(core.EncodeKeyEnvStr)
-	if encKey == "" {
-		// Randomly generate a bunch of encryption keys and set them to config
-		encKey = core.RandomEncKey()
-		v.Set(core.EncodeKeyEnvStr, encKey)
-		err := config.WriteConfig(v)
-		if err != nil {
-			return encKey, err
-		}
-	}
-	return encKey, nil
+// Delete connection
+func (c *ConnectionApiHelper) Delete(connection interface{}) error {
+	return c.db.Delete(connection).Error
 }
 
-// GetConnection finds connection from db  by parsing request input and decrypt it
-func GetConnection(data map[string]string, connection interface{}, db *gorm.DB) error {
-	id, err := GetConnectionIdByInputParam(data)
+func (c *ConnectionApiHelper) merge(connection interface{}, body map[string]interface{}) error {
+	// merge
+	err := mapstructure.Decode(body, connection)
 	if err != nil {
-		return fmt.Errorf("invalid connectionId")
+		return err
 	}
-
-	err = db.First(connection, id).Error
+	// validate
+	err = c.validator.Struct(connection)
 	if err != nil {
 		return err
 	}
 
-	return DecryptConnection(connection)
-
+	return nil
 }
 
-// ListConnections returns all connections with password/token decrypted
-func ListConnections(connections interface{}, db *gorm.DB) error {
-	err := db.Find(connections).Error
-	connPtr := reflect.ValueOf(connections)
-	connVal := reflect.Indirect(connPtr)
+func (c *ConnectionApiHelper) save(connection interface{}) error {
+	c.encrypt(connection)
+
+	err := c.db.Clauses(clause.OnConflict{UpdateAll: true}).Create(connection).Error
 	if err != nil {
 		return err
 	}
-	for i := 0; i < connVal.Len(); i++ {
-		//connVal.Index(i) returns value of ith elem in connections, .Elem() reutrns the original elem
-		tmp := connVal.Index(i).Elem()
-		err = DecryptConnection(tmp.Addr().Interface())
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}
 
-// GetConnectionIdByInputParam gets connectionId by parsing request input
-func GetConnectionIdByInputParam(data map[string]string) (uint64, error) {
-	connectionId := data["connectionId"]
-	if connectionId == "" {
-		return 0, fmt.Errorf("missing connectionId")
-	}
-	return strconv.ParseUint(connectionId, 10, 64)
+	c.decrypt(connection)
+	return nil
 }
 
-func firstFieldNameWithTag(t reflect.Type, tag string) string {
-	fieldName := ""
-	for i := 0; i < t.NumField(); i++ {
-		field := t.Field(i)
-		if field.Type.Kind() == reflect.Struct {
-			fieldName = firstFieldNameWithTag(field.Type, tag)
-		} else {
-			if field.Tag.Get(tag) == "yes" {
-				fieldName = field.Name
-			}
-		}
+func (c *ConnectionApiHelper) decrypt(connection interface{}) {
+	err := UpdateEncryptFields(connection, func(encrypted string) (string, error) {
+		return core.Decrypt(c.encKey, encrypted)
+	})
+	if err != nil {
+		c.log.Error("failed to decrypt: %w", err)
 	}
-	return fieldName
 }
 
-// DecryptConnection decrypts password/token field for connection
-func DecryptConnection(connection interface{}) error {
-	dataVal := reflect.ValueOf(connection)
-	if dataVal.Kind() != reflect.Ptr {
-		panic("connection is not a pointer")
-	}
-	encKey, err := getEncKey()
+func (c *ConnectionApiHelper) encrypt(connection interface{}) {
+	err := UpdateEncryptFields(connection, func(plaintext string) (string, error) {
+		return core.Encrypt(c.encKey, plaintext)
+	})
 	if err != nil {
-		return nil
+		c.log.Error("failed to encrypt: %w", err)
 	}
-	dataType := reflect.Indirect(dataVal).Type()
-	fieldName := firstFieldNameWithTag(dataType, "encrypt")
-	if len(fieldName) > 0 {
-		decryptStr, _ := core.Decrypt(encKey, dataVal.Elem().FieldByName(fieldName).String())
-		dataVal.Elem().FieldByName(fieldName).Set(reflect.ValueOf(decryptStr))
-	}
-	return nil
 }
 
-func EncryptConnection(connection interface{}) error {
-	dataVal := reflect.ValueOf(connection)
-	if dataVal.Kind() != reflect.Ptr {
-		panic("connection is not a pointer")
+// UpdateEncryptFields update fields of val with tag `encrypt:"yes|true"`
+func UpdateEncryptFields(val interface{}, update func(in string) (string, error)) error {
+	v := reflect.ValueOf(val)
+	if v.Kind() != reflect.Ptr {
+		panic(fmt.Errorf("val is not a pointer: %v", val))
 	}
-	encKey, err := getEncKey()
-	if err != nil {
-		return err
+	e := v.Elem()
+	if e.Kind() != reflect.Struct {
+		panic(fmt.Errorf("*val is not a struct: %v", val))
 	}
-	dataType := reflect.Indirect(dataVal).Type()
-	fieldName := firstFieldNameWithTag(dataType, "encrypt")
-	if len(fieldName) > 0 {
-		plainPwd := dataVal.Elem().FieldByName(fieldName).String()
-		encyptedStr, err := core.Encrypt(encKey, plainPwd)
-
-		if err != nil {
-			return err
+	t := e.Type()
+	for i := 0; i < t.NumField(); i++ {
+		field := t.Field(i)
+		if !field.IsExported() {
+			continue
+		}
+		if field.Type.Kind() == reflect.Struct {
+			err := UpdateEncryptFields(e.Field(i).Addr().Interface(), update)
+			if err != nil {
+				return err
+			}
+		} else if field.Type.Kind() == reflect.Ptr && field.Type.Elem().Kind() == reflect.Struct {
+			fmt.Printf("field : %v\n", e.Field(i).Interface())
+			err := UpdateEncryptFields(e.Field(i).Interface(), update)
+			if err != nil {
+				return err
+			}
+		} else if field.Type.Kind() == reflect.String {
+			tagValue := field.Tag.Get("encrypt")
+			if tagValue == "yes" || tagValue == "true" {
+				out, err := update(e.Field(i).String())
+				if err != nil {
+					return err
+				}
+				e.Field(i).Set(reflect.ValueOf(out))
+			}
 		}
-		dataVal.Elem().FieldByName(fieldName).Set(reflect.ValueOf(encyptedStr))
 	}
 	return nil
 }
diff --git a/plugins/helper/connection_test.go b/plugins/helper/connection_test.go
index bd1c7cc2..f04f4a92 100644
--- a/plugins/helper/connection_test.go
+++ b/plugins/helper/connection_test.go
@@ -18,149 +18,92 @@ limitations under the License.
 package helper
 
 import (
-	"github.com/stretchr/testify/assert"
-	"reflect"
+	"fmt"
 	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
 )
 
-type MockConnection struct {
-	RestConnection             `mapstructure:",squash"`
-	BasicAuth                  `mapstructure:",squash"`
-	StoryPointField            string `gorm:"type:varchar(50);" json:"storyPointField"`
-	RemotelinkCommitShaPattern string `gorm:"type:varchar(255);comment='golang regexp, the first group will be recognized as commit sha, ref https://github.com/google/re2/wiki/Syntax'" json:"remotelinkCommitShaPattern"`
+type MockAuth struct {
+	Username string
+	Password string `encrypt:"yes"`
 }
 
-func (MockConnection) TableName() string {
-	return "_tool_jira_connections"
+type MockConnection struct {
+	MockAuth
+	Name      string `mapstructure:"name"`
+	BasicAuth string `encrypt:"true"`
+	BearToken struct {
+		AccessToken string `encrypt:"true"`
+	}
+	MockAuth2 *MockAuth
+	Age       int
+	Since     *time.Time
 }
 
+/*
 func TestMergeFieldsToConnection(t *testing.T) {
 	v := &MockConnection{
-		RestConnection: RestConnection{
-			BaseConnection: BaseConnection{
-				Name: "1",
-			},
-			Endpoint:  "2",
-			Proxy:     "3",
-			RateLimit: 0,
+		Name: "1",
+		BearToken: struct {
+			AccessToken string "encrypt:\"true\""
+		}{
+			AccessToken: "2",
 		},
-		BasicAuth: BasicAuth{
-			Username: "4",
-			Password: "5",
+		MockAuth: &MockAuth{
+			Username: "3",
+			Password: "4",
 		},
-		RemotelinkCommitShaPattern: "8",
+		Age: 5,
 	}
 	data := make(map[string]interface{})
-	data["Endpoint"] = "2-2"
-	data["Username"] = "4-4"
-	data["Password"] = "5-5"
+	data["name"] = "1a"
+	data["BasicAuth"] = map[string]interface{}{
+		"AccessToken": "2a",
+	}
+	data["Username"] = "3a"
 
 	err := mergeFieldsToConnection(v, data)
 	assert.Nil(t, err)
 
-	assert.Equal(t, "4-4", v.Username)
-	assert.Equal(t, "2-2", v.Endpoint)
-	assert.Equal(t, "5-5", v.Password)
+	assert.Equal(t, "1a", v.Name)
+	assert.Equal(t, "2a", v.BearToken.AccessToken)
+	assert.Equal(t, "3a", v.Username)
+	assert.Equal(t, "4", v.Password)
+	assert.Equal(t, 5, v.Age)
 }
+*/
 
-func TestDecryptAndEncrypt(t *testing.T) {
+func TestUpdateEncryptFields(t *testing.T) {
+	sinc := time.Now()
 	v := &MockConnection{
-		RestConnection: RestConnection{
-			BaseConnection: BaseConnection{
-				Name: "1",
-			},
-			Endpoint:  "2",
-			Proxy:     "3",
-			RateLimit: 0,
+		MockAuth: MockAuth{
+			Username: "1",
+			Password: "2",
 		},
-		BasicAuth: BasicAuth{
-			Username: "4",
-			Password: "5",
-		},
-		RemotelinkCommitShaPattern: "8",
-	}
-	err := EncryptConnection(v)
-	assert.Nil(t, err)
-
-	assert.NotEqual(t, "5", v.Password)
-	err = DecryptConnection(v)
-	assert.Nil(t, err)
-
-	assert.Equal(t, "5", v.Password)
-
-}
-
-func TestDecryptConnection(t *testing.T) {
-	v := &MockConnection{
-		RestConnection: RestConnection{
-			BaseConnection: BaseConnection{
-				Name: "1",
-			},
-			Endpoint:  "2",
-			Proxy:     "3",
-			RateLimit: 0,
+		Name: "3",
+		BearToken: struct {
+			AccessToken string `encrypt:"true"`
+		}{
+			AccessToken: "4",
 		},
-		BasicAuth: BasicAuth{
-			Username: "4",
-			Password: "5",
+		MockAuth2: &MockAuth{
+			Username: "5",
+			Password: "6",
 		},
-		RemotelinkCommitShaPattern: "8",
+		Age:   7,
+		Since: &sinc,
 	}
-	err := EncryptConnection(v)
-	assert.Nil(t, err)
-
-	encryptedPwd := v.Password
-	err = DecryptConnection(v)
+	err := UpdateEncryptFields(v, func(in string) (string, error) {
+		return fmt.Sprintf("%s-asdf", in), nil
+	})
 	assert.Nil(t, err)
-	assert.NotEqual(t, encryptedPwd, v.Password)
-	assert.Equal(t, "5", v.Password)
+	assert.Equal(t, "1", v.Username)
+	assert.Equal(t, "2-asdf", v.Password)
+	assert.Equal(t, "3", v.Name)
+	assert.Equal(t, "4-asdf", v.BearToken.AccessToken)
+	assert.Equal(t, "5", v.MockAuth2.Username)
+	assert.Equal(t, "6-asdf", v.MockAuth2.Password)
+	assert.Equal(t, 7, v.Age)
 }
-
-func TestFirstFieldNameWithTag(t *testing.T) {
-	v := &MockConnection{
-		RestConnection: RestConnection{
-			BaseConnection: BaseConnection{
-				Name: "1",
-			},
-			Endpoint:  "2",
-			Proxy:     "3",
-			RateLimit: 0,
-		},
-		BasicAuth: BasicAuth{
-			Username: "4",
-			Password: "5",
-		},
-		StoryPointField:            "7",
-		RemotelinkCommitShaPattern: "8",
-	}
-	dataVal := reflect.ValueOf(v)
-	dataType := reflect.Indirect(dataVal).Type()
-	fieldName := firstFieldNameWithTag(dataType, "encrypt")
-	assert.Equal(t, "Password", fieldName)
-}
-
-//func TestListConnections(t *testing.T) {
-//	jiraConnections := make([]*MockConnection, 0)
-//	cfg := config.GetConfig()
-//	dbUrl := cfg.GetString("DB_URL")
-//	u, err := url.Parse(dbUrl)
-//	dbUrl = fmt.Sprintf("%s@tcp(%s)%s?%s", u.User.String(), u.Host, u.Path, u.RawQuery)
-//	dbConfig := &gorm.Config{
-//		Logger: gormLogger.New(
-//			log.Default(),
-//			gormLogger.Config{
-//				SlowThreshold:             time.Second,      // Slow SQL threshold
-//				LogLevel:                  gormLogger.Error, // Log level
-//				IgnoreRecordNotFoundError: true,             // Ignore ErrRecordNotFound error for logger
-//				Colorful:                  true,             // Disable color
-//			},
-//		),
-//		// most of our operation are in batch, this can improve performance
-//		PrepareStmt: true,
-//	}
-//	db, err := gorm.Open(mysql.Open(dbUrl), dbConfig)
-//
-//	err = ListConnections(&jiraConnections, db)
-//
-//	assert.Nil(t, err)
-//}
diff --git a/plugins/helper/data_convertor_test.go b/plugins/helper/data_convertor_test.go.old
similarity index 100%
rename from plugins/helper/data_convertor_test.go
rename to plugins/helper/data_convertor_test.go.old
diff --git a/plugins/helper/default_task_context.go b/plugins/helper/default_task_context.go
index 3b14b0d0..9f4b6747 100644
--- a/plugins/helper/default_task_context.go
+++ b/plugins/helper/default_task_context.go
@@ -24,7 +24,9 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/apache/incubator-devlake/impl/dalgorm"
 	"github.com/apache/incubator-devlake/plugins/core"
+	"github.com/apache/incubator-devlake/plugins/core/dal"
 	"github.com/spf13/viper"
 	"gorm.io/gorm"
 )
@@ -32,11 +34,45 @@ import (
 // bridge to current implementation at this point
 // TODO: implement another TaskContext for distributed runner/worker
 
+type DefaultBasicRes struct {
+	cfg    *viper.Viper
+	logger core.Logger
+	db     *gorm.DB
+	dal    dal.Dal
+}
+
+func (c *DefaultBasicRes) GetConfig(name string) string {
+	return c.cfg.GetString(name)
+}
+
+func (c *DefaultBasicRes) GetDb() *gorm.DB {
+	return c.db
+}
+
+func (c *DefaultBasicRes) GetDal() dal.Dal {
+	return c.dal
+}
+
+func (c *DefaultBasicRes) GetLogger() core.Logger {
+	return c.logger
+}
+
+func NewDefaultBasicRes(
+	cfg *viper.Viper,
+	logger core.Logger,
+	db *gorm.DB,
+) *DefaultBasicRes {
+	return &DefaultBasicRes{
+		cfg:    cfg,
+		logger: logger,
+		db:     db,
+		dal:    dalgorm.NewDalgorm(db),
+	}
+}
+
 // shared by TasContext and SubTaskContext
 type defaultExecContext struct {
-	cfg      *viper.Viper
-	logger   core.Logger
-	db       *gorm.DB
+	*DefaultBasicRes
 	ctx      context.Context
 	name     string
 	data     interface{}
@@ -56,9 +92,11 @@ func newDefaultExecContext(
 	progress chan core.RunningProgress,
 ) *defaultExecContext {
 	return &defaultExecContext{
-		cfg:      cfg,
-		logger:   logger,
-		db:       db,
+		DefaultBasicRes: NewDefaultBasicRes(
+			cfg,
+			logger,
+			db,
+		),
 		ctx:      ctx,
 		name:     name,
 		data:     data,
@@ -70,14 +108,6 @@ func (c *defaultExecContext) GetName() string {
 	return c.name
 }
 
-func (c *defaultExecContext) GetConfig(name string) string {
-	return c.cfg.GetString(name)
-}
-
-func (c *defaultExecContext) GetDb() *gorm.DB {
-	return c.db
-}
-
 func (c *defaultExecContext) GetContext() context.Context {
 	return c.ctx
 }
@@ -86,10 +116,6 @@ func (c *defaultExecContext) GetData() interface{} {
 	return c.data
 }
 
-func (c *defaultExecContext) GetLogger() core.Logger {
-	return c.logger
-}
-
 func (c *defaultExecContext) SetProgress(progressType core.ProgressType, current int, total int) {
 	c.current = int64(current)
 	c.total = total
@@ -160,7 +186,7 @@ type DefaultSubTaskContext struct {
 func (c *DefaultSubTaskContext) SetProgress(current int, total int) {
 	c.defaultExecContext.SetProgress(core.SubTaskSetProgress, current, total)
 	if total > -1 {
-		c.logger.Info("total records: %d", c.total)
+		c.logger.Info("total jobs: %d", c.total)
 	}
 }
 
diff --git a/plugins/helper/iterator.go b/plugins/helper/iterator.go
index d2cd5b24..b84b500b 100644
--- a/plugins/helper/iterator.go
+++ b/plugins/helper/iterator.go
@@ -22,6 +22,7 @@ import (
 	"reflect"
 	"time"
 
+	"github.com/apache/incubator-devlake/plugins/core/dal"
 	"gorm.io/gorm"
 )
 
@@ -31,12 +32,14 @@ type Iterator interface {
 	Close() error
 }
 
+// Deprecated: use DalCursorIterator instead
 type CursorIterator struct {
 	db       *gorm.DB
 	cursor   *sql.Rows
 	elemType reflect.Type
 }
 
+// Deprecated: use NewDalCursorIterator instead
 func NewCursorIterator(db *gorm.DB, cursor *sql.Rows, elemType reflect.Type) (*CursorIterator, error) {
 	return &CursorIterator{
 		db:       db,
@@ -64,6 +67,41 @@ func (c *CursorIterator) Close() error {
 
 var _ Iterator = (*CursorIterator)(nil)
 
+// DalCursorIterator
+type DalCursorIterator struct {
+	db       dal.Dal
+	cursor   *sql.Rows
+	elemType reflect.Type
+}
+
+func NewDalCursorIterator(db dal.Dal, cursor *sql.Rows, elemType reflect.Type) (*DalCursorIterator, error) {
+	return &DalCursorIterator{
+		db:       db,
+		cursor:   cursor,
+		elemType: elemType,
+	}, nil
+}
+
+func (c *DalCursorIterator) HasNext() bool {
+	return c.cursor.Next()
+}
+
+func (c *DalCursorIterator) Fetch() (interface{}, error) {
+	elem := reflect.New(c.elemType).Interface()
+	err := c.db.Fetch(c.cursor, elem)
+	if err != nil {
+		return nil, err
+	}
+	return elem, nil
+}
+
+func (c *DalCursorIterator) Close() error {
+	return c.cursor.Close()
+}
+
+var _ Iterator = (*DalCursorIterator)(nil)
+
+// DateIterator
 type DateIterator struct {
 	startTime time.Time
 	endTime   time.Time
diff --git a/plugins/helper/worker_scheduler.go b/plugins/helper/worker_scheduler.go
index e42bfe33..71d7574e 100644
--- a/plugins/helper/worker_scheduler.go
+++ b/plugins/helper/worker_scheduler.go
@@ -22,133 +22,155 @@ import (
 	"fmt"
 	"os"
 	"sync"
+	"sync/atomic"
 	"time"
 
+	"github.com/apache/incubator-devlake/plugins/core"
 	"github.com/apache/incubator-devlake/utils"
 	"github.com/panjf2000/ants/v2"
 )
 
+// WorkerScheduler runs asynchronous tasks in parallel with throttling support
 type WorkerScheduler struct {
-	waitGroup    *sync.WaitGroup
+	waitGroup    sync.WaitGroup
 	pool         *ants.Pool
-	subPool      *ants.Pool
 	ticker       *time.Ticker
-	workerErrors *[]error
+	workerErrors []error
 	ctx          context.Context
+	mu           sync.Mutex
+	counter      int32
+	logger       core.Logger
 }
 
-// NewWorkerScheduler 创建一个并行执行的调度器,控制最大运行数和每秒最大运行数量
-// NewWorkerScheduler Create a parallel scheduler to control the maximum number of runs and the maximum number of runs per second
-// 注意: task执行是无序的
-// Warning: task execution is out of order
-func NewWorkerScheduler(workerNum int, maxWork int, maxWorkDuration time.Duration, ctx context.Context, maxRetry int) (*WorkerScheduler, error) {
-	var waitGroup sync.WaitGroup
-	workerErrors := make([]error, 0)
-	pWorkerErrors := &workerErrors
-	var mux sync.Mutex
-	pool, err := ants.NewPool(workerNum, ants.WithPanicHandler(func(i interface{}) {
-		mux.Lock()
-		defer mux.Unlock()
-		workerErrors = append(*pWorkerErrors, i.(error))
-		pWorkerErrors = &workerErrors
-	}))
-	if err != nil {
-		return nil, err
+var callframeEnabled = os.Getenv("ASYNC_CF") == "true"
+
+// NewWorkerScheduler creates a WorkerScheduler
+func NewWorkerScheduler(
+	workerNum int,
+	maxWork int,
+	maxWorkDuration time.Duration,
+	ctx context.Context,
+	maxRetry int,
+	logger core.Logger,
+) (*WorkerScheduler, error) {
+	if maxWork <= 0 {
+		return nil, fmt.Errorf("maxWork less than 1")
+	}
+	if maxWorkDuration <= 0 {
+		return nil, fmt.Errorf("maxWorkDuration less than 1")
 	}
-	subPool, err := ants.NewPool(workerNum*maxRetry, ants.WithPanicHandler(func(i interface{}) {
-		mux.Lock()
-		defer mux.Unlock()
-		workerErrors = append(*pWorkerErrors, i.(error))
-		pWorkerErrors = &workerErrors
+	s := &WorkerScheduler{
+		ctx:    ctx,
+		ticker: time.NewTicker(maxWorkDuration / time.Duration(maxWork)),
+		logger: logger,
+	}
+	pool, err := ants.NewPool(workerNum, ants.WithPanicHandler(func(i interface{}) {
+		s.mu.Lock()
+		defer s.mu.Unlock()
+		s.workerErrors = append(s.workerErrors, i.(error))
 	}))
 	if err != nil {
 		return nil, err
 	}
-	var ticker *time.Ticker
-	if maxWork > 0 {
-		ticker = time.NewTicker(maxWorkDuration / time.Duration(maxWork))
-	}
-	scheduler := &WorkerScheduler{
-		waitGroup:    &waitGroup,
-		pool:         pool,
-		subPool:      subPool,
-		ticker:       ticker,
-		workerErrors: pWorkerErrors,
-		ctx:          ctx,
-	}
-	return scheduler, nil
+	s.pool = pool
+	return s, nil
 }
 
-func (s *WorkerScheduler) Submit(task func() error, pool ...*ants.Pool) error {
-	select {
-	case <-s.ctx.Done():
-		return s.ctx.Err()
-	default:
-	}
-	s.waitGroup.Add(1)
+// SubmitBlocking enqueues a async task to ants, the task will be executed in future when timing is right.
+// It doesn't return error because it wouldn't be any when with a Blocking semantic, returned error does nothing but
+// causing confusion, more often, people thought it is returned by the task.
+// Since it is async task, the callframes would not be available for production mode, you can export Environment
+// Varaible ASYNC_CF=true to enable callframes capturing when debugging.
+// IMPORTANT: do NOT call SubmitBlocking inside the async task, it is likely to cause a deadlock, call
+// SubmitNonBlocking instead when number of tasks is relatively small.
+func (s *WorkerScheduler) SubmitBlocking(task func() error) {
 	// this is expensive, enable by EnvVar
-	cf := "set Environment Varaible ASYNC_CF=true to enable callframes information"
-	if os.Getenv("ASYNC_CF") == "true" {
-		cf = utils.GatherCallFrames()
-	}
-	var currentPool *ants.Pool
-	if pool == nil {
-		currentPool = s.pool
-	} else {
-		currentPool = pool[0]
+	cf := s.gatherCallFrames()
+	// to make sure task is done
+	if len(s.workerErrors) > 0 {
+		// not point to continue
+		return
 	}
-
-	return currentPool.Submit(func() {
-		var err error
+	s.waitGroup.Add(1)
+	err := s.pool.Submit(func() {
 		defer s.waitGroup.Done()
-		defer func() {
-			if err == nil {
-				r := recover()
-				if r != nil {
-					err = fmt.Errorf("%s", r)
-				}
-			}
-			if err != nil {
-				panic(fmt.Errorf("%s\n%s", err, cf))
-			}
-		}()
-		if pool == nil && s.ticker != nil {
-			for s.subPool.Running() != 0 {
-				<-s.ticker.C
-			}
-		}
-		if s.ticker != nil {
-			<-s.ticker.C
+
+		id := atomic.AddInt32(&s.counter, 1)
+		s.logger.Debug("schedulerJob >>> %d started", id)
+		defer s.logger.Debug("schedulerJob <<< %d ended", id)
+
+		if len(s.workerErrors) > 0 {
+			// not point to continue
+			return
 		}
+		// wait for rate limit throttling
+
+		var err error
+		defer s.gatherError(err, cf)
 		select {
 		case <-s.ctx.Done():
 			err = s.ctx.Err()
-		default:
-			err = task()
+		case <-s.ticker.C:
+			err = task() // nolint
 		}
 	})
+	// failed to submit, note that this is not task erro
+	if err != nil {
+		s.gatherError(err, cf)
+	}
+}
+
+func (s *WorkerScheduler) gatherCallFrames() string {
+	cf := "set Environment Varaible ASYNC_CF=true to enable callframes capturing"
+	if callframeEnabled {
+		cf = utils.GatherCallFrames(1)
+	}
+	return cf
+}
+
+func (s *WorkerScheduler) gatherError(err error, callframs string) {
+	if err == nil {
+		r := recover()
+		if r != nil {
+			err = fmt.Errorf("%s\n%s", r, callframs)
+		}
+	}
+	if err != nil {
+		s.mu.Lock()
+		defer s.mu.Unlock()
+		s.workerErrors = append(s.workerErrors, err)
+	}
 }
 
-func (s *WorkerScheduler) WaitUntilFinish() error {
+// NextTick enqueues task in a NonBlocking manner, you should only call this method within task submitted by
+// SubmitBlocking method
+// IMPORTANT: do NOT call this method with a huge number of tasks, it is likely to eat up all available memory
+func (s *WorkerScheduler) NextTick(task func() error) {
+	cf := s.gatherCallFrames()
+	// to make sure task will be enqueued
+	s.waitGroup.Add(1)
+	go func() {
+		var err error
+		defer s.waitGroup.Done()
+		defer s.gatherError(err, cf)
+		err = task() // nolint
+	}()
+}
+
+// Wait blocks current go-routine until all workers returned
+func (s *WorkerScheduler) Wait() error {
 	s.waitGroup.Wait()
-	if s.workerErrors != nil && len(*s.workerErrors) > 0 {
-		return fmt.Errorf("%s", *s.workerErrors)
+	if len(s.workerErrors) > 0 {
+		return fmt.Errorf("%s", s.workerErrors)
 	}
 	return nil
 }
 
+// Release resources
 func (s *WorkerScheduler) Release() {
+	s.waitGroup.Wait()
 	s.pool.Release()
-	s.subPool.Release()
 	if s.ticker != nil {
 		s.ticker.Stop()
 	}
 }
-
-func (s *WorkerScheduler) Add(delta int) {
-	s.waitGroup.Add(delta)
-}
-
-func (s *WorkerScheduler) Done() {
-	s.waitGroup.Done()
-}
diff --git a/plugins/helper/worker_scheduler_test.go b/plugins/helper/worker_scheduler_test.go
index 4f65e7a0..aa1ad22c 100644
--- a/plugins/helper/worker_scheduler_test.go
+++ b/plugins/helper/worker_scheduler_test.go
@@ -22,21 +22,24 @@ import (
 	"testing"
 	"time"
 
+	"github.com/apache/incubator-devlake/helpers/unithelper"
 	"github.com/stretchr/testify/assert"
 )
 
-func TestNewWorkerScheduler(t *testing.T) {
+func TestWorkerSchedulerQpsControl(t *testing.T) {
+	// assuming we want 2 requests per second
 	testChannel := make(chan int, 100)
 	ctx, cancel := context.WithCancel(context.Background())
-	s, _ := NewWorkerScheduler(5, 2, 1*time.Second, ctx, 0)
+	s, _ := NewWorkerScheduler(5, 2, 1*time.Second, ctx, 0, unithelper.DummyLogger())
 	defer s.Release()
 	for i := 1; i <= 5; i++ {
 		t := i
-		_ = s.Submit(func() error {
+		s.SubmitBlocking(func() error {
 			testChannel <- t
 			return nil
 		})
 	}
+	// after 1 second, 2 requerts should be issued
 	time.Sleep(1200 * time.Millisecond)
 	if len(testChannel) < 2 {
 		t.Fatal(`worker not start`)
@@ -44,6 +47,7 @@ func TestNewWorkerScheduler(t *testing.T) {
 	if len(testChannel) > 2 {
 		t.Fatal(`worker run too fast`)
 	}
+	// after 2 seconds, 4 requests should be issued
 	time.Sleep(time.Second)
 	if len(testChannel) < 4 {
 		t.Fatal(`worker not run after a second`)
@@ -51,53 +55,9 @@ func TestNewWorkerScheduler(t *testing.T) {
 	if len(testChannel) > 4 {
 		t.Fatal(`worker run too fast after a second`)
 	}
-	assert.Nil(t, s.WaitUntilFinish())
-	if len(*s.workerErrors) != 0 {
-		t.Fatal(`worker got panic`)
-	}
+	assert.Nil(t, s.Wait())
 	if len(testChannel) != 5 {
 		t.Fatal(`worker not wait until finish`)
 	}
 	cancel()
 }
-
-func TestNewWorkerSchedulerWithoutSecond(t *testing.T) {
-	testChannel := make(chan int, 100)
-	ctx, cancel := context.WithCancel(context.Background())
-	s, _ := NewWorkerScheduler(5, 0, 1*time.Second, ctx, 0)
-	defer s.Release()
-	for i := 1; i <= 5; i++ {
-		t := i
-		_ = s.Submit(func() error {
-			testChannel <- t
-			return nil
-		})
-	}
-	time.Sleep(5 * time.Millisecond)
-	if len(testChannel) != 5 {
-		t.Fatal(`worker not finish`)
-	}
-	assert.Nil(t, s.WaitUntilFinish())
-	if len(testChannel) != 5 {
-		t.Fatal(`worker not finish`)
-	}
-	cancel()
-}
-
-/*
-func TestNewWorkerSchedulerWithPanic(t *testing.T) {
-	testChannel := make(chan int, 100)
-	ctx, cancel := context.WithCancel(context.Background())
-	s,_ := NewWorkerScheduler(1, 1, ctx)
-	defer s.Release()
-	_ = s.Submit(func() error {
-		testChannel <- 1
-		return errors.New(`error message`)
-	})
-	s.WaitUntilFinish()
-	if len(*s.workerErrors) != 1 {
-		t.Fatal(`worker not got panic`)
-	}
-	cancel()
-}
-*/
diff --git a/plugins/jira/api/connection.go b/plugins/jira/api/connection.go
index 0c422bbe..f11c5a26 100644
--- a/plugins/jira/api/connection.go
+++ b/plugins/jira/api/connection.go
@@ -28,12 +28,9 @@ import (
 	"github.com/apache/incubator-devlake/plugins/core"
 	"github.com/apache/incubator-devlake/plugins/helper"
 	"github.com/apache/incubator-devlake/plugins/jira/models"
-	"github.com/go-playground/validator/v10"
 	"github.com/mitchellh/mapstructure"
 )
 
-var vld = validator.New()
-
 func TestConnection(input *core.ApiResourceInput) (*core.ApiResourceOutput, error) {
 
 	// decode
@@ -107,16 +104,13 @@ POST /plugins/jira/connections
 }
 */
 func PostConnections(input *core.ApiResourceInput) (*core.ApiResourceOutput, error) {
-	// create a new connection
-	jiraConnection := &models.JiraConnection{}
-
 	// update from request and save to database
-	err := helper.CreateConnection(input.Body, jiraConnection, db)
+	connection := &models.JiraConnection{}
+	err := connectionHelper.Create(connection, input)
 	if err != nil {
 		return nil, err
 	}
-
-	return &core.ApiResourceOutput{Body: jiraConnection, Status: http.StatusOK}, nil
+	return &core.ApiResourceOutput{Body: connection, Status: http.StatusOK}, nil
 }
 
 /*
@@ -129,43 +123,37 @@ PATCH /plugins/jira/connections/:connectionId
 }
 */
 func PatchConnection(input *core.ApiResourceInput) (*core.ApiResourceOutput, error) {
-	jiraConnection := &models.JiraConnection{}
-	err := helper.PatchConnection(input, jiraConnection, db)
+	connection := &models.JiraConnection{}
+	err := connectionHelper.Patch(connection, input)
 	if err != nil {
 		return nil, err
 	}
-
-	return &core.ApiResourceOutput{Body: jiraConnection}, nil
+	return &core.ApiResourceOutput{Body: connection}, nil
 }
 
 /*
 DELETE /plugins/jira/connections/:connectionId
 */
 func DeleteConnection(input *core.ApiResourceInput) (*core.ApiResourceOutput, error) {
-	// load from db
-	jiraConnectionID, err := helper.GetConnectionIdByInputParam(input.Params)
-	if err != nil {
-		return nil, err
-	}
-	// cascading delete
-	err = db.Where("id = ?", jiraConnectionID).Delete(&models.JiraConnection{}).Error
+	connection := &models.JiraConnection{}
+	err := connectionHelper.First(connection, input.Params)
 	if err != nil {
 		return nil, err
 	}
-	return &core.ApiResourceOutput{Body: jiraConnectionID}, nil
+	err = connectionHelper.Delete(connection)
+	return &core.ApiResourceOutput{Body: connection}, err
 }
 
 /*
 GET /plugins/jira/connections
 */
 func ListConnections(input *core.ApiResourceInput) (*core.ApiResourceOutput, error) {
-	jiraConnections := make([]*models.JiraConnection, 0)
-
-	err := helper.ListConnections(&jiraConnections, db)
+	var connections []models.JiraConnection
+	err := connectionHelper.List(&connections)
 	if err != nil {
 		return nil, err
 	}
-	return &core.ApiResourceOutput{Body: jiraConnections, Status: http.StatusOK}, nil
+	return &core.ApiResourceOutput{Body: connections, Status: http.StatusOK}, nil
 }
 
 /*
@@ -180,11 +168,7 @@ GET /plugins/jira/connections/:connectionId
 }
 */
 func GetConnection(input *core.ApiResourceInput) (*core.ApiResourceOutput, error) {
-	jiraConnection := &models.JiraConnection{}
-	err := helper.GetConnection(input.Params, jiraConnection, db)
-	if err != nil {
-		return nil, err
-	}
-
-	return &core.ApiResourceOutput{Body: jiraConnection}, nil
+	connection := &models.JiraConnection{}
+	err := connectionHelper.First(connection, input.Params)
+	return &core.ApiResourceOutput{Body: connection}, err
 }
diff --git a/plugins/jira/api/init.go b/plugins/jira/api/init.go
index acaa495e..6774e148 100644
--- a/plugins/jira/api/init.go
+++ b/plugins/jira/api/init.go
@@ -19,12 +19,21 @@ package api
 
 import (
 	"github.com/apache/incubator-devlake/plugins/core"
+	"github.com/apache/incubator-devlake/plugins/helper"
+	"github.com/go-playground/validator/v10"
 	"github.com/spf13/viper"
 	"gorm.io/gorm"
 )
 
-var db *gorm.DB
+var vld *validator.Validate
+var connectionHelper *helper.ConnectionApiHelper
+var basicRes core.BasicRes
 
 func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
-	db = database
+	basicRes = helper.NewDefaultBasicRes(config, logger, database)
+	vld = validator.New()
+	connectionHelper = helper.NewConnectionHelper(
+		basicRes,
+		vld,
+	)
 }
diff --git a/plugins/jira/api/proxy.go b/plugins/jira/api/proxy.go
index 398f4f1f..f477ff26 100644
--- a/plugins/jira/api/proxy.go
+++ b/plugins/jira/api/proxy.go
@@ -20,7 +20,6 @@ package api
 import (
 	"encoding/json"
 	"fmt"
-	"github.com/apache/incubator-devlake/utils"
 	"io/ioutil"
 	"time"
 
@@ -34,23 +33,18 @@ const (
 )
 
 func Proxy(input *core.ApiResourceInput) (*core.ApiResourceOutput, error) {
-	jiraConnection := &models.JiraConnection{}
-	err := helper.GetConnection(input.Params, jiraConnection, db)
-	if err != nil {
-		return nil, err
-	}
-	basicAuth := utils.GetEncodedToken(jiraConnection.Username, jiraConnection.Password)
-
+	connection := &models.JiraConnection{}
+	err := connectionHelper.First(connection, input.Params)
 	if err != nil {
 		return nil, err
 	}
 	apiClient, err := helper.NewApiClient(
-		jiraConnection.Endpoint,
+		connection.Endpoint,
 		map[string]string{
-			"Authorization": fmt.Sprintf("Basic %v", basicAuth),
+			"Authorization": fmt.Sprintf("Basic %v", connection.GetEncodedToken()),
 		},
 		30*time.Second,
-		jiraConnection.Proxy,
+		connection.Proxy,
 		nil,
 	)
 	if err != nil {
diff --git a/plugins/jira/jira.go b/plugins/jira/jira.go
index 1e0d23fb..0dc791ff 100644
--- a/plugins/jira/jira.go
+++ b/plugins/jira/jira.go
@@ -24,6 +24,7 @@ import (
 
 	"github.com/apache/incubator-devlake/migration"
 	"github.com/apache/incubator-devlake/plugins/core"
+	"github.com/apache/incubator-devlake/plugins/helper"
 	"github.com/apache/incubator-devlake/plugins/jira/api"
 	"github.com/apache/incubator-devlake/plugins/jira/models"
 	"github.com/apache/incubator-devlake/plugins/jira/models/migrationscripts"
@@ -99,7 +100,8 @@ func (plugin Jira) SubTaskMetas() []core.SubTaskMeta {
 func (plugin Jira) PrepareTaskData(taskCtx core.TaskContext, options map[string]interface{}) (interface{}, error) {
 	var op tasks.JiraOptions
 	var err error
-	db := taskCtx.GetDb()
+	logger := taskCtx.GetLogger()
+	logger.Debug("%v", options)
 	err = mapstructure.Decode(options, &op)
 	if err != nil {
 		return nil, err
@@ -108,7 +110,14 @@ func (plugin Jira) PrepareTaskData(taskCtx core.TaskContext, options map[string]
 		return nil, fmt.Errorf("connectionId is invalid")
 	}
 	connection := &models.JiraConnection{}
-	err = db.First(connection, op.ConnectionId).Error
+	connectionHelper := helper.NewConnectionHelper(
+		taskCtx,
+		nil,
+	)
+	if err != nil {
+		return nil, err
+	}
+	err = connectionHelper.FirstById(connection, op.ConnectionId)
 	if err != nil {
 		return nil, err
 	}
@@ -136,6 +145,7 @@ func (plugin Jira) PrepareTaskData(taskCtx core.TaskContext, options map[string]
 	}
 	if !since.IsZero() {
 		taskData.Since = &since
+		logger.Debug("collect data updated since %s", since)
 	}
 	return taskData, nil
 }
@@ -188,14 +198,16 @@ var PluginEntry Jira //nolint
 // standalone mode for debugging
 func main() {
 	cmd := &cobra.Command{Use: "jira"}
-	connectionId := cmd.Flags().Uint64P("connection", "s", 0, "jira connection id")
+	connectionId := cmd.Flags().Uint64P("connection", "c", 0, "jira connection id")
 	boardId := cmd.Flags().Uint64P("board", "b", 0, "jira board id")
 	_ = cmd.MarkFlagRequired("connection")
 	_ = cmd.MarkFlagRequired("board")
+	since := cmd.Flags().StringP("since", "s", "", "collect data that are updated after specified time, ie 2006-05-06T07:08:09Z")
 	cmd.Run = func(c *cobra.Command, args []string) {
 		runner.DirectRun(c, args, PluginEntry, map[string]interface{}{
 			"connectionId": *connectionId,
 			"boardId":      *boardId,
+			"since":        *since,
 		})
 	}
 	runner.RunCmd(cmd)
diff --git a/plugins/jira/models/migrationscripts/updateSchemas20220601.go b/plugins/jira/models/migrationscripts/updateSchemas20220601.go
index 5043cfca..ba9e9554 100644
--- a/plugins/jira/models/migrationscripts/updateSchemas20220601.go
+++ b/plugins/jira/models/migrationscripts/updateSchemas20220601.go
@@ -20,10 +20,11 @@ package migrationscripts
 import (
 	"context"
 	"encoding/base64"
+	"strings"
+
+	"github.com/apache/incubator-devlake/plugins/core"
 	"github.com/apache/incubator-devlake/plugins/helper"
 	"gorm.io/gorm"
-	"gorm.io/gorm/clause"
-	"strings"
 )
 
 type JiraConnection20220601 struct {
@@ -38,9 +39,20 @@ func (JiraConnection20220601) TableName() string {
 	return "_tool_jira_connections"
 }
 
-type UpdateSchemas20220601 struct{}
+type UpdateSchemas20220601 struct {
+	config core.ConfigGetter
+	logger core.Logger
+}
+
+func (u *UpdateSchemas20220601) SetConfigGetter(getter core.ConfigGetter) {
+	u.config = getter
+}
 
-func (*UpdateSchemas20220601) Up(ctx context.Context, db *gorm.DB) error {
+func (u *UpdateSchemas20220601) SetLogger(logger core.Logger) {
+	u.logger = logger
+}
+
+func (u *UpdateSchemas20220601) Up(ctx context.Context, db *gorm.DB) error {
 	var err error
 	if !db.Migrator().HasColumn(&JiraConnection20220505{}, "password") {
 		err = db.Migrator().AddColumn(&JiraConnection20220601{}, "password")
@@ -62,39 +74,42 @@ func (*UpdateSchemas20220601) Up(ctx context.Context, db *gorm.DB) error {
 		if err != nil {
 			return err
 		}
-		for i := range connections {
-			err = helper.DecryptConnection(connections[i])
+		encKey := u.config.GetString(core.EncodeKeyEnvStr)
+		for _, connection := range connections {
+			basicAuthEncoded, err := core.Decrypt(encKey, connection.BasicAuthEncoded)
 			if err != nil {
-				return err
+				u.logger.Warn("failed to decrypt basicAuth for connection %d", connection.ID)
+				continue
 			}
-			decodedStr, err := base64.StdEncoding.DecodeString(connections[i].BasicAuthEncoded)
+			basicAuth, err := base64.StdEncoding.DecodeString(basicAuthEncoded)
 			if err != nil {
 				return err
 			}
-			strList := strings.Split(string(decodedStr), ":")
+			strList := strings.Split(string(basicAuth), ":")
 			if len(strList) > 1 {
 				newConnection := JiraConnection20220601{
 					RestConnection: helper.RestConnection{
 						BaseConnection: helper.BaseConnection{
-							Name:  connections[i].Name,
-							Model: connections[i].Model,
+							Name:  connection.Name,
+							Model: connection.Model,
 						},
-						Endpoint:  connections[i].Endpoint,
-						Proxy:     connections[i].Proxy,
-						RateLimit: connections[i].RateLimit,
+						Endpoint:  connection.Endpoint,
+						Proxy:     connection.Proxy,
+						RateLimit: connection.RateLimit,
 					},
 					BasicAuth: helper.BasicAuth{
 						Username: strList[0],
 						Password: strList[1],
 					},
-					EpicKeyField:               connections[i].EpicKeyField,
-					StoryPointField:            connections[i].StoryPointField,
-					RemotelinkCommitShaPattern: connections[i].RemotelinkCommitShaPattern,
+					EpicKeyField:               connection.EpicKeyField,
+					StoryPointField:            connection.StoryPointField,
+					RemotelinkCommitShaPattern: connection.RemotelinkCommitShaPattern,
+				}
+				err := db.Save(newConnection).Error
+				if err != nil {
+					return err
 				}
-				db.Clauses(clause.OnConflict{UpdateAll: true}).Create(newConnection)
-
 			}
-			connections[i].Name = strList[0]
 		}
 		err = db.Migrator().DropColumn(&JiraConnection20220505{}, "basic_auth_encoded")
 		if err != nil {
diff --git a/plugins/jira/tasks/api_client.go b/plugins/jira/tasks/api_client.go
index b44b024b..b6eddd8e 100644
--- a/plugins/jira/tasks/api_client.go
+++ b/plugins/jira/tasks/api_client.go
@@ -27,12 +27,6 @@ import (
 )
 
 func NewJiraApiClient(taskCtx core.TaskContext, connection *models.JiraConnection) (*helper.ApiAsyncClient, error) {
-	// decrypt connection first
-	err := helper.DecryptConnection(connection)
-	if err != nil {
-		return nil, fmt.Errorf("Failed to decrypt Auth AccessToken: %w", err)
-	}
-
 	// create synchronize api client so we can calculate api rate limit dynamically
 	headers := map[string]string{
 		"Authorization": fmt.Sprintf("Basic %v", connection.GetEncodedToken()),
diff --git a/plugins/jira/tasks/changelog_collector.go b/plugins/jira/tasks/changelog_collector.go
index bd3f37bf..b4150735 100644
--- a/plugins/jira/tasks/changelog_collector.go
+++ b/plugins/jira/tasks/changelog_collector.go
@@ -25,6 +25,7 @@ import (
 	"reflect"
 
 	"github.com/apache/incubator-devlake/plugins/core"
+	. "github.com/apache/incubator-devlake/plugins/core/dal"
 	"github.com/apache/incubator-devlake/plugins/helper"
 	"github.com/apache/incubator-devlake/plugins/jira/models"
 	"github.com/apache/incubator-devlake/plugins/jira/tasks/apiv2models"
@@ -39,28 +40,35 @@ func CollectChangelogs(taskCtx core.SubTaskContext) error {
 	if data.JiraServerInfo.DeploymentType == models.DeploymentServer {
 		return nil
 	}
-	db := taskCtx.GetDb()
+	db := taskCtx.GetDal()
 	// figure out the time range
 	since := data.Since
 
 	// filter out issue_ids that needed collection
-	tx := db.Table("_tool_jira_board_issues bi").
-		Select("bi.issue_id, NOW() AS update_time").
-		Joins("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)").
-		Where("bi.connection_id = ? AND bi.board_id = ? AND (i.changelog_updated IS NULL OR i.changelog_updated < i.updated)", data.Options.ConnectionId, data.Options.BoardId)
-
+	clauses := []interface{}{
+		Select("bi.issue_id, NOW() AS update_time"),
+		From("_tool_jira_board_issues bi"),
+		Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)"),
+		Where(
+			`bi.connection_id = ?
+			   AND bi.board_id = ?
+			   AND (i.changelog_updated IS NULL OR i.changelog_updated < i.updated)`,
+			data.Options.ConnectionId,
+			data.Options.BoardId,
+		),
+	}
 	// apply time range if any
 	if since != nil {
-		tx = tx.Where("i.updated > ?", *since)
+		clauses = append(clauses, Where("i.updated > ?", *since))
 	}
 
 	// construct the input iterator
-	cursor, err := tx.Rows()
+	cursor, err := db.Cursor(clauses...)
 	if err != nil {
 		return err
 	}
 	// smaller struct can reduce memory footprint, we should try to avoid using big struct
-	iterator, err := helper.NewCursorIterator(db, cursor, reflect.TypeOf(apiv2models.Input{}))
+	iterator, err := helper.NewDalCursorIterator(db, cursor, reflect.TypeOf(apiv2models.Input{}))
 	if err != nil {
 		return err
 	}
@@ -76,7 +84,7 @@ func CollectChangelogs(taskCtx core.SubTaskContext) error {
 			Table: RAW_CHANGELOG_TABLE,
 		},
 		ApiClient:     data.ApiClient,
-		PageSize:      50,
+		PageSize:      100,
 		Incremental:   true,
 		GetTotalPages: GetTotalPagesFromResponse,
 		Input:         iterator,
diff --git a/plugins/jira/tasks/remotelink_collector.go b/plugins/jira/tasks/remotelink_collector.go
index a38d8ba1..3c6ac045 100644
--- a/plugins/jira/tasks/remotelink_collector.go
+++ b/plugins/jira/tasks/remotelink_collector.go
@@ -23,8 +23,8 @@ import (
 	"reflect"
 
 	"github.com/apache/incubator-devlake/plugins/core"
+	. "github.com/apache/incubator-devlake/plugins/core/dal"
 	"github.com/apache/incubator-devlake/plugins/helper"
-	"github.com/apache/incubator-devlake/plugins/jira/models"
 	"github.com/apache/incubator-devlake/plugins/jira/tasks/apiv2models"
 )
 
@@ -34,10 +34,9 @@ var _ core.SubTaskEntryPoint = CollectRemotelinks
 
 func CollectRemotelinks(taskCtx core.SubTaskContext) error {
 	data := taskCtx.GetData().(*JiraTaskData)
-	db := taskCtx.GetDb()
+	db := taskCtx.GetDal()
 	logger := taskCtx.GetLogger()
 	logger.Info("collect remotelink")
-	jiraIssue := &models.JiraIssue{}
 
 	/*
 		`CollectIssues` will take into account of `since` option and set the `updated` field for issues that have
@@ -45,29 +44,29 @@ func CollectRemotelinks(taskCtx core.SubTaskContext) error {
 		`remotelink_updated` field. If `remotelink_updated` is older, then we'll collect remotelinks for this issue and
 		set its `remotelink_updated` to `updated` at the end.
 	*/
-	cursor, err := db.Model(jiraIssue).
-		Select("_tool_jira_issues.issue_id", "NOW() AS update_time").
-		Joins(`LEFT JOIN _tool_jira_board_issues ON (
-			_tool_jira_board_issues.connection_id = _tool_jira_issues.connection_id AND
-			_tool_jira_board_issues.issue_id = _tool_jira_issues.issue_id
-		)`).
+	cursor, err := db.Cursor(
+		Select("i.issue_id, NOW() AS update_time"),
+		From("_tool_jira_remotelinks"),
+		Join(`LEFT JOIN bi ON (
+			bi.connection_id = i.connection_id AND
+			bi.issue_id = i.issue_id
+		)`),
 		Where(`
-			_tool_jira_board_issues.connection_id = ? AND
-			_tool_jira_board_issues.board_id = ? AND
-			(_tool_jira_issues.remotelink_updated IS NULL OR _tool_jira_issues.remotelink_updated < _tool_jira_issues.updated)
+			bi.connection_id = ? AND
+			bi.board_id = ? AND
+			(i.remotelink_updated IS NULL OR i.remotelink_updated < i.updated)
 			`,
 			data.Options.ConnectionId,
 			data.Options.BoardId,
-		).
-		Rows()
+		),
+	)
 	if err != nil {
 		logger.Error("collect remotelink error:%v", err)
 		return err
 	}
-	defer cursor.Close()
 
 	// smaller struct can reduce memory footprint, we should try to avoid using big struct
-	iterator, err := helper.NewCursorIterator(db, cursor, reflect.TypeOf(apiv2models.Input{}))
+	iterator, err := helper.NewDalCursorIterator(db, cursor, reflect.TypeOf(apiv2models.Input{}))
 	if err != nil {
 		return err
 	}
@@ -84,7 +83,6 @@ func CollectRemotelinks(taskCtx core.SubTaskContext) error {
 		ApiClient:   data.ApiClient,
 		Input:       iterator,
 		UrlTemplate: "api/2/issue/{{ .Input.IssueId }}/remotelink",
-		Concurrency: 10,
 		ResponseParser: func(res *http.Response) ([]json.RawMessage, error) {
 			if res.StatusCode == http.StatusNotFound {
 				return nil, nil
diff --git a/plugins/jira/tasks/user_collector.go b/plugins/jira/tasks/user_collector.go
index 051a62b9..6c9e62c2 100644
--- a/plugins/jira/tasks/user_collector.go
+++ b/plugins/jira/tasks/user_collector.go
@@ -24,6 +24,7 @@ import (
 	"reflect"
 
 	"github.com/apache/incubator-devlake/plugins/core"
+	. "github.com/apache/incubator-devlake/plugins/core/dal"
 	"github.com/apache/incubator-devlake/plugins/helper"
 	"github.com/apache/incubator-devlake/plugins/jira/models"
 )
@@ -32,14 +33,18 @@ const RAW_USERS_TABLE = "jira_api_users"
 
 func CollectUsers(taskCtx core.SubTaskContext) error {
 	data := taskCtx.GetData().(*JiraTaskData)
-	db := taskCtx.GetDb()
+	db := taskCtx.GetDal()
 	logger := taskCtx.GetLogger()
 	logger.Info("collect user")
-	cursor, err := db.Model(&models.JiraUser{}).Where("connection_id = ?", data.Options.ConnectionId).Rows()
+	cursor, err := db.Cursor(
+		Select("account_id"),
+		From("_tool_jira_users"),
+		Where("connection_id = ?", data.Options.ConnectionId),
+	)
 	if err != nil {
 		return err
 	}
-	iterator, err := helper.NewCursorIterator(db, cursor, reflect.TypeOf(models.JiraUser{}))
+	iterator, err := helper.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.JiraUser{}))
 	if err != nil {
 		return err
 	}
@@ -65,7 +70,6 @@ func CollectUsers(taskCtx core.SubTaskContext) error {
 			query.Set(queryKey, user.AccountId)
 			return query, nil
 		},
-		Concurrency: 10,
 		ResponseParser: func(res *http.Response) ([]json.RawMessage, error) {
 			var result json.RawMessage
 			err := helper.UnmarshalResponse(res, &result)
diff --git a/plugins/jira/tasks/worklog_collector.go b/plugins/jira/tasks/worklog_collector.go
index 70e873d7..2ec8fea2 100644
--- a/plugins/jira/tasks/worklog_collector.go
+++ b/plugins/jira/tasks/worklog_collector.go
@@ -23,6 +23,7 @@ import (
 	"reflect"
 
 	"github.com/apache/incubator-devlake/plugins/core"
+	. "github.com/apache/incubator-devlake/plugins/core/dal"
 	"github.com/apache/incubator-devlake/plugins/helper"
 	"github.com/apache/incubator-devlake/plugins/jira/tasks/apiv2models"
 )
@@ -30,26 +31,36 @@ import (
 const RAW_WORKLOGS_TABLE = "jira_api_worklogs"
 
 func CollectWorklogs(taskCtx core.SubTaskContext) error {
-	db := taskCtx.GetDb()
+	db := taskCtx.GetDal()
 	data := taskCtx.GetData().(*JiraTaskData)
 	since := data.Since
 
 	logger := taskCtx.GetLogger()
-	connectionId := data.Connection.ID
-	boardId := data.Options.BoardId
-	tx := db.Table("_tool_jira_board_issues bi").
-		Select("bi.issue_id, NOW() AS update_time").
-		Joins("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)").
-		Where("bi.connection_id = ? AND bi.board_id = ? AND (i.worklog_updated IS NULL OR i.worklog_updated < i.updated)", connectionId, boardId)
 
+	// filter out issue_ids that needed collection
+	clauses := []interface{}{
+		Select("bi.issue_id, NOW() AS update_time"),
+		From("_tool_jira_board_issues bi"),
+		Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)"),
+		Where(
+			`bi.connection_id = ?
+			   AND bi.board_id = ?
+			   AND (i.worklog_updated IS NULL OR i.worklog_updated < i.updated)`,
+			data.Options.ConnectionId,
+			data.Options.BoardId,
+		),
+	}
+	// apply time range if any
 	if since != nil {
-		tx = tx.Where("i.updated > ?", since)
+		clauses = append(clauses, Where("i.updated > ?", *since))
 	}
-	cursor, err := tx.Rows()
+
+	// construct the input iterator
+	cursor, err := db.Cursor(clauses...)
 	if err != nil {
 		return err
 	}
-	iterator, err := helper.NewCursorIterator(db, cursor, reflect.TypeOf(apiv2models.Input{}))
+	iterator, err := helper.NewDalCursorIterator(db, cursor, reflect.TypeOf(apiv2models.Input{}))
 	if err != nil {
 		return err
 	}
diff --git a/runner/directrun.go b/runner/directrun.go
index 86d4609c..8c0802ef 100644
--- a/runner/directrun.go
+++ b/runner/directrun.go
@@ -68,7 +68,7 @@ func DirectRun(cmd *cobra.Command, args []string, pluginTask core.PluginTask, op
 	// collect migration and run
 	migration.Init(db)
 	if migratable, ok := pluginTask.(core.Migratable); ok {
-		migration.Register(migratable.MigrationScripts(), cmd.Use)
+		RegisterMigrationScripts(migratable.MigrationScripts(), cmd.Use, cfg, log)
 	}
 	err = migration.Execute(context.Background())
 	if err != nil {
diff --git a/runner/loader.go b/runner/loader.go
index c801fc12..e5da95ca 100644
--- a/runner/loader.go
+++ b/runner/loader.go
@@ -19,7 +19,6 @@ package runner
 
 import (
 	"fmt"
-	"github.com/apache/incubator-devlake/migration"
 	"io/fs"
 	"path/filepath"
 	"plugin"
@@ -58,7 +57,7 @@ func LoadPlugins(pluginsDir string, config *viper.Viper, logger core.Logger, db
 				}
 			}
 			if migratable, ok := symPluginEntry.(core.Migratable); ok {
-				migration.Register(migratable.MigrationScripts(), pluginName)
+				RegisterMigrationScripts(migratable.MigrationScripts(), pluginName, config, logger)
 			}
 			err = core.RegisterPlugin(pluginName, pluginMeta)
 			if err != nil {
diff --git a/plugins/jira/api/init.go b/runner/migration.go
similarity index 66%
copy from plugins/jira/api/init.go
copy to runner/migration.go
index acaa495e..d189f57a 100644
--- a/plugins/jira/api/init.go
+++ b/runner/migration.go
@@ -15,16 +15,21 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package api
+package runner
 
 import (
+	"github.com/apache/incubator-devlake/migration"
 	"github.com/apache/incubator-devlake/plugins/core"
-	"github.com/spf13/viper"
-	"gorm.io/gorm"
 )
 
-var db *gorm.DB
-
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
-	db = database
+func RegisterMigrationScripts(scripts []migration.Script, comment string, config core.ConfigGetter, logger core.Logger) {
+	for _, script := range scripts {
+		if s, ok := script.(core.InjectConfigGetter); ok {
+			s.SetConfigGetter(config)
+		}
+		if s, ok := script.(core.InjectLogger); ok {
+			s.SetLogger(logger)
+		}
+	}
+	migration.Register(scripts, comment)
 }
diff --git a/runner/run_task.go b/runner/run_task.go
index 568d47b6..24c8fd24 100644
--- a/runner/run_task.go
+++ b/runner/run_task.go
@@ -56,7 +56,7 @@ func RunTask(
 	// make sure task status always correct even if it panicked
 	defer func() {
 		if r := recover(); r != nil {
-			err = fmt.Errorf("run task failed with panic (%s): %v", utils.GatherCallFrames(), r)
+			err = fmt.Errorf("run task failed with panic (%s): %v", utils.GatherCallFrames(0), r)
 		}
 		finishedAt := time.Now()
 		spentSeconds := finishedAt.Unix() - beganAt.Unix()
diff --git a/services/init.go b/services/init.go
index 995d1aec..4843e887 100644
--- a/services/init.go
+++ b/services/init.go
@@ -19,8 +19,11 @@ package services
 
 import (
 	"context"
+
 	"github.com/apache/incubator-devlake/models/migrationscripts"
 
+	"time"
+
 	"github.com/apache/incubator-devlake/config"
 	"github.com/apache/incubator-devlake/logger"
 	"github.com/apache/incubator-devlake/migration"
@@ -28,7 +31,6 @@ import (
 	"github.com/robfig/cron/v3"
 	"github.com/spf13/viper"
 	"gorm.io/gorm"
-	"time"
 )
 
 var cfg *viper.Viper
@@ -45,7 +47,7 @@ func init() {
 		panic(err)
 	}
 	migration.Init(db)
-	migrationscripts.RegisterAll()
+	runner.RegisterMigrationScripts(migrationscripts.All(), "Framework", cfg, logger.Global)
 	// load plugins
 	err = runner.LoadPlugins(
 		cfg.GetString("PLUGIN_DIR"),
diff --git a/utils/callframes.go b/utils/callframes.go
index 44cda2d6..611ad346 100644
--- a/utils/callframes.go
+++ b/utils/callframes.go
@@ -23,12 +23,12 @@ import (
 	"strings"
 )
 
-func GatherCallFrames() string {
+func GatherCallFrames(delta int) string {
 	var name, file string
 	var line int
 	var pc [16]uintptr
 
-	n := runtime.Callers(3, pc[:])
+	n := runtime.Callers(3+delta, pc[:])
 	for _, pc := range pc[:n] {
 		fn := runtime.FuncForPC(pc)
 		if fn == nil {