Posted to commits@devlake.apache.org by kl...@apache.org on 2022/06/08 06:50:32 UTC
[incubator-devlake] branch main updated: Refactor `ApiCollector` to simplify deadlock/canceling logic PoC (#2053)
This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 1be33a5b Refactor `ApiCollector` to simplify deadlock/canceling logic PoC (#2053)
1be33a5b is described below
commit 1be33a5bc600771005390f17a4bc39c360eb8cf5
Author: Klesh Wong <zh...@merico.dev>
AuthorDate: Wed Jun 8 14:50:28 2022 +0800
Refactor `ApiCollector` to simplify deadlock/canceling logic PoC (#2053)
* refactor: simplify api collector deadlock logic
* refactor: improve dal sql expression
* fix: jira users collection
* feat: setup mockery
1. remove mock files from repo
2. generate mock files before unit-test
* fix: avoid `<nil>` appearing in `.env`
* refactor: added debug logging to print `since`
* refactor: backup old `gomonkey` unit-tests
* refactor: relocate helpers to avoid cycle import
* fix: unit test
* feat: auto build lake-builder on tags `builder-v*`
* feat: front-lint run only for config-ui folder and pr
* fix: update tester image
* fix: remove invalid escaping
* fix: config unit test
* fix: rebase
* refactor: connection helper
* fix: make mock error messages
* fix: linting
* fix: linting
---
.github/workflows/build-builder.yml | 24 +
.github/workflows/build.yml | 2 +-
.github/workflows/front-end-lint.yml | 6 +-
.github/workflows/golangci-lint.yml | 2 +
.github/workflows/test-e2e.yml | 2 +-
.github/workflows/test.yml | 2 +-
.gitignore | 5 +-
Makefile | 8 +-
config/config.go | 7 +-
config/config_test.go | 4 +-
devops/lake-builder/Dockerfile | 6 +-
.../e2ehelper}/data_flow_tester.go | 7 +-
.../e2ehelper}/data_flow_tester_test.go | 2 +-
.../pluginhelper}/csv_file_iterator.go | 2 +-
.../pluginhelper}/csv_file_iterator_test.go | 2 +-
main.go => helpers/unithelper/dummy_logger.go | 18 +-
.../unithelper/dummy_subtaskcontext.go | 17 +-
impl/dalgorm/dalgorm.go | 117 ++
main.go | 13 +
models/migrationscripts/register.go | 6 +-
main.go => plugins/core/config.go | 13 +-
plugins/core/dal/dal.go | 107 ++
plugins/core/logger.go | 4 +
plugins/core/plugin_task.go | 13 +-
plugins/gitlab/e2e/project_test.go | 4 +-
plugins/helper/api_async_client.go | 86 +-
...client_test.go => api_async_client_test.go.old} | 0
plugins/helper/api_client_test.go | 122 --
plugins/helper/api_collector.go | 492 +++-----
.../helper/api_collector_func.go | 52 +-
plugins/helper/api_collector_test.go | 1185 +-------------------
plugins/helper/api_extractor.go | 5 +-
...extractor_test.go => api_extractor_test.go.old} | 0
...ider_test.go => batch_save_divider_test.go.old} | 0
.../{batch_save_test.go => batch_save_test.go.old} | 0
.../callbacks.go} | 6 +-
plugins/helper/connection.go | 244 ++--
plugins/helper/connection_test.go | 183 ++-
...onvertor_test.go => data_convertor_test.go.old} | 0
plugins/helper/default_task_context.go | 64 +-
plugins/helper/iterator.go | 38 +
plugins/helper/worker_scheduler.go | 200 ++--
plugins/helper/worker_scheduler_test.go | 56 +-
plugins/jira/api/connection.go | 48 +-
plugins/jira/api/init.go | 13 +-
plugins/jira/api/proxy.go | 16 +-
plugins/jira/jira.go | 18 +-
.../migrationscripts/updateSchemas20220601.go | 55 +-
plugins/jira/tasks/api_client.go | 6 -
plugins/jira/tasks/changelog_collector.go | 28 +-
plugins/jira/tasks/remotelink_collector.go | 32 +-
plugins/jira/tasks/user_collector.go | 12 +-
plugins/jira/tasks/worklog_collector.go | 31 +-
runner/directrun.go | 2 +-
runner/loader.go | 3 +-
plugins/jira/api/init.go => runner/migration.go | 19 +-
runner/run_task.go | 2 +-
services/init.go | 6 +-
utils/callframes.go | 4 +-
59 files changed, 1214 insertions(+), 2207 deletions(-)
diff --git a/.github/workflows/build-builder.yml b/.github/workflows/build-builder.yml
new file mode 100644
index 00000000..27b7861b
--- /dev/null
+++ b/.github/workflows/build-builder.yml
@@ -0,0 +1,24 @@
+name: Build-BuilderImage-Push-Docker
+on:
+ push:
+ tags:
+ - 'builder-*'
+jobs:
+ build-lake:
+ name: Build lake-builder image
+ runs-on: ubuntu-20.04
+ steps:
+ - uses: actions/checkout@v2
+ - uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9
+ with:
+ username: ${{ secrets.DOCKER_REGISTRY_USERNAME }}
+ password: ${{ secrets.DOCKER_REGISTRY_PASSWORD }}
+ - name: Build lake image
+ run: |
+ cd devops/lake-builder/
+ export IMAGE_LAKE=mericodev/lake-builder
+ export IMAGE_VER=${GITHUB_REF:18}
+ docker build -t $IMAGE_LAKE:latest --file ./Dockerfile .
+ docker tag $IMAGE_LAKE:latest $IMAGE_LAKE:$IMAGE_VER
+ docker push $IMAGE_LAKE:$IMAGE_VER
+ docker push $IMAGE_LAKE:latest
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 5544bfa5..f7395585 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -2,7 +2,7 @@ name: Build-Images-Push-Docker
on:
push:
tags:
- - '*'
+ - 'v*'
jobs:
build-lake:
name: Build lake image
diff --git a/.github/workflows/front-end-lint.yml b/.github/workflows/front-end-lint.yml
index 5bc8fd62..81debb42 100644
--- a/.github/workflows/front-end-lint.yml
+++ b/.github/workflows/front-end-lint.yml
@@ -1,5 +1,9 @@
name: Frontend-Lint
-on: push
+on:
+ pull_request:
+ branches: [ main ]
+ paths:
+ - 'config-ui/**'
jobs:
build:
runs-on: ubuntu-latest
diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml
index 856344b7..1fe0ba2b 100644
--- a/.github/workflows/golangci-lint.yml
+++ b/.github/workflows/golangci-lint.yml
@@ -19,6 +19,8 @@ jobs:
with:
path: golangci-lint
key: ${{ runner.os }}-golangci-lint
+ - name: generate mock
+ run: make mock
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
diff --git a/.github/workflows/test-e2e.yml b/.github/workflows/test-e2e.yml
index bf4c4d8c..1a083811 100644
--- a/.github/workflows/test-e2e.yml
+++ b/.github/workflows/test-e2e.yml
@@ -24,7 +24,7 @@ jobs:
MYSQL_USER: merico
MYSQL_PASSWORD: merico
MYSQL_ROOT_PASSWORD: root
- container: mericodev/lake-builder:0.0.5
+ container: mericodev/lake-builder:v0.0.5
steps:
- uses: actions/checkout@v3
- name: Cache test-e2e
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 7fa02e0f..2c6f5e51 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -17,7 +17,7 @@ jobs:
os: [ubuntu-latest]
runs-on: ${{ matrix.os }}
container:
- image: mericodev/lake-builder:0.0.5
+ image: mericodev/lake-builder:v0.0.5
steps:
- name: Checkout code
uses: actions/checkout@v3
diff --git a/.gitignore b/.gitignore
index 8c3f60d0..408e6e1a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -148,6 +148,9 @@ bin
libgit2
.air.toml
-#swagger .json and .yaml
+# swagger .json and .yaml
api/docs/swagger.json
api/docs/swagger.yaml
+
+# mocks
+mocks/
diff --git a/Makefile b/Makefile
index 186ef726..17ff5d67 100644
--- a/Makefile
+++ b/Makefile
@@ -51,9 +51,15 @@ configure-dev:
commit:
git cz
+mock:
+ rm -rf mocks
+ mockery --dir=./plugins/core --unroll-variadic=false --name='.*'
+ mockery --dir=./plugins/core/dal --unroll-variadic=false --name='.*'
+ mockery --dir=./plugins/helper --unroll-variadic=false --name='.*'
+
test: unit-test e2e-test
-unit-test: build
+unit-test: mock build
set -e; for m in $$(go list ./... | egrep -v 'test|models|e2e'); do echo $$m; go test -timeout 60s -gcflags=all=-l -v $$m; done
e2e-test: build
diff --git a/config/config.go b/config/config.go
index 39126e4a..4ecaedb6 100644
--- a/config/config.go
+++ b/config/config.go
@@ -70,11 +70,14 @@ func replaceNewEnvItemInOldContent(v *viper.Viper, envFileContent string) (error
switch ret := val.(type) {
case string:
ret = strings.Replace(ret, `\`, `\\`, -1)
- ret = strings.Replace(ret, `=`, `\=`, -1)
- ret = strings.Replace(ret, `'`, `\'`, -1)
+ //ret = strings.Replace(ret, `=`, `\=`, -1)
+ //ret = strings.Replace(ret, `'`, `\'`, -1)
ret = strings.Replace(ret, `"`, `\"`, -1)
return fmt.Sprintf(`%v="%v"`, envName, ret)
default:
+ if val == nil {
+ return fmt.Sprintf(`%v=`, envName)
+ }
return fmt.Sprintf(`%v="%v"`, envName, ret)
}
})
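
For readers skimming the patch: a minimal, self-contained sketch of the quoting behaviour after this change (the function name and surrounding program are illustrative, not part of the patch). Strings keep backslash and double-quote escaping, while a nil value now serializes to an empty assignment instead of the literal `<nil>`:

package main

import (
	"fmt"
	"strings"
)

// formatEnvItem mirrors the patched serialization logic: strings are escaped
// and quoted, nil becomes an empty assignment, everything else goes through %v.
func formatEnvItem(envName string, val interface{}) string {
	switch ret := val.(type) {
	case string:
		ret = strings.Replace(ret, `\`, `\\`, -1)
		ret = strings.Replace(ret, `"`, `\"`, -1)
		return fmt.Sprintf(`%v="%v"`, envName, ret)
	default:
		if val == nil {
			return fmt.Sprintf(`%v=`, envName) // was KEY="<nil>" before this fix
		}
		return fmt.Sprintf(`%v="%v"`, envName, ret)
	}
}

func main() {
	fmt.Println(formatEnvItem("AA", `a"a`)) // AA="a\"a"
	fmt.Println(formatEnvItem("BB", nil))   // BB=
}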
diff --git a/config/config_test.go b/config/config_test.go
index f46bb341..41f63be8 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -108,11 +108,11 @@ some unuseful message
a blank
AA="aaaa"
BB="1#1"
-CC="1\"\'1"
+CC="1\"'1"
DD="1\\\"1"
# some comment
-EE="\="
+EE="="
FF="1.01"
GGG="gggg"
H.278="278"
diff --git a/devops/lake-builder/Dockerfile b/devops/lake-builder/Dockerfile
index 05ec5f78..e81a8cc2 100644
--- a/devops/lake-builder/Dockerfile
+++ b/devops/lake-builder/Dockerfile
@@ -16,8 +16,6 @@
# current tag: mericodev/lake-builder:0.0.5
FROM golang:1.17-alpine3.15 as builder
-#RUN set -eux && sed -i 's/dl-cdn.alpinelinux.org/mirrors.ustc.edu.cn/g' /etc/apk/repositories
-RUN apk update
-RUN apk upgrade
#RUN apk add --update gcc=130.2.1_pre1-r3 g++=10.2.1_pre1-r3
-RUN apk add --no-cache tzdata libgit2-dev gcc g++ make tar
+RUN apk update && apk upgrade && apk add --no-cache tzdata libgit2-dev gcc g++ make
+RUN go install github.com/vektra/mockery/v2@latest
diff --git a/testhelper/data_flow_tester.go b/helpers/e2ehelper/data_flow_tester.go
similarity index 96%
rename from testhelper/data_flow_tester.go
rename to helpers/e2ehelper/data_flow_tester.go
index 555073ba..df9950b4 100644
--- a/testhelper/data_flow_tester.go
+++ b/helpers/e2ehelper/data_flow_tester.go
@@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package testhelper
+package e2ehelper
import (
"context"
@@ -24,6 +24,7 @@ import (
"time"
"github.com/apache/incubator-devlake/config"
+ "github.com/apache/incubator-devlake/helpers/pluginhelper"
"github.com/apache/incubator-devlake/logger"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
@@ -86,7 +87,7 @@ func NewDataFlowTester(t *testing.T, pluginName string, pluginMeta core.PluginMe
// ImportCsv imports records from specified csv file into target table, note that existing data would be deleted first.
func (t *DataFlowTester) ImportCsv(csvRelPath string, tableName string) {
- csvIter := NewCsvFileIterator(csvRelPath)
+ csvIter := pluginhelper.NewCsvFileIterator(csvRelPath)
defer csvIter.Close()
// create table if not exists
err := t.Db.Table(tableName).AutoMigrate(&helper.RawData{})
@@ -127,7 +128,7 @@ func (t *DataFlowTester) Subtask(subtaskMeta core.SubTaskMeta, taskData interfac
// Primary Key Fields with `pkfields` so DataFlowTester could select the exact record from database, as well as which
// fields to compare with by specifying `targetfields` parameter.
func (t *DataFlowTester) VerifyTable(tableName string, csvRelPath string, pkfields []string, targetfields []string) {
- csvIter := NewCsvFileIterator(csvRelPath)
+ csvIter := pluginhelper.NewCsvFileIterator(csvRelPath)
defer csvIter.Close()
var expectedTotal int64
diff --git a/testhelper/data_flow_tester_test.go b/helpers/e2ehelper/data_flow_tester_test.go
similarity index 99%
rename from testhelper/data_flow_tester_test.go
rename to helpers/e2ehelper/data_flow_tester_test.go
index db1776f5..4348debf 100644
--- a/testhelper/data_flow_tester_test.go
+++ b/helpers/e2ehelper/data_flow_tester_test.go
@@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package testhelper
+package e2ehelper
import (
"testing"
diff --git a/testhelper/csv_file_iterator.go b/helpers/pluginhelper/csv_file_iterator.go
similarity index 99%
rename from testhelper/csv_file_iterator.go
rename to helpers/pluginhelper/csv_file_iterator.go
index 55b228b8..2de777e0 100644
--- a/testhelper/csv_file_iterator.go
+++ b/helpers/pluginhelper/csv_file_iterator.go
@@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package testhelper
+package pluginhelper
import (
"encoding/csv"
diff --git a/testhelper/csv_file_iterator_test.go b/helpers/pluginhelper/csv_file_iterator_test.go
similarity index 97%
rename from testhelper/csv_file_iterator_test.go
rename to helpers/pluginhelper/csv_file_iterator_test.go
index 9db0c587..9c6c6394 100644
--- a/testhelper/csv_file_iterator_test.go
+++ b/helpers/pluginhelper/csv_file_iterator_test.go
@@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package testhelper
+package pluginhelper
func ExampleCsvFileIterator() {
iter := NewCsvFileIterator("/path/to/foobar.csv")
diff --git a/main.go b/helpers/unithelper/dummy_logger.go
similarity index 56%
copy from main.go
copy to helpers/unithelper/dummy_logger.go
index 918b738c..9ce45984 100644
--- a/main.go
+++ b/helpers/unithelper/dummy_logger.go
@@ -15,13 +15,21 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package main
+package unithelper
import (
- "github.com/apache/incubator-devlake/api"
- _ "github.com/apache/incubator-devlake/version"
+ "github.com/apache/incubator-devlake/mocks"
+ "github.com/stretchr/testify/mock"
)
-func main() {
- api.CreateApiService()
+func DummyLogger() *mocks.Logger {
+ logger := new(mocks.Logger)
+ logger.On("IsLevelEnabled", mock.Anything).Return(false).Maybe()
+ logger.On("Printf", mock.Anything, mock.Anything).Maybe()
+ logger.On("Log", mock.Anything, mock.Anything, mock.Anything).Maybe()
+ logger.On("Debug", mock.Anything, mock.Anything).Maybe()
+ logger.On("Info", mock.Anything, mock.Anything).Maybe()
+ logger.On("Warn", mock.Anything, mock.Anything).Maybe()
+ logger.On("Error", mock.Anything, mock.Anything).Maybe()
+ return logger
}
diff --git a/main.go b/helpers/unithelper/dummy_subtaskcontext.go
similarity index 60%
copy from main.go
copy to helpers/unithelper/dummy_subtaskcontext.go
index 918b738c..10a9fe3e 100644
--- a/main.go
+++ b/helpers/unithelper/dummy_subtaskcontext.go
@@ -15,13 +15,20 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package main
+package unithelper
import (
- "github.com/apache/incubator-devlake/api"
- _ "github.com/apache/incubator-devlake/version"
+ "github.com/apache/incubator-devlake/mocks"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
+ "github.com/stretchr/testify/mock"
)
-func main() {
- api.CreateApiService()
+func DummySubTaskContext(db dal.Dal) *mocks.SubTaskContext {
+ mockCtx := new(mocks.SubTaskContext)
+ mockCtx.On("GetDal").Return(db)
+ mockCtx.On("GetLogger").Return(DummyLogger())
+ mockCtx.On("SetProgress", mock.Anything, mock.Anything)
+ mockCtx.On("IncProgress", mock.Anything, mock.Anything)
+ mockCtx.On("GetName").Return("test")
+ return mockCtx
}
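
A minimal sketch of how these helpers might be used in a unit test, assuming `make mock` has generated a `mocks.Dal` from `plugins/core/dal` (the test name and expectations are illustrative):

package example_test

import (
	"testing"

	"github.com/apache/incubator-devlake/helpers/unithelper"
	"github.com/apache/incubator-devlake/mocks"
)

// Wire a generated mocks.Dal into a dummy SubTaskContext so a subtask can be
// exercised without a real database or logger.
func TestWithDummyContext(t *testing.T) {
	mockDal := new(mocks.Dal)
	ctx := unithelper.DummySubTaskContext(mockDal)
	// pass `ctx` to the subtask under test here ...
	_ = ctx
	mockDal.AssertExpectations(t)
}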
diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
new file mode 100644
index 00000000..ce6e60bf
--- /dev/null
+++ b/impl/dalgorm/dalgorm.go
@@ -0,0 +1,117 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package dalgorm
+
+import (
+ "database/sql"
+
+ "github.com/apache/incubator-devlake/plugins/core/dal"
+ "gorm.io/gorm"
+ "gorm.io/gorm/clause"
+)
+
+type Dalgorm struct {
+ db *gorm.DB
+}
+
+// To accommodate gorm
+//type stubTable struct {
+//name string
+//}
+
+//func (s *stubTable) TableName() string {
+//return s.name
+//}
+
+func buildTx(tx *gorm.DB, clauses []interface{}) *gorm.DB {
+ for _, clause := range clauses {
+ switch clause := clause.(type) {
+ case *dal.JoinClause:
+ tx = tx.Joins(clause.Expr, clause.Params...)
+ case *dal.WhereClause:
+ tx = tx.Where(clause.Expr, clause.Params...)
+ case dal.OrderbyClause:
+ tx = tx.Order(string(clause))
+ case dal.LimitClause:
+ tx = tx.Limit(int(clause))
+ case dal.OffsetClause:
+ tx = tx.Offset(int(clause))
+ case dal.FromClause:
+ tx = tx.Table(string(clause))
+ case dal.SelectClause:
+ tx = tx.Select(string(clause))
+ }
+ }
+ return tx
+}
+
+var _ dal.Dal = (*Dalgorm)(nil)
+
+// Exec executes raw sql query
+func (d *Dalgorm) Exec(query string, params ...interface{}) error {
+ return d.db.Exec(query, params...).Error
+}
+
+// AutoMigrate creates or updates the table schema based on the gorm definition of `entity`
+func (d *Dalgorm) AutoMigrate(entity interface{}, clauses ...interface{}) error {
+ return buildTx(d.db, clauses).AutoMigrate(entity)
+}
+
+// Cursor returns a database cursor, which is especially useful when handling a large number of rows
+func (d *Dalgorm) Cursor(clauses ...interface{}) (*sql.Rows, error) {
+ return buildTx(d.db, clauses).Rows()
+}
+
+// Fetch loads row data from `cursor` into `dst`
+func (d *Dalgorm) Fetch(cursor *sql.Rows, dst interface{}) error {
+ return d.db.ScanRows(cursor, dst)
+}
+
+// All loads all matched rows from the database into `dst`, USE IT WITH CAUTION!!
+func (d *Dalgorm) All(dst interface{}, clauses ...interface{}) error {
+ return buildTx(d.db, clauses).Find(dst).Error
+}
+
+// First loads the first matched row from the database into `dst`; an error is returned if no record was found
+func (d *Dalgorm) First(dst interface{}, clauses ...interface{}) error {
+ return buildTx(d.db, clauses).First(dst).Error
+}
+
+// Create inserts a record into the database
+func (d *Dalgorm) Create(entity interface{}, clauses ...interface{}) error {
+ return buildTx(d.db, clauses).Create(entity).Error
+}
+
+// Update updates a record in the database
+func (d *Dalgorm) Update(entity interface{}, clauses ...interface{}) error {
+ return buildTx(d.db, clauses).Save(entity).Error
+}
+
+// CreateOrUpdate tries to create the record, falling back to updating all fields on conflict
+func (d *Dalgorm) CreateOrUpdate(entity interface{}, clauses ...interface{}) error {
+ return buildTx(d.db, clauses).Clauses(clause.OnConflict{UpdateAll: true}).Create(entity).Error
+}
+
+// Delete deletes records from the database
+func (d *Dalgorm) Delete(entity interface{}, clauses ...interface{}) error {
+ return buildTx(d.db, clauses).Delete(entity).Error
+}
+
+func NewDalgorm(db *gorm.DB) *Dalgorm {
+ return &Dalgorm{db}
+}
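
A sketch of the intended wiring (illustrative, not part of this patch): a runner holding a *gorm.DB can hand plugins the new abstraction instead, since Dalgorm satisfies dal.Dal, as enforced by the `var _ dal.Dal = (*Dalgorm)(nil)` assertion above:

package example

import (
	"github.com/apache/incubator-devlake/impl/dalgorm"
	"github.com/apache/incubator-devlake/plugins/core/dal"
	"gorm.io/gorm"
)

// newDal adapts a concrete gorm handle to the database-agnostic interface,
// so task code can depend on dal.Dal and be mocked in unit tests.
func newDal(db *gorm.DB) dal.Dal {
	return dalgorm.NewDalgorm(db)
}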
diff --git a/main.go b/main.go
index 918b738c..7c3001f7 100644
--- a/main.go
+++ b/main.go
@@ -19,9 +19,22 @@ package main
import (
"github.com/apache/incubator-devlake/api"
+ "github.com/apache/incubator-devlake/config"
+ "github.com/apache/incubator-devlake/plugins/core"
_ "github.com/apache/incubator-devlake/version"
)
func main() {
+ v := config.GetConfig()
+ encKey := v.GetString(core.EncodeKeyEnvStr)
+ if encKey == "" {
+ // Randomly generate a bunch of encryption keys and set them to config
+ encKey = core.RandomEncKey()
+ v.Set(core.EncodeKeyEnvStr, encKey)
+ err := config.WriteConfig(v)
+ if err != nil {
+ panic(err)
+ }
+ }
api.CreateApiService()
}
diff --git a/models/migrationscripts/register.go b/models/migrationscripts/register.go
index ebbfcbd7..bb0896d7 100644
--- a/models/migrationscripts/register.go
+++ b/models/migrationscripts/register.go
@@ -20,12 +20,12 @@ package migrationscripts
import "github.com/apache/incubator-devlake/migration"
// RegisterAll register all the migration scripts of framework
-func RegisterAll() {
- migration.Register([]migration.Script{
+func All() []migration.Script {
+ return []migration.Script{
new(initSchemas),
new(updateSchemas20220505), new(updateSchemas20220507), new(updateSchemas20220510),
new(updateSchemas20220513), new(updateSchemas20220524), new(updateSchemas20220526),
new(updateSchemas20220527), new(updateSchemas20220528), new(updateSchemas20220601),
new(updateSchemas20220602),
- }, "Framework")
+ }
}
diff --git a/main.go b/plugins/core/config.go
similarity index 83%
copy from main.go
copy to plugins/core/config.go
index 918b738c..97bf3ac1 100644
--- a/main.go
+++ b/plugins/core/config.go
@@ -15,13 +15,12 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package main
+package core
-import (
- "github.com/apache/incubator-devlake/api"
- _ "github.com/apache/incubator-devlake/version"
-)
+type ConfigGetter interface {
+ GetString(name string) string
+}
-func main() {
- api.CreateApiService()
+type InjectConfigGetter interface {
+ SetConfigGetter(getter ConfigGetter)
}
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
new file mode 100644
index 00000000..5318b6ca
--- /dev/null
+++ b/plugins/core/dal/dal.go
@@ -0,0 +1,107 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package dal
+
+import "database/sql"
+
+// Dal isolates the Database Access Layer by defining the set of operations a
+// Database Access Layer should provide.
+// It was introduced because mocking *gorm.DB is hard, and `gomonkey` does not work on macOS
+type Dal interface {
+ // Exec executes raw sql query
+ Exec(query string, params ...interface{}) error
+ // AutoMigrate creates or updates the table schema based on the gorm definition of `entity`
+ AutoMigrate(entity interface{}, clauses ...interface{}) error
+ // Cursor returns a database cursor, which is especially useful when handling a large number of rows
+ Cursor(clauses ...interface{}) (*sql.Rows, error)
+ // Fetch loads row data from `cursor` into `dst`
+ Fetch(cursor *sql.Rows, dst interface{}) error
+ // All loads all matched rows from the database into `dst`, USE IT WITH CAUTION!!
+ All(dst interface{}, clauses ...interface{}) error
+ // First loads the first matched row from the database into `dst`; an error is returned if no record was found
+ First(dst interface{}, clauses ...interface{}) error
+ // Create inserts a record into the database
+ Create(entity interface{}, clauses ...interface{}) error
+ // Update updates a record in the database
+ Update(entity interface{}, clauses ...interface{}) error
+ // CreateOrUpdate tries to create the record, falling back to updating all fields on conflict
+ CreateOrUpdate(entity interface{}, clauses ...interface{}) error
+ // Delete deletes records from the database
+ Delete(entity interface{}, clauses ...interface{}) error
+}
+
+type dalClause struct {
+ Expr string
+ Params []interface{}
+}
+
+// JoinClause represents a SQL `JOIN` clause
+type JoinClause dalClause
+
+// Join creates a new JoinClause
+func Join(clause string, params ...interface{}) *JoinClause {
+ return &JoinClause{clause, params}
+}
+
+// WhereClause represents a SQL `WHERE` clause
+type WhereClause dalClause
+
+// Where creates a new WhereClause
+func Where(clause string, params ...interface{}) *WhereClause {
+ return &WhereClause{clause, params}
+}
+
+// LimitClause represents a SQL `LIMIT` clause
+type LimitClause int
+
+// Limit creates a new LimitClause
+func Limit(limit int) LimitClause {
+ return LimitClause(limit)
+}
+
+// OffsetClause represents a SQL `OFFSET` clause
+type OffsetClause int
+
+// Offset creates a new OffsetClause
+func Offset(offset int) OffsetClause {
+ return OffsetClause(offset)
+}
+
+// FromClause represents a SQL `FROM` clause
+type FromClause string
+
+// From creates a new FromClause
+func From(table string) FromClause {
+ return FromClause(table)
+}
+
+// SelectClause represents a SQL `SELECT` clause
+type SelectClause string
+
+// Select creates a new SelectClause
+func Select(fields string) SelectClause {
+ return SelectClause(fields)
+}
+
+// OrderbyClause represents a SQL `ORDER BY` clause
+type OrderbyClause string
+
+// Orderby creates a new OrderbyClause
+func Orderby(expr string) OrderbyClause {
+ return OrderbyClause(expr)
+}
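
A hedged sketch of clause composition against the new interface (table and column names are made up): the variadic clauses are interpreted by the implementation, e.g. buildTx in dalgorm, so a query is assembled declaratively:

package example

import (
	"github.com/apache/incubator-devlake/plugins/core/dal"
)

// loadRecentRaw opens a cursor over the 100 most recent raw rows for the
// given params, illustrating From/Where/Orderby/Limit composition.
func loadRecentRaw(db dal.Dal, params string) error {
	cursor, err := db.Cursor(
		dal.From("_raw_jira_api_issues"),
		dal.Where("params = ?", params),
		dal.Orderby("id DESC"),
		dal.Limit(100),
	)
	if err != nil {
		return err
	}
	defer cursor.Close()
	// iterate with cursor.Next() and db.Fetch(cursor, &row) as needed ...
	return nil
}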
diff --git a/plugins/core/logger.go b/plugins/core/logger.go
index 11155bb9..0b345426 100644
--- a/plugins/core/logger.go
+++ b/plugins/core/logger.go
@@ -40,3 +40,7 @@ type Logger interface {
// return a new logger which output nested log
Nested(name string) Logger
}
+
+type InjectLogger interface {
+ SetLogger(logger Logger)
+}
diff --git a/plugins/core/plugin_task.go b/plugins/core/plugin_task.go
index bc1d9451..9ca9aec8 100644
--- a/plugins/core/plugin_task.go
+++ b/plugins/core/plugin_task.go
@@ -20,6 +20,7 @@ package core
import (
"context"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
"gorm.io/gorm"
)
@@ -41,13 +42,19 @@ type RunningProgress struct {
SubTaskNumber int
}
+type BasicRes interface {
+ GetConfig(name string) string
+ GetLogger() Logger
+ // Deprecated: use dal instead
+ GetDb() *gorm.DB
+ GetDal() dal.Dal
+}
+
// This interface defines all the resources needed for task/subtask execution
type ExecContext interface {
+ BasicRes
GetName() string
- GetConfig(name string) string
- GetDb() *gorm.DB
GetContext() context.Context
- GetLogger() Logger
GetData() interface{}
SetProgress(current int, total int)
IncProgress(quantity int)
diff --git a/plugins/gitlab/e2e/project_test.go b/plugins/gitlab/e2e/project_test.go
index cccc01fa..32a7f02b 100644
--- a/plugins/gitlab/e2e/project_test.go
+++ b/plugins/gitlab/e2e/project_test.go
@@ -20,15 +20,15 @@ package e2e
import (
"testing"
+ "github.com/apache/incubator-devlake/helpers/e2ehelper"
"github.com/apache/incubator-devlake/plugins/gitlab/impl"
"github.com/apache/incubator-devlake/plugins/gitlab/tasks"
- "github.com/apache/incubator-devlake/testhelper"
)
func TestGitlabDataFlow(t *testing.T) {
var gitlab impl.Gitlab
- dataflowTester := testhelper.NewDataFlowTester(t, "gitlab", gitlab)
+ dataflowTester := e2ehelper.NewDataFlowTester(t, "gitlab", gitlab)
taskData := &tasks.GitlabTaskData{
Options: &tasks.GitlabOptions{
diff --git a/plugins/helper/api_async_client.go b/plugins/helper/api_async_client.go
index 20d4f894..4add3dcf 100644
--- a/plugins/helper/api_async_client.go
+++ b/plugins/helper/api_async_client.go
@@ -28,11 +28,10 @@ import (
"time"
"github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/helper/common"
"github.com/apache/incubator-devlake/utils"
)
-type ApiAsyncCallback func(*http.Response, error) error
-
var HttpMinStatusRetryCode = http.StatusBadRequest
// ApiAsyncClient is built on top of ApiClient, to provide asynchronous semantics
@@ -40,11 +39,13 @@ var HttpMinStatusRetryCode = http.StatusBadRequest
// will be performed in parallel with rate-limit support
type ApiAsyncClient struct {
*ApiClient
- maxRetry int
- scheduler *WorkerScheduler
- qps float64
+ maxRetry int
+ scheduler *WorkerScheduler
+ hasError bool
+ numOfWorkers int
}
+// CreateAsyncApiClient creates a new ApiAsyncClient
func CreateAsyncApiClient(
taskCtx core.TaskContext,
apiClient *ApiClient,
@@ -85,55 +86,69 @@ func CreateAsyncApiClient(
d := duration / RESPONSE_TIME
numOfWorkers := requests / int(d)
- taskCtx.GetLogger().Info(
+ logger := taskCtx.GetLogger()
+ logger.Info(
"scheduler for api %s worker: %d, request: %d, duration: %v",
apiClient.GetEndpoint(),
numOfWorkers,
requests,
duration,
)
- scheduler, err := NewWorkerScheduler(numOfWorkers, requests, duration, taskCtx.GetContext(), retry)
+ scheduler, err := NewWorkerScheduler(
+ numOfWorkers,
+ requests,
+ duration,
+ taskCtx.GetContext(),
+ retry,
+ logger,
+ )
if err != nil {
return nil, fmt.Errorf("failed to create scheduler: %w", err)
}
- qps := float64(requests) / duration.Seconds()
// finally, wrap the api client with async semantics
return &ApiAsyncClient{
apiClient,
retry,
scheduler,
- qps,
+ false,
+ numOfWorkers,
}, nil
}
+// GetMaxRetry returns the maximum retry attempts for a request
func (apiClient *ApiAsyncClient) GetMaxRetry() int {
return apiClient.maxRetry
}
+// SetMaxRetry sets the maximum retry attempts for a request
func (apiClient *ApiAsyncClient) SetMaxRetry(
maxRetry int,
) {
apiClient.maxRetry = maxRetry
}
+// DoAsync carries out an asynchronous request
func (apiClient *ApiAsyncClient) DoAsync(
method string,
path string,
query url.Values,
body interface{},
header http.Header,
- handler ApiAsyncCallback,
+ handler common.ApiAsyncCallback,
retry int,
-) error {
- var subFunc func() error
- subFunc = func() error {
+) {
+ var request func() error
+ request = func() error {
var err error
var res *http.Response
var body []byte
res, err = apiClient.Do(method, path, query, body, header)
+ // make sure response body is read successfully, or we might have to retry
if err == nil {
+ // make sure the response.Body stream gets closed to avoid running out of file handles
defer func(body io.ReadCloser) { body.Close() }(res.Body)
+ // replace NetworkStream with MemoryBuffer
body, err = ioutil.ReadAll(res.Body)
if err == nil {
res.Body = io.NopCloser(bytes.NewBuffer(body))
@@ -153,18 +168,23 @@ func (apiClient *ApiAsyncClient) DoAsync(
if needRetry {
// check whether we still have retries left, and that the error is not from the handler or a cancellation
if retry < apiClient.maxRetry && err != context.Canceled {
- apiClient.logError("retry #%d for %s", retry, err.Error())
+ apiClient.logger.Warn("retry #%d for %s", retry, err.Error())
retry++
- return apiClient.scheduler.Submit(subFunc, apiClient.scheduler.subPool)
+ apiClient.scheduler.NextTick(request)
+ return nil
}
+ }
+
+ if err != nil {
+ apiClient.hasError = true
return err
}
// it is important to let the handler have a chance to handle the error, or it can hang indefinitely
// when an error occurs
- return handler(res, err)
+ return handler(res)
}
- return apiClient.scheduler.Submit(subFunc)
+ apiClient.scheduler.SubmitBlocking(request)
}
// Enqueue an api get request; the request may be sent sometime in the future, in parallel with other api requests
@@ -172,32 +192,34 @@ func (apiClient *ApiAsyncClient) GetAsync(
path string,
query url.Values,
header http.Header,
- handler ApiAsyncCallback,
-) error {
- return apiClient.DoAsync(http.MethodGet, path, query, nil, header, handler, 0)
+ handler common.ApiAsyncCallback,
+) {
+ apiClient.DoAsync(http.MethodGet, path, query, nil, header, handler, 0)
}
-// Wait until all async requests were done
+// WaitAsync blocks until all async requests are done
func (apiClient *ApiAsyncClient) WaitAsync() error {
- return apiClient.scheduler.WaitUntilFinish()
+ return apiClient.scheduler.Wait()
}
-func (apiClient *ApiAsyncClient) GetQps() float64 {
- return apiClient.qps
+func (apiClient *ApiAsyncClient) HasError() bool {
+ return apiClient.hasError
}
-func (apiClient *ApiAsyncClient) Add(delta int) {
- apiClient.scheduler.Add(delta)
+
+func (apiClient *ApiAsyncClient) NextTick(task func() error) {
+ apiClient.scheduler.NextTick(task)
}
-func (apiClient *ApiAsyncClient) Done() {
- apiClient.scheduler.Done()
+
+func (apiClient *ApiAsyncClient) GetNumOfWorkers() int {
+ return apiClient.numOfWorkers
}
type RateLimitedApiClient interface {
- GetAsync(path string, query url.Values, header http.Header, handler ApiAsyncCallback) error
+ GetAsync(path string, query url.Values, header http.Header, handler common.ApiAsyncCallback)
WaitAsync() error
- GetQps() float64
- Add(delta int)
- Done()
+ HasError() bool
+ NextTick(task func() error)
+ GetNumOfWorkers() int
}
var _ RateLimitedApiClient = (*ApiAsyncClient)(nil)
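
A sketch of the new asynchronous flow under this refactor (endpoint and query are illustrative): the callback now receives only the response, and errors surface through HasError()/WaitAsync() rather than a return value from GetAsync:

package example

import (
	"net/http"
	"net/url"

	"github.com/apache/incubator-devlake/plugins/helper"
)

// collect enqueues one GET and then blocks until every queued request,
// including retries scheduled via NextTick, has finished.
func collect(client *helper.ApiAsyncClient) error {
	query := url.Values{}
	query.Set("startAt", "0")
	client.GetAsync("rest/api/2/search", query, http.Header{}, func(res *http.Response) error {
		defer res.Body.Close()
		// parse and persist res.Body here ...
		return nil
	})
	return client.WaitAsync()
}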
diff --git a/plugins/helper/api_async_client_test.go b/plugins/helper/api_async_client_test.go.old
similarity index 100%
rename from plugins/helper/api_async_client_test.go
rename to plugins/helper/api_async_client_test.go.old
diff --git a/plugins/helper/api_client_test.go b/plugins/helper/api_client_test.go
deleted file mode 100644
index 251179c1..00000000
--- a/plugins/helper/api_client_test.go
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package helper
-
-import (
- "net/url"
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestGetURIStringPointer_WithSlash(t *testing.T) {
- baseUrl := "http://my-site.com/"
- relativePath := "/api/stuff"
- queryParams := url.Values{}
- queryParams.Set("id", "1")
- expected := "http://my-site.com/api/stuff?id=1"
- actual, err := GetURIStringPointer(baseUrl, relativePath, queryParams)
- assert.Equal(t, err == nil, true)
- assert.Equal(t, expected, *actual)
-
-}
-func TestGetURIStringPointer_WithNoSlash(t *testing.T) {
- baseUrl := "http://my-site.com"
- relativePath := "api/stuff"
- queryParams := url.Values{}
- queryParams.Set("id", "1")
- expected := "http://my-site.com/api/stuff?id=1"
- actual, err := GetURIStringPointer(baseUrl, relativePath, queryParams)
- assert.Equal(t, err == nil, true)
- assert.Equal(t, expected, *actual)
-}
-func TestGetURIStringPointer_WithRelativePath(t *testing.T) {
- baseUrl := "http://my-site.com/rest"
- relativePath := "api/stuff"
- queryParams := url.Values{}
- queryParams.Set("id", "1")
- expected := "http://my-site.com/rest/api/stuff?id=1"
- actual, err := GetURIStringPointer(baseUrl, relativePath, queryParams)
- assert.Equal(t, err == nil, true)
- assert.Equal(t, expected, *actual)
-}
-func TestGetURIStringPointer_WithRelativePath2(t *testing.T) {
- baseUrl := "https://my-site.com/api/v4/"
- relativePath := "projects/stuff"
- queryParams := url.Values{}
- queryParams.Set("id", "1")
- expected := "https://my-site.com/api/v4/projects/stuff?id=1"
- actual, err := GetURIStringPointer(baseUrl, relativePath, queryParams)
- assert.Equal(t, err == nil, true)
- assert.Equal(t, expected, *actual)
-}
-
-func TestGetURIStringPointer_HandlesRelativePathStartingWithSlash(t *testing.T) {
- baseUrl := "https://my-site.com/api/v4/"
- relativePath := "/user"
- expected := "https://my-site.com/api/v4/user"
- actual, err := GetURIStringPointer(baseUrl, relativePath, nil)
- assert.Equal(t, err == nil, true)
- assert.Equal(t, expected, *actual)
-}
-
-func TestGetURIStringPointer_HandlesRelativePathStartingWithSlashWithParams(t *testing.T) {
- baseUrl := "https://my-site.com/api/v4/"
- relativePath := "/user"
- queryParams := url.Values{}
- queryParams.Set("id", "1")
- expected := "https://my-site.com/api/v4/user?id=1"
- actual, err := GetURIStringPointer(baseUrl, relativePath, queryParams)
- assert.Equal(t, err == nil, true)
- assert.Equal(t, expected, *actual)
-}
-
-func TestAddMissingSlashToURL_NoSlash(t *testing.T) {
- baseUrl := "http://my-site.com/rest"
- expected := "http://my-site.com/rest/"
- AddMissingSlashToURL(&baseUrl)
- assert.Equal(t, expected, baseUrl)
-}
-
-func TestAddMissingSlashToURL_WithSlash(t *testing.T) {
- baseUrl := "http://my-site.com/rest/"
- expected := "http://my-site.com/rest/"
- AddMissingSlashToURL(&baseUrl)
- assert.Equal(t, expected, baseUrl)
-}
-
-func TestRemoveStartingSlashFromPath(t *testing.T) {
- testString := "/user/api"
- expected := "user/api"
- actual := RemoveStartingSlashFromPath(testString)
- assert.Equal(t, expected, actual)
-}
-
-func TestRemoveStartingSlashFromPath_EmptyString(t *testing.T) {
- testString := ""
- expected := ""
- actual := RemoveStartingSlashFromPath(testString)
- assert.Equal(t, expected, actual)
-}
-
-func TestRemoveStartingSlashFromPath_NoStartingSlash(t *testing.T) {
- testString := "user/api"
- expected := "user/api"
- actual := RemoveStartingSlashFromPath(testString)
- assert.Equal(t, expected, actual)
-}
diff --git a/plugins/helper/api_collector.go b/plugins/helper/api_collector.go
index 23cbb099..a185b1a6 100644
--- a/plugins/helper/api_collector.go
+++ b/plugins/helper/api_collector.go
@@ -19,66 +19,58 @@ package helper
import (
"bytes"
- "context"
"encoding/json"
"fmt"
"io/ioutil"
- "math"
"net/http"
"net/url"
- "sync"
"text/template"
"github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
)
+// Pager contains pagination information for an api request
type Pager struct {
Page int
Skip int
Size int
}
+// RequestData is the input to `UrlTemplate`, `Query` and `Header`, so we can generate them dynamically
type RequestData struct {
- Pager *Pager
- Params interface{}
- Input interface{}
+ Pager *Pager
+ Params interface{}
+ Input interface{}
+ InputJSON []byte
}
type AsyncResponseHandler func(res *http.Response) error
type ApiCollectorArgs struct {
RawDataSubTaskArgs
- /*
- url may use arbitrary variables from different source in any order, we need GoTemplate to allow more
- flexible for all kinds of possibility.
- Pager contains information for a particular page, calculated by ApiCollector, and will be passed into
- GoTemplate to generate a url for that page.
- We want to do page-fetching in ApiCollector, because the logic are highly similar, by doing so, we can
- avoid duplicate logic for every tasks, and when we have a better idea like improving performance, we can
- do it in one place
- */
+ // UrlTemplate is used to generate the final URL for Api Collector to request
+ // e.g. `api/3/issue/{{ .Input.IssueId }}/changelog`
+ // For details of what variables can be used, please check `RequestData`
UrlTemplate string `comment:"GoTemplate for API url"`
- // (Optional) Return query string for request, or you can plug them into UrlTemplate directly
- Query func(reqData *RequestData) (url.Values, error) `comment:"Extra query string when requesting API, like 'Since' option for jira issues collection"`
- // Some api might do pagination by http headers
- Header func(reqData *RequestData) (http.Header, error)
- PageSize int
- Incremental bool `comment:"Indicate this is a incremental collection, so the existing data won't get flushed"`
- ApiClient RateLimitedApiClient
- /*
- Sometimes, we need to collect data based on previous collected data, like jira changelog, it requires
- issue_id as part of the url.
- We can mimic `stdin` design, to accept a `Input` function which produces a `Iterator`, collector
- should iterate all records, and do data-fetching for each on, either in parallel or sequential order
- UrlTemplate: "api/3/issue/{{ Input.ID }}/changelog"
- */
- Input Iterator
- InputRateLimit int
- /*
- For api endpoint that returns number of total pages, ApiCollector can collect pages in parallel with ease,
- or other techniques are required if this information was missing.
- */
- GetTotalPages func(res *http.Response, args *ApiCollectorArgs) (int, error)
+ // Query would be sent out as part of the request URL
+ Query func(reqData *RequestData) (url.Values, error) ``
+ // Header would be sent out along with the request
+ Header func(reqData *RequestData) (http.Header, error)
+ // PageSize tells ApiCollector the page size
+ PageSize int
+ // Incremental indicates whether this is an incremental collection; existing data won't be deleted if true
+ Incremental bool `comment:""`
+ // ApiClient is an asynchronous api request client with rate-limit (qps) support
+ ApiClient RateLimitedApiClient
+ // Input helps us collect data based on previously collected data, like collecting changelogs based on jira
+ // issue ids
+ Input Iterator
+ // GetTotalPages tells `ApiCollector` the total number of pages based on the response of the first page,
+ // so `ApiCollector` could collect those pages in parallel for us
+ GetTotalPages func(res *http.Response, args *ApiCollectorArgs) (int, error)
+ // Concurrency specifies the concurrency level for APIs that don't return the total number of pages/records
+ // NORMALLY, DO NOT SPECIFY THIS PARAMETER, unless you know what it means
Concurrency int
ResponseParser func(res *http.Response) ([]json.RawMessage, error)
}
@@ -89,9 +81,9 @@ type ApiCollector struct {
urlTemplate *template.Template
}
-// NewApiCollector allocates a new ApiCollector with the given args.
-// ApiCollector can help you collecting data from some api with ease, pass in a AsyncApiClient and tell it which part
-// of response you want to save, ApiCollector will collect them from remote server and store them into database.
+// NewApiCollector allocates a new ApiCollector with the given args.
+// ApiCollector can help us collect data from an api with ease: pass in an AsyncApiClient and tell it which part
+// of the response we want to save, and ApiCollector will collect it from the remote server and store it in the database.
func NewApiCollector(args ApiCollectorArgs) (*ApiCollector, error) {
// process args
rawDataSubTask, err := newRawDataSubTask(args.RawDataSubTaskArgs)
@@ -112,12 +104,6 @@ func NewApiCollector(args ApiCollectorArgs) (*ApiCollector, error) {
if args.ResponseParser == nil {
return nil, fmt.Errorf("ResponseParser is required")
}
- if args.InputRateLimit == 0 {
- args.InputRateLimit = 50
- }
- if args.Concurrency < 1 {
- args.Concurrency = 1
- }
return &ApiCollector{
RawDataSubTask: rawDataSubTask,
args: &args,
@@ -131,81 +117,35 @@ func (collector *ApiCollector) Execute() error {
logger.Info("start api collection")
// make sure table is created
- db := collector.args.Ctx.GetDb()
- err := db.Table(collector.table).AutoMigrate(&RawData{})
+ db := collector.args.Ctx.GetDal()
+ err := db.AutoMigrate(&RawData{}, dal.From(collector.table))
if err != nil {
return err
}
// flush data if not incremental collection
if !collector.args.Incremental {
- err = db.Table(collector.table).Delete(&RawData{}, "params = ?", collector.params).Error
+ err = db.Delete(&RawData{}, dal.From(collector.table), dal.Where("params = ?", collector.params))
if err != nil {
return err
}
}
+ collector.args.Ctx.SetProgress(0, -1)
if collector.args.Input != nil {
- collector.args.Ctx.SetProgress(0, -1)
- // load all rows from iterator, and do multiple `exec` accordingly
- // TODO: this loads all records into memory, we need lazy-load
iterator := collector.args.Input
+ apiClient := collector.args.ApiClient
defer iterator.Close()
- // throttle input process speed so it can be canceled, create a channel to represent available slots
- slots := int(math.Ceil(collector.args.ApiClient.GetQps())) * 2
- if slots <= 0 {
- return fmt.Errorf("RateLimit can't use the 0 Qps")
- }
- slotsChan := make(chan bool, slots)
- defer close(slotsChan)
- for i := 0; i < slots; i++ {
- slotsChan <- true
- }
-
- errors := make(chan error, slots)
- defer close(errors)
-
- var wg sync.WaitGroup
- ctx := collector.args.Ctx.GetContext()
-
- out:
- for iterator.HasNext() {
- select {
- // canceled by user, stop
- case <-ctx.Done():
- err = ctx.Err()
- break out
- // obtain a slot
- case <-slotsChan:
- input, err := iterator.Fetch()
- if err != nil {
- break out
- }
- wg.Add(1)
- go func() {
- defer func() {
- wg.Done()
- recover() //nolint TODO: check the return and do log if not nil
- }()
- e := collector.exec(input)
- // propagate error
- if e != nil {
- errors <- e
- } else {
- // release 1 slot
- slotsChan <- true
- }
- }()
- case err = <-errors:
- break out
+ for iterator.HasNext() && !apiClient.HasError() {
+ input, err := iterator.Fetch()
+ if err != nil {
+ break
}
- }
- if err == nil {
- wg.Wait()
+ collector.exec(input)
}
} else {
// or we just did it once
- err = collector.exec(nil)
+ collector.exec(nil)
}
if err != nil {
@@ -213,62 +153,104 @@ func (collector *ApiCollector) Execute() error {
}
logger.Debug("wait for all async api to finished")
err = collector.args.ApiClient.WaitAsync()
- logger.Info("end api collection")
+ logger.Info("end api collection error: %w", err)
return err
}
-func (collector *ApiCollector) exec(input interface{}) error {
+func (collector *ApiCollector) exec(input interface{}) {
+ inputJson, err := json.Marshal(input)
+ if err != nil {
+ panic(err)
+ }
reqData := new(RequestData)
reqData.Input = input
- if collector.args.PageSize <= 0 {
- // collect detail of a record
- return collector.fetchAsync(reqData, collector.handleResponse(reqData))
+ reqData.InputJSON = inputJson
+ reqData.Pager = &Pager{
+ Page: 1,
+ Size: collector.args.PageSize,
}
- // collect multiple pages
- var err error
- if collector.args.GetTotalPages != nil {
- /* when total pages is available from api*/
- // fetch the very first page
- err = collector.fetchAsync(reqData, collector.handleResponseWithPages(reqData))
+ if collector.args.PageSize <= 0 {
+ collector.fetchAsync(reqData, nil)
+ } else if collector.args.GetTotalPages != nil {
+ collector.fetchPagesDetermined(reqData)
} else {
- // if api doesn't return total number of pages, employ a step concurrent technique
- // when `Concurrency` was set to 3:
- // goroutine #1 fetches pages 1/4/7..
- // goroutine #2 fetches pages 2/5/8...
- // goroutine #3 fetches pages 3/6/9...
- errs := make(chan error, collector.args.Concurrency)
- var errCount int
- // cancel can only be called when error occurs, because we are doomed anyway.
- ctx, cancel := context.WithCancel(collector.args.Ctx.GetContext())
- defer cancel()
- for i := 0; i < collector.args.Concurrency; i++ {
- reqDataTemp := RequestData{
- Pager: &Pager{
- Page: i + 1,
- Size: collector.args.PageSize,
- Skip: collector.args.PageSize * (i),
- },
- Input: reqData.Input,
- }
- go func() {
- errs <- collector.stepFetch(ctx, cancel, reqDataTemp)
- }()
+ collector.fetchPagesUndetermined(reqData)
+ }
+}
+
+// fetchPagesDetermined fetches data of all pages for APIs that return paging information
+func (collector *ApiCollector) fetchPagesDetermined(reqData *RequestData) {
+ // fetch first page
+ collector.fetchAsync(reqData, func(count int, body []byte, res *http.Response) error {
+ totalPages, err := collector.args.GetTotalPages(res, collector.args)
+ if err != nil {
+ return fmt.Errorf("fetchPagesDetermined get totalPages faileds: %s", err.Error())
}
- for e := range errs {
- errCount++
- if err != nil || errCount == collector.args.Concurrency {
- err = e
- break
+ // spawn a non-blocking task to fetch the other pages
+ collector.args.ApiClient.NextTick(func() error {
+ for page := 2; page <= totalPages; page++ {
+ reqDataTemp := &RequestData{
+ Pager: &Pager{
+ Page: page,
+ Skip: collector.args.PageSize * (page - 1),
+ Size: collector.args.PageSize,
+ },
+ Input: reqData.Input,
+ }
+ collector.fetchAsync(reqDataTemp, nil)
}
+ return nil
+ })
+ return nil
+ })
+}
+
+// fetchPagesUndetermined fetches data of all pages for APIs that do NOT return paging information
+func (collector *ApiCollector) fetchPagesUndetermined(reqData *RequestData) {
+ //logger := collector.args.Ctx.GetLogger()
+ //logger.Debug("fetch all pages in parallel with specified concurrency: %d", collector.args.Concurrency)
+ // if api doesn't return total number of pages, employ a step concurrent technique
+ // when `Concurrency` was set to 3:
+ // goroutine #1 fetches pages 1/4/7..
+ // goroutine #2 fetches pages 2/5/8...
+ // goroutine #3 fetches pages 3/6/9...
+ apiClient := collector.args.ApiClient
+ concurrency := collector.args.Concurrency
+ if concurrency == 0 {
+ // normally, when a multi-page api depends on another resource (e.g. jira changelogs depend on issue ids),
+ // it tends to have only a few pages, like 1 or 2 in total
+ if collector.args.Input != nil {
+ concurrency = 2
+ } else {
+ concurrency = apiClient.GetNumOfWorkers()
}
}
- if err != nil {
- return err
- }
- if collector.args.Input != nil {
- collector.args.Ctx.IncProgress(1)
+ for i := 0; i < concurrency; i++ {
+ reqDataCopy := RequestData{
+ Pager: &Pager{
+ Page: i + 1,
+ Size: collector.args.PageSize,
+ Skip: collector.args.PageSize * (i),
+ },
+ Input: reqData.Input,
+ }
+ var collect func() error
+ collect = func() error {
+ collector.fetchAsync(&reqDataCopy, func(count int, body []byte, res *http.Response) error {
+ if count < collector.args.PageSize {
+ return nil
+ }
+ apiClient.NextTick(func() error {
+ reqDataCopy.Pager.Skip += collector.args.PageSize
+ reqDataCopy.Pager.Page += concurrency
+ return collect()
+ })
+ return nil
+ })
+ return nil
+ }
+ apiClient.NextTick(collect)
}
- return nil
}
func (collector *ApiCollector) generateUrl(pager *Pager, input interface{}) (string, error) {
@@ -284,84 +266,23 @@ func (collector *ApiCollector) generateUrl(pager *Pager, input interface{}) (str
return buf.String(), nil
}
-// stepFetch collect pages synchronously. In practice, several stepFetch running concurrently, we could stop all of them by calling `cancel`.
-func (collector *ApiCollector) stepFetch(ctx context.Context, cancel func(), reqData RequestData) error {
- // channel `c` is used to make sure fetchAsync is called serially
- c := make(chan struct{})
- var err1 error
- handler := func(res *http.Response, err error) error {
- select {
- case <-ctx.Done():
- err = ctx.Err()
- default:
-
- }
- if err != nil {
- err1 = err
- close(c)
- return err
- }
- count, err := collector.saveRawData(res, reqData.Input)
- if err != nil {
- err1 = err
- close(c)
- cancel()
- return err
- }
- if count < collector.args.PageSize {
- close(c)
- return nil
- }
- reqData.Pager.Skip += collector.args.PageSize
- reqData.Pager.Page += collector.args.Concurrency
- c <- struct{}{}
- return nil
- }
- // kick off
- go func() { c <- struct{}{} }()
- for {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case _, ok := <-c:
- if !ok || err1 != nil {
- return err1
- } else {
- err := collector.fetchAsync(&reqData, handler)
- if err != nil {
- close(c)
- cancel()
- return err
- }
- }
- }
- }
-}
-
-func (collector *ApiCollector) fetchAsync(reqData *RequestData, handler ApiAsyncCallback) error {
+func (collector *ApiCollector) fetchAsync(reqData *RequestData, handler func(int, []byte, *http.Response) error) {
if reqData.Pager == nil {
reqData.Pager = &Pager{
Page: 1,
Size: 100,
Skip: 0,
}
- }
- ctx := collector.args.Ctx.GetContext()
- select {
- case <-ctx.Done():
- return ctx.Err()
- default:
-
}
apiUrl, err := collector.generateUrl(reqData.Pager, reqData.Input)
if err != nil {
- return err
+ panic(err)
}
var apiQuery url.Values
if collector.args.Query != nil {
apiQuery, err = collector.args.Query(reqData)
if err != nil {
- return err
+ panic(err)
}
}
@@ -369,129 +290,56 @@ func (collector *ApiCollector) fetchAsync(reqData *RequestData, handler ApiAsync
if collector.args.Header != nil {
apiHeader, err = collector.args.Header(reqData)
if err != nil {
- return err
+ panic(err)
}
}
- return collector.args.ApiClient.GetAsync(apiUrl, apiQuery, apiHeader, handler)
-}
-
-func (collector *ApiCollector) handleResponse(reqData *RequestData) ApiAsyncCallback {
- return func(res *http.Response, err error) error {
+ logger := collector.args.Ctx.GetLogger()
+ logger.Debug("fetchAsync <<< enqueueing for %s %v", apiUrl, apiQuery)
+ collector.args.ApiClient.GetAsync(apiUrl, apiQuery, apiHeader, func(res *http.Response) error {
+ defer logger.Debug("fetchAsync >>> done for %s %v", apiUrl, apiQuery)
+ logger := collector.args.Ctx.GetLogger()
+ // read body to buffer
+ body, err := ioutil.ReadAll(res.Body)
if err != nil {
return err
}
- _, err = collector.saveRawData(res, reqData.Input)
- collector.args.Ctx.IncProgress(1)
- return err
- }
-}
-
-func (collector *ApiCollector) handleResponseWithPages(reqData *RequestData) ApiAsyncCallback {
- return func(res *http.Response, e error) error {
- if e != nil {
- return e
- }
- // gather total pages
- body, e := ioutil.ReadAll(res.Body)
- if e != nil {
- return e
- }
res.Body.Close()
res.Body = ioutil.NopCloser(bytes.NewBuffer(body))
- totalPages, e := collector.args.GetTotalPages(res, collector.args)
- if e != nil {
- return e
- }
- // save response body of first page
- res.Body = ioutil.NopCloser(bytes.NewBuffer(body))
- _, e = collector.saveRawData(res, reqData.Input)
- if e != nil {
- return e
+ // convert body to array of RawJSON
+ items, err := collector.args.ResponseParser(res)
+ if err != nil {
+ return err
}
- if collector.args.Input == nil {
- collector.args.Ctx.SetProgress(1, totalPages)
+ // save to db
+ count := len(items)
+ if count == 0 {
+ return nil
}
- // fetch other pages in parallel
- collector.args.ApiClient.Add(1)
- go func() {
- defer func() {
- collector.args.ApiClient.Done()
- recover() //nolint TODO: check the return and do log if not nil
- }()
- for page := 2; page <= totalPages; page++ {
- reqDataTemp := &RequestData{
- Pager: &Pager{
- Page: page,
- Size: collector.args.PageSize,
- Skip: collector.args.PageSize * (page - 1),
- },
- Input: reqData.Input,
- }
- _ = collector.fetchAsync(reqDataTemp, collector.handleResponse(reqDataTemp))
+ db := collector.args.Ctx.GetDal()
+ url := res.Request.URL.String()
+ rows := make([]*RawData, count)
+ for i, msg := range items {
+ rows[i] = &RawData{
+ Params: collector.params,
+ Data: msg,
+ Url: url,
+ Input: reqData.InputJSON,
}
- }()
- return nil
- }
-}
-
-func (collector *ApiCollector) saveRawData(res *http.Response, input interface{}) (int, error) {
- items, err := collector.args.ResponseParser(res)
- logger := collector.args.Ctx.GetLogger()
- if err != nil {
- return 0, err
- }
- res.Body.Close()
-
- inputJson, _ := json.Marshal(input)
-
- if len(items) == 0 {
- return 0, nil
- }
- db := collector.args.Ctx.GetDb()
- u := res.Request.URL.String()
- dd := make([]*RawData, len(items))
- for i, msg := range items {
- dd[i] = &RawData{
- Params: collector.params,
- Data: msg,
- Url: u,
- Input: inputJson,
}
- }
- err = db.Table(collector.table).Create(dd).Error
- if err != nil {
- logger.Error("failed to save raw data: %s", err)
- }
- return len(dd), err
-}
-
-func GetRawMessageDirectFromResponse(res *http.Response) ([]json.RawMessage, error) {
- body, err := ioutil.ReadAll(res.Body)
- res.Body.Close()
- if err != nil {
- return nil, err
- }
- return []json.RawMessage{body}, nil
-}
-
-func GetRawMessageArrayFromResponse(res *http.Response) ([]json.RawMessage, error) {
- rawMessages := []json.RawMessage{}
-
- if res == nil {
- return nil, fmt.Errorf("res is nil")
- }
- defer res.Body.Close()
- resBody, err := ioutil.ReadAll(res.Body)
- if err != nil {
- return nil, fmt.Errorf("%w %s", err, res.Request.URL.String())
- }
-
- err = json.Unmarshal(resBody, &rawMessages)
- if err != nil {
- return nil, fmt.Errorf("%w %s %s", err, res.Request.URL.String(), string(resBody))
- }
-
- return rawMessages, nil
+ err = db.Create(rows, dal.From(collector.table))
+ if err != nil {
+ return err
+ }
+ logger.Debug("fetchAsync === total %d rows were saved into database", count)
+ // increase progress only when it was not nested
+ collector.args.Ctx.IncProgress(1)
+ if handler != nil {
+ res.Body = ioutil.NopCloser(bytes.NewBuffer(body))
+ return handler(count, body, res)
+ }
+ return nil
+ })
+ logger.Debug("fetchAsync === enqueued for %s %v", apiUrl, apiQuery)
}
var _ core.SubTask = (*ApiCollector)(nil)
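
To make the "step concurrent" comment in fetchPagesUndetermined concrete, here is a tiny standalone illustration (page count capped at 9 for display) of how pages are assigned to workers:

package main

import "fmt"

// With concurrency c, worker #i starts at page i and advances by c each tick,
// stopping once a page returns fewer than PageSize items.
func main() {
	concurrency := 3
	for i := 1; i <= concurrency; i++ {
		fmt.Printf("worker #%d fetches pages:", i)
		for page := i; page <= 9; page += concurrency {
			fmt.Printf(" %d", page)
		}
		fmt.Println()
	}
	// Output:
	// worker #1 fetches pages: 1 4 7
	// worker #2 fetches pages: 2 5 8
	// worker #3 fetches pages: 3 6 9
}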
diff --git a/utils/callframes.go b/plugins/helper/api_collector_func.go
similarity index 50%
copy from utils/callframes.go
copy to plugins/helper/api_collector_func.go
index 44cda2d6..690e7a65 100644
--- a/utils/callframes.go
+++ b/plugins/helper/api_collector_func.go
@@ -15,38 +15,40 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package utils
+package helper
import (
+ "encoding/json"
"fmt"
- "runtime"
- "strings"
+ "io/ioutil"
+ "net/http"
)
-func GatherCallFrames() string {
- var name, file string
- var line int
- var pc [16]uintptr
-
- n := runtime.Callers(3, pc[:])
- for _, pc := range pc[:n] {
- fn := runtime.FuncForPC(pc)
- if fn == nil {
- continue
- }
- file, line = fn.FileLine(pc)
- name = fn.Name()
- if !strings.HasPrefix(name, "runtime.") {
- break
- }
+func GetRawMessageDirectFromResponse(res *http.Response) ([]json.RawMessage, error) {
+ body, err := ioutil.ReadAll(res.Body)
+ res.Body.Close()
+ if err != nil {
+ return nil, err
+ }
+ return []json.RawMessage{body}, nil
+}
+
+func GetRawMessageArrayFromResponse(res *http.Response) ([]json.RawMessage, error) {
+ rawMessages := []json.RawMessage{}
+
+ if res == nil {
+ return nil, fmt.Errorf("res is nil")
+ }
+ defer res.Body.Close()
+ resBody, err := ioutil.ReadAll(res.Body)
+ if err != nil {
+ return nil, fmt.Errorf("%w %s", err, res.Request.URL.String())
}
- switch {
- case name != "":
- return fmt.Sprintf("%v:%v", name, line)
- case file != "":
- return fmt.Sprintf("%v:%v", file, line)
+ err = json.Unmarshal(resBody, &rawMessages)
+ if err != nil {
+ return nil, fmt.Errorf("%w %s %s", err, res.Request.URL.String(), string(resBody))
}
- return fmt.Sprintf("pc:%x", pc)
+ return rawMessages, nil
}
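These two parsers are what plugins plug into ApiCollectorArgs.ResponseParser. A hypothetical wiring, following the shape used by the unit test below (the endpoint, table, and params are placeholders, not a real plugin):

    collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
        RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
            Ctx:    taskCtx,            // core.SubTaskContext supplied by the framework
            Table:  "_raw_example_api", // placeholder raw table
            Params: params,
        },
        ApiClient:   apiClient, // a RateLimitedApiClient
        UrlTemplate: "repos/{{ .Params.Name }}/issues",
        PageSize:    100,
        Concurrency: 10,
        // each response body is a JSON array; one RawData row per element
        ResponseParser: helper.GetRawMessageArrayFromResponse,
    })
    if err != nil {
        return err
    }
    return collector.Execute()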
diff --git a/plugins/helper/api_collector_test.go b/plugins/helper/api_collector_test.go
index c8f89286..aa8343c2 100644
--- a/plugins/helper/api_collector_test.go
+++ b/plugins/helper/api_collector_test.go
@@ -1,1142 +1,81 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
package helper
import (
"bytes"
- "context"
- "database/sql"
- "encoding/json"
- "fmt"
"io/ioutil"
"net/http"
"net/url"
- "reflect"
- "runtime/debug"
- "sync/atomic"
"testing"
- "time"
- "github.com/agiledragon/gomonkey/v2"
- "github.com/apache/incubator-devlake/logger"
- "github.com/apache/incubator-devlake/models/common"
- "github.com/sirupsen/logrus"
+ "github.com/apache/incubator-devlake/helpers/unithelper"
+ "github.com/apache/incubator-devlake/mocks"
+ "github.com/apache/incubator-devlake/plugins/helper/common"
"github.com/stretchr/testify/assert"
- "gorm.io/gorm"
+ "github.com/stretchr/testify/mock"
)
-// go test -gcflags=all=-l
-
-type TestTable struct {
- Email string `gorm:"primaryKey;type:varchar(255)"`
- Name string `gorm:"type:varchar(255)"`
- common.NoPKModel
-}
-
-type TestTable2 struct {
- Email string `gorm:"primaryKey;type:varchar(255)"`
- Name string `gorm:"type:varchar(255)"`
- common.NoPKModel
-}
-
-var TestTableData *TestTable = &TestTable{
- Email: "test@test.com",
- Name: "test",
-}
-
-type TestParam struct {
- Test string
-}
-
-func (TestTable) TableName() string {
- return "_tool_test"
-}
-
-var TestError error = fmt.Errorf("Error For Test")
-
-var gt *gomonkey.Patches
-var gc *gomonkey.Patches
-var gd *gomonkey.Patches
-var ga *gomonkey.Patches
-var gs *gomonkey.Patches
-var god *gomonkey.Patches
-var gw *gomonkey.Patches
-var gr *gomonkey.Patches
-
-var TestUrlBefor string = "test1"
-var TestUrlParam string = "test2"
-var TestUrlAfter string = "test3"
-var TestUrl string = "https://" + TestUrlBefor + TestUrlParam + TestUrlAfter
-
-var TestRawMessage string = "{\"message\":\"TestRawMessage\"}"
-var TestUrlValueKey string = "TestKey"
-var TestUrlValueValue string = "TestValue"
-var TestNoRunHere string = "should not run to this line of code"
-
-var TestDataCount int = 100
-var TestTotalPage int = 100
-var TestDataCountNotFull int = 50
-var TestPage int = 110
-var TestSkip int = 100100
-var TestSize int = 116102
-var TestTimeOut time.Duration = time.Duration(10) * time.Second
-
-var Cancel context.CancelFunc
-
-var TestHttpResponse_Suc http.Response = http.Response{
- Status: "200 OK",
- StatusCode: http.StatusOK,
- Proto: "HTTP/1.0",
- ProtoMajor: 1,
- ProtoMinor: 0,
-}
-
-var TestHttpResponse_404 http.Response = http.Response{
- Status: "404 Not Found",
- StatusCode: http.StatusNotFound,
- Proto: "HTTP/1.0",
- ProtoMajor: 1,
- ProtoMinor: 0,
-}
-
-// Assert http.Response base test data
-func AssertBaseResponse(t *testing.T, A *http.Response, B *http.Response) {
- assert.Equal(t, A.Status, B.Status)
- assert.Equal(t, A.StatusCode, B.StatusCode)
- assert.Equal(t, A.Proto, B.Proto)
- assert.Equal(t, A.ProtoMajor, B.ProtoMajor)
- assert.Equal(t, A.ProtoMinor, B.ProtoMinor)
-}
-
-func SetTimeOut(timeout time.Duration, handleer func()) error {
- stack := string(debug.Stack())
- t := time.After(timeout)
- done := make(chan bool)
- go func() {
- defer func() {
- if r := recover(); r != nil {
- fmt.Printf("%s\r\n", stack)
- fmt.Printf("%v\r\n", r)
- }
- }()
-
- if handleer != nil {
- handleer()
- }
- done <- true
- }()
-
- select {
- case <-t:
- return fmt.Errorf("[time:%s]\r\n[Time limit for %f seconed]\r\n[stack]\r\n%s\r\n",
- time.Now().String(),
- float64(timeout)/float64(time.Second),
- stack)
- case <-done:
- return nil
- }
-}
-
-func AddBodyData(res *http.Response, count int) {
- data := "["
- for i := 0; i < count; i++ {
- data += TestRawMessage
- if i != count-1 {
- data += ","
- }
- }
- data += "]"
- res.Body = ioutil.NopCloser(bytes.NewReader([]byte(data)))
-}
-
-func SetUrl(res *http.Response, rawURL string) {
- u, _ := url.Parse(rawURL)
- res.Request = &http.Request{
- URL: u,
- }
-}
-
-// Mock the DB api
-// Need be released by UnMockDB
-func MockDB(t *testing.T) {
- gt = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Table", func(db *gorm.DB, name string, args ...interface{}) *gorm.DB {
- assert.Equal(t, name, TestTableData.TableName())
- return db
- })
-
- gc = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Create", func(db *gorm.DB, value interface{}) *gorm.DB {
- assert.Equal(t, TestTableData, value.(*TestTable))
- return db
- })
-
- gd = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Delete", func(db *gorm.DB, value interface{}, conds ...interface{}) (tx *gorm.DB) {
- return db
- })
-
- ga = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "AutoMigrate", func(db *gorm.DB, dst ...interface{}) error {
- return nil
- })
-
- god = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Order", func(db *gorm.DB, value interface{}) (tx *gorm.DB) {
- return db
- })
-
- gw = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Where", func(db *gorm.DB, query interface{}, args ...interface{}) (tx *gorm.DB) {
- return db
- })
-
- gr = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Count", func(db *gorm.DB, count *int64) (tx *gorm.DB) {
- return db
- })
-
- gr = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Rows", func(db *gorm.DB) (*sql.Rows, error) {
- return &sql.Rows{}, nil
- })
-
- gs = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "ScanRows", func(db *gorm.DB, rows *sql.Rows, dest interface{}) error {
- dest = TestRawMessage
- return nil
- })
-
-}
-
-// released MockDB
-func UnMockDB() {
- gt.Reset()
- gc.Reset()
- gd.Reset()
- ga.Reset()
- god.Reset()
- gw.Reset()
- gr.Reset()
- gs.Reset()
-}
-
-type TestIterator struct {
- data TestTable
- count int
- hasNextTimes int
- fetchTimes int
- closeTimes int
- unlimit bool
-}
-
-func (it *TestIterator) HasNext() bool {
- it.hasNextTimes++
- return it.count > 0
-}
-
-func (it *TestIterator) Fetch() (interface{}, error) {
- it.fetchTimes++
- if it.count > 0 {
- if it.unlimit == false {
- it.count--
- }
- ret := it.data
- return &ret, nil
- }
- return nil, TestError
-}
-
-func (it *TestIterator) Close() error {
- it.closeTimes++
- return nil
-}
-
-func CreateTestApiCollector() (*ApiCollector, error) {
- db := &gorm.DB{}
- var ctx context.Context
- ctx, Cancel = context.WithCancel(context.Background())
- return NewApiCollector(ApiCollectorArgs{
- RawDataSubTaskArgs: RawDataSubTaskArgs{
- Ctx: &DefaultSubTaskContext{
- defaultExecContext: newDefaultExecContext(GetConfigForTest("../../"), logger.NewDefaultLogger(logrus.New(), "Test", make(map[string]*logrus.Logger)), db, ctx, "Test", nil, nil),
- },
- Table: TestTable{}.TableName(),
- Params: &TestParam{
- Test: TestUrlParam,
+func TestFetchPageUndetermined(t *testing.T) {
+ mockDal := new(mocks.Dal)
+ mockDal.On("AutoMigrate", mock.Anything, mock.Anything).Return(nil).Once()
+ mockDal.On("Delete", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
+ mockDal.On("Create", mock.Anything, mock.Anything).Return(nil).Once()
+
+ mockCtx := unithelper.DummySubTaskContext(mockDal)
+
+ mockInput := new(mocks.Iterator)
+ mockInput.On("HasNext").Return(true).Once()
+ mockInput.On("HasNext").Return(false).Once()
+ mockInput.On("Fetch").Return(nil, nil).Once()
+ mockInput.On("Close").Return(nil)
+
+	// simulate fetching all pages of Jira changelogs for one issue id with a concurrency of 1,
+	// assuming the API doesn't return the total number of pages.
+	// we then expect exactly 2 calls each to GetAsync and NextTick; otherwise a deadlock would occur
+ getAsyncCounter := 0
+ mockApi := new(mocks.RateLimitedApiClient)
+ mockApi.On("GetAsync", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
+ // fake records for first page, no records for second page
+ body := "[1,2,3]"
+ if getAsyncCounter > 0 {
+ body = "[]"
+ }
+ getAsyncCounter += 1
+ res := &http.Response{
+ Request: &http.Request{
+ URL: &url.URL{},
},
+ Body: ioutil.NopCloser(bytes.NewBufferString(body)),
+ }
+ handler := args.Get(3).(common.ApiAsyncCallback)
+ handler(res)
+ }).Twice()
+ mockApi.On("NextTick", mock.Anything).Run(func(args mock.Arguments) {
+ handler := args.Get(0).(func() error)
+ assert.Nil(t, handler())
+ }).Twice()
+ mockApi.On("HasError").Return(false)
+ mockApi.On("WaitAsync").Return(nil)
+
+ params := struct {
+ Name string
+ }{Name: "testparams"}
+
+ collector, err := NewApiCollector(ApiCollectorArgs{
+ RawDataSubTaskArgs: RawDataSubTaskArgs{
+ Ctx: mockCtx,
+ Table: "whatever rawtable",
+ Params: params,
},
- ApiClient: &ApiAsyncClient{qps: 10},
- PageSize: 100,
- Incremental: false,
- UrlTemplate: TestUrlBefor + "{{ .Params.Test }}" + TestUrlAfter,
- Query: func(reqData *RequestData) (url.Values, error) {
- u := url.Values{}
- json, err := json.Marshal(reqData.Input)
- u.Add("Vjson", string(json))
- if err != nil {
- u.Add("Verr", err.Error())
- } else {
- u.Add("Verr", "")
- }
- return u, nil
- },
- Header: func(reqData *RequestData) (http.Header, error) {
- h := http.Header{}
- json, err := json.Marshal(reqData.Input)
- h.Add("Hjson", string(json))
- if err != nil {
- h.Add("Herr", err.Error())
- } else {
- h.Add("Herr", "")
- }
- return h, nil
- },
- GetTotalPages: func(res *http.Response, args *ApiCollectorArgs) (int, error) { return TestTotalPage, nil },
+ ApiClient: mockApi,
+ Input: mockInput,
+ UrlTemplate: "whatever url",
+ Concurrency: 1,
+ PageSize: 3,
ResponseParser: GetRawMessageArrayFromResponse,
})
-}
-
-func TestGormDB(t *testing.T) {
- ts := &TestTable{
- Email: "test@test.com",
- Name: "test",
- }
-
- db := &gorm.DB{}
- MockDB(t)
- defer UnMockDB()
-
- db.Table(ts.TableName()).Order(ts).Where(ts).Create(ts).Delete(ts).AutoMigrate()
- db.Table(ts.TableName()).Order(ts).Where(ts).Create(ts).Delete(ts).Rows()
- db.Table(ts.TableName()).Order(ts).Where(ts).Create(ts).Delete(ts).ScanRows(nil, nil)
-}
-
-func TestSaveRawData(t *testing.T) {
- gt = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Table", func(db *gorm.DB, name string, args ...interface{}) *gorm.DB {
- assert.Equal(t, name, "_raw_"+TestTableData.TableName())
- return db
- },
- )
- defer gt.Reset()
-
- gc = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Create", func(db *gorm.DB, value interface{}) *gorm.DB {
- rd := value.([]*RawData)
- params, _ := json.Marshal(&TestParam{
- Test: TestUrlParam,
- })
- input, _ := json.Marshal(TestTableData)
- for _, v := range rd {
- // check data and url
- assert.Equal(t, v.Params, string(params))
- assert.Equal(t, string(v.Data), TestRawMessage)
- assert.Equal(t, v.Url, TestUrl)
- assert.Equal(t, v.Input.String(), string(input))
- }
- return db
- },
- )
- defer gc.Reset()
-
- apiCollector, _ := CreateTestApiCollector()
-
- resBase := TestHttpResponse_Suc
- res := &resBase
-
- // build data and url
- AddBodyData(res, TestDataCount)
- SetUrl(res, TestUrl)
-
- i, err := apiCollector.saveRawData(res, TestTableData)
- assert.Equal(t, i, TestDataCount)
- assert.Equal(t, err, nil)
-}
-
-func TestSaveRawData_Fail(t *testing.T) {
- gt = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Table", func(db *gorm.DB, name string, args ...interface{}) *gorm.DB {
- assert.Empty(t, TestNoRunHere)
- return db
- },
- )
- defer gt.Reset()
-
- gc = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Create", func(db *gorm.DB, value interface{}) *gorm.DB {
- assert.Empty(t, TestNoRunHere)
- return db
- },
- )
- defer gc.Reset()
-
- apiCollector, _ := CreateTestApiCollector()
-
- // build data and url
- resBase := TestHttpResponse_404
- res := &resBase
-
- AddBodyData(res, 0)
- SetUrl(res, TestUrl)
-
- //run testing
- i, err := apiCollector.saveRawData(res, TestTableData)
- assert.Equal(t, i, 0)
- assert.Equal(t, err, nil)
-}
-
-func TestHandleResponse(t *testing.T) {
- apiCollector, _ := CreateTestApiCollector()
-
- gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "saveRawData", func(collector *ApiCollector, res *http.Response, input interface{}) (int, error) {
- items, err := collector.args.ResponseParser(res)
- assert.Equal(t, err, nil)
- // check items data
- for _, v := range items {
- jsondata, err := json.Marshal(v)
- assert.Equal(t, err, nil)
- assert.Equal(t, string(jsondata), TestRawMessage)
- }
- assert.Equal(t, input, TestTableData)
- AssertBaseResponse(t, res, &TestHttpResponse_Suc)
- return len(items), nil
- })
- defer gs.Reset()
-
- // build requeset input
- reqData := new(RequestData)
- reqData.Input = TestTableData
- handle := apiCollector.handleResponse(reqData)
-
- resBase := TestHttpResponse_Suc
- res := &resBase
-
- // build data and url
- AddBodyData(res, TestDataCount)
- SetUrl(res, TestUrl)
-
- // run testing
- err := handle(res, nil)
- assert.Equal(t, err, nil)
-}
-
-func TestHandleResponse_Fail(t *testing.T) {
- apiCollector, _ := CreateTestApiCollector()
-
- gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "saveRawData", func(collector *ApiCollector, res *http.Response, input interface{}) (int, error) {
- items, err := collector.args.ResponseParser(res)
- assert.Equal(t, err, nil)
- for _, v := range items {
- jsondata, err := json.Marshal(v)
- assert.Equal(t, err, nil)
- assert.Equal(t, string(jsondata), TestRawMessage)
- }
- assert.Equal(t, input, TestTableData)
- AssertBaseResponse(t, res, &TestHttpResponse_404)
- return len(items), TestError
- })
- defer gs.Reset()
-
- // build requeset input
- reqData := new(RequestData)
- reqData.Input = TestTableData
- handle := apiCollector.handleResponse(reqData)
-
- // build data and url
- resBase := TestHttpResponse_404
- res := &resBase
-
- AddBodyData(res, 0)
- SetUrl(res, TestUrl)
-
- // run testing
- err := handle(res, nil)
- assert.Equal(t, err, TestError)
-}
-
-func TestFetchAsync(t *testing.T) {
- apiCollector, _ := CreateTestApiCollector()
-
- gg := gomonkey.ApplyMethod(reflect.TypeOf(&ApiAsyncClient{}), "GetAsync", func(apiAsyncClient *ApiAsyncClient, path string, query url.Values, header http.Header, handler ApiAsyncCallback) error {
- assert.Equal(t, path, TestUrlBefor+TestUrlParam+TestUrlAfter)
-
- json, err := json.Marshal(TestTableData)
- assert.Equal(t, query.Get("Vjson"), string(json))
- assert.Equal(t, header.Get("Hjson"), string(json))
- if err != nil {
- assert.Equal(t, query.Get("Verr"), err.Error())
- assert.Equal(t, header.Get("Herr"), err.Error())
- } else {
- assert.Equal(t, query.Get("Verr"), "")
- assert.Equal(t, header.Get("Herr"), "")
- }
-
- res := TestHttpResponse_Suc
- handler(&res, TestError)
- return nil
- })
- defer gg.Reset()
-
- // build request Input
- reqData := new(RequestData)
- reqData.Input = TestTableData
-
- // run testing
- err := apiCollector.fetchAsync(reqData, func(r *http.Response, err error) error {
- AssertBaseResponse(t, r, &TestHttpResponse_Suc)
- assert.Equal(t, err, TestError)
- return err
- })
-
- assert.Equal(t, err, nil)
-}
-
-func TestFetchAsync_Fail(t *testing.T) {
- apiCollector, _ := CreateTestApiCollector()
-
- gg := gomonkey.ApplyMethod(reflect.TypeOf(&ApiAsyncClient{}), "GetAsync", func(apiAsyncClient *ApiAsyncClient, path string, query url.Values, header http.Header, handler ApiAsyncCallback) error {
- assert.Equal(t, path, TestUrlBefor+TestUrlParam+TestUrlAfter)
-
- json, err := json.Marshal(TestTableData)
- assert.Equal(t, query.Get("Vjson"), string(json))
- assert.Equal(t, header.Get("Hjson"), string(json))
- if err != nil {
- assert.Equal(t, query.Get("Verr"), err.Error())
- assert.Equal(t, header.Get("Herr"), err.Error())
- } else {
- assert.Equal(t, query.Get("Verr"), "")
- assert.Equal(t, header.Get("Herr"), "")
- }
-
- res := TestHttpResponse_404
- handler(&res, TestError)
- return TestError
- })
- defer gg.Reset()
-
- // build request Input
- reqData := new(RequestData)
- reqData.Input = TestTableData
-
- // run testing
- err := apiCollector.fetchAsync(reqData, func(r *http.Response, err error) error {
- AssertBaseResponse(t, r, &TestHttpResponse_404)
- assert.Equal(t, err, TestError)
- return err
- })
-
- assert.Equal(t, err, TestError)
-}
-
-func TestHandleResponseWithPages(t *testing.T) {
- apiCollector, _ := CreateTestApiCollector()
- pages := make([]bool, TestTotalPage+1)
- for i := 1; i <= TestTotalPage; i++ {
- pages[i] = false
- }
-
- gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
- page := reqData.Pager.Page
- pages[page] = true
- return nil
- })
- defer gf.Reset()
-
- gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "saveRawData", func(collector *ApiCollector, res *http.Response, input interface{}) (int, error) {
- items, err := collector.args.ResponseParser(res)
- assert.Equal(t, err, nil)
- for _, v := range items {
- jsondata, err := json.Marshal(v)
- assert.Equal(t, err, nil)
- assert.Equal(t, string(jsondata), TestRawMessage)
- }
- assert.Equal(t, input, TestTableData)
- AssertBaseResponse(t, res, &TestHttpResponse_Suc)
- return len(items), nil
- })
- defer gs.Reset()
-
- NeedWait := int64(0)
- gad := gomonkey.ApplyMethod(reflect.TypeOf(&ApiAsyncClient{}), "Add", func(apiClient *ApiAsyncClient, delta int) {
- atomic.AddInt64(&NeedWait, int64(delta))
- })
- defer gad.Reset()
-
- gdo := gomonkey.ApplyMethod(reflect.TypeOf(&ApiAsyncClient{}), "Done", func(apiClient *ApiAsyncClient) {
- atomic.AddInt64(&NeedWait, -1)
- })
- defer gdo.Reset()
- // build request Input
- reqData := new(RequestData)
- reqData.Input = TestTableData
- handle := apiCollector.handleResponseWithPages(reqData)
+ assert.Nil(t, err)
+ assert.Nil(t, collector.Execute())
- // build data and url
- resBase := TestHttpResponse_Suc
- res := &resBase
-
- AddBodyData(res, TestDataCount)
- SetUrl(res, TestUrl)
-
- // run testing
- err := handle(res, nil)
-
- // wait run finished
- for atomic.LoadInt64(&NeedWait) > 0 {
- time.Sleep(time.Millisecond)
- }
-
- assert.Equal(t, err, nil)
- for i := 2; i <= TestTotalPage; i++ {
- assert.True(t, pages[i], i)
- }
-}
-
-func TestHandleResponseWithPages_Fail(t *testing.T) {
- apiCollector, _ := CreateTestApiCollector()
-
- gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
- return TestError
- })
- defer gf.Reset()
-
- gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "saveRawData", func(collector *ApiCollector, res *http.Response, input interface{}) (int, error) {
- items, err := collector.args.ResponseParser(res)
- assert.Equal(t, err, nil)
- for _, v := range items {
- jsondata, err := json.Marshal(v)
- assert.Equal(t, err, nil)
- assert.Equal(t, string(jsondata), TestRawMessage)
- }
- assert.Equal(t, input, TestTableData)
- AssertBaseResponse(t, res, &TestHttpResponse_404)
- return len(items), TestError
- })
- defer gs.Reset()
-
- // build request Input
- reqData := new(RequestData)
- reqData.Input = TestTableData
- handle := apiCollector.handleResponseWithPages(reqData)
-
- // build data and url
- resBase := TestHttpResponse_404
- res := &resBase
-
- AddBodyData(res, 0)
- SetUrl(res, TestUrl)
-
- // run testing
- err := handle(res, nil)
- assert.Equal(t, err, TestError)
-}
-
-func TestStepFetch(t *testing.T) {
- apiCollector, _ := CreateTestApiCollector()
-
- init := false
- noFullTimes := 0
-
- gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "saveRawData", func(collector *ApiCollector, res *http.Response, input interface{}) (int, error) {
- items, err := collector.args.ResponseParser(res)
- assert.Equal(t, err, nil)
- for _, v := range items {
- jsondata, err := json.Marshal(v)
- assert.Equal(t, err, nil)
- assert.Equal(t, string(jsondata), TestRawMessage)
- }
- // full page
- assert.Equal(t, input, TestTableData)
- AssertBaseResponse(t, res, &TestHttpResponse_Suc)
- return len(items), nil
- })
- defer gs.Reset()
-
- gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
- resBase := TestHttpResponse_Suc
- res := &resBase
- SetUrl(res, TestUrl)
-
- // full page for continue
- if reqData.Pager.Page == TestPage {
- init = true
- assert.Equal(t, reqData.Pager.Skip, TestSkip)
- assert.Equal(t, reqData.Pager.Size, TestSize)
- AddBodyData(res, TestDataCount)
- } else {
- // not full page for stop
- AddBodyData(res, TestDataCountNotFull)
- noFullTimes++
- }
-
- go handler(res, nil)
- return nil
- })
- defer gf.Reset()
-
- // build request Input
- reqData := new(RequestData)
- reqData.Input = TestTableData
- reqData.Pager = &Pager{
- Page: TestPage,
- Skip: TestSkip,
- Size: TestSize,
- }
-
- // cancel can only be called when error occurs, because we are doomed anyway.
- ctx, cancel := context.WithCancel(apiCollector.args.Ctx.GetContext())
-
- // run testing
- err := apiCollector.stepFetch(ctx, cancel, *reqData)
-
- assert.Equal(t, noFullTimes, 1)
- assert.Equal(t, init, true)
- assert.Equal(t, err, nil)
-}
-
-func TestStepFetch_Fail(t *testing.T) {
- apiCollector, _ := CreateTestApiCollector()
-
- init := false
- noFullTimes := 0
-
- gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "saveRawData", func(collector *ApiCollector, res *http.Response, input interface{}) (int, error) {
- items, err := collector.args.ResponseParser(res)
- assert.Equal(t, err, nil)
- for _, v := range items {
- jsondata, err := json.Marshal(v)
- assert.Equal(t, err, nil)
- assert.Equal(t, string(jsondata), TestRawMessage)
- }
- // full page
- assert.Equal(t, input, TestTableData)
- AssertBaseResponse(t, res, &TestHttpResponse_404)
- return len(items), nil
- })
- defer gs.Reset()
-
- gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
- resBase := TestHttpResponse_404
- res := &resBase
- SetUrl(res, TestUrl)
- // full page for continue
- if reqData.Pager.Page == TestPage {
- init = true
- assert.Equal(t, reqData.Pager.Skip, TestSkip)
- assert.Equal(t, reqData.Pager.Size, TestSize)
- AddBodyData(res, TestDataCount)
- go handler(res, nil)
- } else {
- // not full page for stop
- AddBodyData(res, TestDataCountNotFull)
- noFullTimes++
- return TestError
- }
- return nil
- })
- defer gf.Reset()
-
- // build request Input
- reqData := new(RequestData)
- reqData.Input = TestTableData
- reqData.Pager = &Pager{
- Page: TestPage,
- Skip: TestSkip,
- Size: TestSize,
- }
-
- // cancel can only be called when error occurs, because we are doomed anyway.
- ctx, cancel := context.WithCancel(apiCollector.args.Ctx.GetContext())
-
- err := apiCollector.stepFetch(ctx, cancel, *reqData)
-
- assert.Equal(t, noFullTimes, 1)
- assert.Equal(t, init, true)
- assert.Equal(t, err, TestError)
-}
-
-func TestStepFetch_Cancel(t *testing.T) {
- apiCollector, _ := CreateTestApiCollector()
-
- gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "saveRawData", func(collector *ApiCollector, res *http.Response, input interface{}) (int, error) {
- items, err := collector.args.ResponseParser(res)
- assert.Equal(t, err, nil)
- for _, v := range items {
- jsondata, err := json.Marshal(v)
- assert.Equal(t, err, nil)
- assert.Equal(t, string(jsondata), TestRawMessage)
- }
- // full page
- assert.Equal(t, input, TestTableData)
- AssertBaseResponse(t, res, &TestHttpResponse_Suc)
- return len(items), nil
- })
- defer gs.Reset()
-
- gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
- resBase := TestHttpResponse_Suc
- res := &resBase
- SetUrl(res, TestUrl)
- // always to continue
- assert.Equal(t, reqData.Pager.Size, TestSize)
- AddBodyData(res, TestDataCount)
- go handler(res, nil)
-
- return nil
- })
- defer gf.Reset()
-
- // build request Input
- reqData := new(RequestData)
- reqData.Input = TestTableData
- reqData.Pager = &Pager{
- Page: TestPage,
- Skip: TestSkip,
- Size: TestSize,
- }
-
- // cancel can only be called when error occurs, because we are doomed anyway.
-
- ctx, cancel := context.WithCancel(apiCollector.args.Ctx.GetContext())
-
- go func() {
- time.Sleep(time.Duration(500) * time.Microsecond)
- Cancel()
- }()
-
- err := SetTimeOut(TestTimeOut, func() {
- err := apiCollector.stepFetch(ctx, cancel, *reqData)
- assert.Equal(t, err, fmt.Errorf("context canceled"))
- })
- assert.Equal(t, err, nil)
-
-}
-
-func TestExecWithOutPageSize(t *testing.T) {
- apiCollector, _ := CreateTestApiCollector()
- apiCollector.args.PageSize = 0
-
- gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
- assert.Equal(t, reqData.Input, TestTableData)
- return nil
- })
- defer gf.Reset()
-
- gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "stepFetch", func(collector *ApiCollector, ctx context.Context, cancel func(), reqData RequestData) error {
- assert.Empty(t, TestNoRunHere)
- return TestError
- })
- defer gs.Reset()
-
- // run testing
- err := apiCollector.exec(TestTableData)
- assert.Equal(t, err, nil)
-}
-
-func TestExecWithGetTotalPages(t *testing.T) {
- apiCollector, _ := CreateTestApiCollector()
-
- gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
- assert.Equal(t, reqData.Input, TestTableData)
- return nil
- })
- defer gf.Reset()
-
- gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "stepFetch", func(collector *ApiCollector, ctx context.Context, cancel func(), reqData RequestData) error {
- assert.Empty(t, TestNoRunHere)
- return TestError
- })
- defer gs.Reset()
-
- // run testing
- err := apiCollector.exec(TestTableData)
- assert.Equal(t, err, nil)
-}
-
-func TestExecWithOutGetTotalPages(t *testing.T) {
- apiCollector, _ := CreateTestApiCollector()
- apiCollector.args.GetTotalPages = nil
- apiCollector.args.Concurrency = TestTotalPage
-
- pages := make([]bool, TestTotalPage+1)
- for i := 1; i <= TestTotalPage; i++ {
- pages[i] = false
- }
-
- gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
- assert.Equal(t, reqData.Input, TestTableData)
- return nil
- })
- defer gf.Reset()
-
- gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "stepFetch", func(collector *ApiCollector, ctx context.Context, cancel func(), reqData RequestData) error {
- assert.Equal(t, reqData.Input, TestTableData)
- page := reqData.Pager.Page
- pages[page] = true
- assert.Equal(t, reqData.Pager.Size, apiCollector.args.PageSize)
- assert.Equal(t, reqData.Pager.Skip, apiCollector.args.PageSize*(page-1))
- return nil
- })
- defer gs.Reset()
-
- // run testing
- err := apiCollector.exec(TestTableData)
- assert.Equal(t, err, nil)
-
- for i := 2; i <= TestTotalPage; i++ {
- assert.True(t, pages[i], i)
- }
-}
-
-func TestExec_Cancel(t *testing.T) {
- apiCollector, _ := CreateTestApiCollector()
- apiCollector.args.GetTotalPages = nil
- apiCollector.args.Concurrency = TestTotalPage
-
- pages := make([]bool, TestTotalPage+1)
- for i := 1; i <= TestTotalPage; i++ {
- pages[i] = false
- }
-
- gf := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "fetchAsync", func(collector *ApiCollector, reqData *RequestData, handler ApiAsyncCallback) error {
- assert.Equal(t, reqData.Input, TestTableData)
- return nil
- })
- defer gf.Reset()
-
- gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "stepFetch", func(collector *ApiCollector, ctx context.Context, cancel func(), reqData RequestData) error {
- assert.Equal(t, reqData.Input, TestTableData)
- page := reqData.Pager.Page
- pages[page] = true
- assert.Equal(t, reqData.Pager.Size, apiCollector.args.PageSize)
- assert.Equal(t, reqData.Pager.Skip, apiCollector.args.PageSize*(page-1))
-
- // check if it can get cancel command
- for range ctx.Done() {
- }
-
- return nil
- })
- defer gs.Reset()
-
- go func() {
- time.Sleep(time.Duration(500) * time.Microsecond)
- Cancel()
- }()
-
- err := SetTimeOut(TestTimeOut, func() {
- // run testing
- err := apiCollector.exec(TestTableData)
- assert.Equal(t, err, nil)
- for i := 2; i <= TestTotalPage; i++ {
- assert.True(t, pages[i], i)
- }
- })
- assert.Equal(t, err, nil)
-}
-
-func TestExecute(t *testing.T) {
- MockDB(t)
- defer UnMockDB()
- apiCollector, _ := CreateTestApiCollector()
-
- apiCollector.args.Input = &TestIterator{
- data: *TestTableData,
- count: TestDataCount,
- hasNextTimes: 0,
- fetchTimes: 0,
- closeTimes: 0,
- unlimit: false,
- }
-
- gt.Reset()
- gt = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Table", func(db *gorm.DB, name string, args ...interface{}) *gorm.DB {
- assert.Equal(t, name, "_raw_"+TestTableData.TableName())
- return db
- },
- )
-
- NeedWait := int64(0)
- execTimes := 0
-
- ge := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "exec", func(collector *ApiCollector, input interface{}) error {
- atomic.AddInt64(&NeedWait, 1)
- execTimes++
- assert.Equal(t, input.(*TestTable).Email, TestTableData.Email)
- assert.Equal(t, input.(*TestTable).Name, TestTableData.Name)
- atomic.AddInt64(&NeedWait, -1)
- return nil
- })
- defer ge.Reset()
-
- gw := gomonkey.ApplyMethod(reflect.TypeOf(&ApiAsyncClient{}), "WaitAsync", func(apiClient *ApiAsyncClient) error {
- for atomic.LoadInt64(&NeedWait) > 0 {
- time.Sleep(time.Millisecond)
- }
- return nil
- })
- defer gw.Reset()
-
- // run testing
- err := apiCollector.Execute()
- assert.Equal(t, err, nil)
- assert.Equal(t, execTimes, TestDataCount)
-
- input := apiCollector.args.Input.(*TestIterator)
- assert.Equal(t, input.fetchTimes, TestDataCount)
- assert.Equal(t, input.hasNextTimes >= input.fetchTimes, true)
- assert.Equal(t, input.closeTimes > 0, true)
-}
-
-func TestExecute_Cancel(t *testing.T) {
- MockDB(t)
- defer UnMockDB()
- apiCollector, _ := CreateTestApiCollector()
-
- apiCollector.args.Input = &TestIterator{
- data: *TestTableData,
- count: TestDataCount,
- hasNextTimes: 0,
- fetchTimes: 0,
- closeTimes: 0,
- unlimit: true,
- }
-
- apiCollector.args.Input.HasNext()
-
- gt.Reset()
- gt = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Table", func(db *gorm.DB, name string, args ...interface{}) *gorm.DB {
- assert.Equal(t, name, "_raw_"+TestTableData.TableName())
- return db
- },
- )
-
- NeedWait := int64(0)
- execTimes := 0
-
- ge := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "exec", func(collector *ApiCollector, input interface{}) error {
- atomic.AddInt64(&NeedWait, 1)
- execTimes++
- assert.Equal(t, input.(*TestTable).Email, TestTableData.Email)
- assert.Equal(t, input.(*TestTable).Name, TestTableData.Name)
- atomic.AddInt64(&NeedWait, -1)
- return nil
- })
- defer ge.Reset()
-
- gw := gomonkey.ApplyMethod(reflect.TypeOf(&ApiAsyncClient{}), "WaitAsync", func(apiClient *ApiAsyncClient) error {
- for atomic.LoadInt64(&NeedWait) > 0 {
- time.Sleep(time.Millisecond)
- }
- return nil
- })
- defer gw.Reset()
-
- go func() {
- time.Sleep(time.Duration(500) * time.Microsecond)
- Cancel()
- }()
-
- err := SetTimeOut(TestTimeOut, func() {
- // run testing
- err := apiCollector.Execute()
- assert.Equal(t, err, fmt.Errorf("context canceled"))
-
- input := apiCollector.args.Input.(*TestIterator)
- assert.Equal(t, input.hasNextTimes >= input.fetchTimes, true)
- assert.Equal(t, input.closeTimes > 0, true)
- })
- assert.Equal(t, err, nil)
-}
-
-func TestExecute_Total(t *testing.T) {
- MockDB(t)
- defer UnMockDB()
- apiCollector, _ := CreateTestApiCollector()
- // less count for more quick test
- TestDataCount = 10
- // ReLimit the workNum to test the block
- reWorkNum := 1
-
- apiCollector.args.Input = &TestIterator{
- data: *TestTableData,
- count: TestDataCount,
- hasNextTimes: 0,
- fetchTimes: 0,
- closeTimes: 0,
- unlimit: false,
- }
-
- gt.Reset()
- gt = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Table", func(db *gorm.DB, name string, args ...interface{}) *gorm.DB {
- assert.Equal(t, name, "_raw_"+TestTableData.TableName())
- return db
- },
- )
- defer gw.Reset()
-
- gs := gomonkey.ApplyPrivateMethod(reflect.TypeOf(apiCollector), "saveRawData", func(collector *ApiCollector, res *http.Response, input interface{}) (int, error) {
- items, err := collector.args.ResponseParser(res)
- assert.Equal(t, err, nil)
- for _, v := range items {
- jsondata, err := json.Marshal(v)
- assert.Equal(t, err, nil)
- assert.Equal(t, string(jsondata), TestRawMessage)
- }
- assert.Equal(t, input, TestTableData)
- AssertBaseResponse(t, res, &TestHttpResponse_Suc)
- return len(items), nil
- })
- defer gs.Reset()
-
- gin := gomonkey.ApplyMethod(reflect.TypeOf(&logger.DefaultLogger{}), "Info", func(_ *logger.DefaultLogger, _ string, _ ...interface{}) {
- })
- defer gin.Reset()
-
- gdo := gomonkey.ApplyMethod(reflect.TypeOf(&ApiClient{}), "Do", func(
- apiClient *ApiClient,
- method string,
- path string,
- query url.Values,
- body interface{},
- headers http.Header,
- ) (*http.Response, error) {
- res := TestHttpResponse_Suc
-
- AddBodyData(&res, TestDataCount)
- SetUrl(&res, TestUrl)
-
- return &res, nil
- })
- defer gdo.Reset()
-
- var gse *gomonkey.Patches
- gse = gomonkey.ApplyFunc(NewWorkerScheduler, func(workerNum int, maxWork int, maxWorkDuration time.Duration, ctx context.Context, maxRetry int) (*WorkerScheduler, error) {
- gse.Reset()
- workerNum = reWorkNum
- return NewWorkerScheduler(workerNum, maxWork, maxWorkDuration, ctx, maxRetry)
- })
- defer gse.Reset()
-
- // create rate limit calculator
- rateLimiter := &ApiRateLimitCalculator{
- UserRateLimitPerHour: 360000000, // 100000 times each seconed
- }
-
- apiCollector.args.ApiClient, _ = CreateTestAsyncApiClientWithRateLimitAndCtx(t, rateLimiter, apiCollector.args.Ctx.GetContext())
-
- err := SetTimeOut(TestTimeOut, func() {
- err := apiCollector.Execute()
- assert.Equal(t, err, nil)
-
- input := apiCollector.args.Input.(*TestIterator)
- assert.Equal(t, input.fetchTimes, TestDataCount)
- assert.Equal(t, input.hasNextTimes >= input.fetchTimes, true)
- assert.Equal(t, input.closeTimes > 0, true)
- })
- assert.Equal(t, err, nil)
+ mockDal.AssertExpectations(t)
}
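A hypothetical variation on the test above, for illustration only: make the raw-data insert fail once and assert that the collector surfaces the error instead of deadlocking (the exact propagation path through Execute is assumed, not shown in this commit):

    mockDal := new(mocks.Dal)
    mockDal.On("AutoMigrate", mock.Anything, mock.Anything).Return(nil).Once()
    mockDal.On("Delete", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
    // fail the insert; the handler passed to GetAsync should return this error
    mockDal.On("Create", mock.Anything, mock.Anything).Return(fmt.Errorf("connection lost")).Once()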
diff --git a/plugins/helper/api_extractor.go b/plugins/helper/api_extractor.go
index c96e33ed..fc09c27e 100644
--- a/plugins/helper/api_extractor.go
+++ b/plugins/helper/api_extractor.go
@@ -26,13 +26,10 @@ import (
"github.com/apache/incubator-devlake/plugins/core"
)
-// Accept raw json body and params, return list of entities that need to be stored
-type RawDataExtractor func(row *RawData) ([]interface{}, error)
-
type ApiExtractorArgs struct {
RawDataSubTaskArgs
Params interface{}
- Extract RawDataExtractor
+ Extract func(row *RawData) ([]interface{}, error)
BatchSize int
}
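With the dedicated RawDataExtractor type gone, plugins pass the closure inline. A minimal hypothetical sketch, assuming the package's NewApiExtractor constructor; ExampleIssue is a placeholder tool-layer model:

    extractor, err := helper.NewApiExtractor(helper.ApiExtractorArgs{
        RawDataSubTaskArgs: rawArgs, // same table/params as the collector that produced the rows
        Extract: func(row *helper.RawData) ([]interface{}, error) {
            issue := &ExampleIssue{}
            if err := json.Unmarshal(row.Data, issue); err != nil {
                return nil, err
            }
            return []interface{}{issue}, nil
        },
    })
    if err != nil {
        return err
    }
    return extractor.Execute()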
diff --git a/plugins/helper/api_extractor_test.go b/plugins/helper/api_extractor_test.go.old
similarity index 100%
rename from plugins/helper/api_extractor_test.go
rename to plugins/helper/api_extractor_test.go.old
diff --git a/plugins/helper/batch_save_divider_test.go b/plugins/helper/batch_save_divider_test.go.old
similarity index 100%
rename from plugins/helper/batch_save_divider_test.go
rename to plugins/helper/batch_save_divider_test.go.old
diff --git a/plugins/helper/batch_save_test.go b/plugins/helper/batch_save_test.go.old
similarity index 100%
rename from plugins/helper/batch_save_test.go
rename to plugins/helper/batch_save_test.go.old
diff --git a/plugins/helper/subtask_flow_test_helper.go b/plugins/helper/common/callbacks.go
similarity index 90%
rename from plugins/helper/subtask_flow_test_helper.go
rename to plugins/helper/common/callbacks.go
index 4723e764..e7dd5633 100644
--- a/plugins/helper/subtask_flow_test_helper.go
+++ b/plugins/helper/common/callbacks.go
@@ -15,4 +15,8 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package helper
+package common
+
+import "net/http"
+
+type ApiAsyncCallback func(*http.Response) error
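The callback no longer receives an error argument; failures are reported by returning an error to the scheduler. A hypothetical implementation (imports shown for completeness):

    package example

    import (
        "fmt"
        "io"
        "io/ioutil"
        "net/http"

        "github.com/apache/incubator-devlake/plugins/helper/common"
    )

    // checkStatus satisfies common.ApiAsyncCallback: it rejects non-200
    // responses and drains the body so the connection can be reused.
    var checkStatus common.ApiAsyncCallback = func(res *http.Response) error {
        defer res.Body.Close()
        if res.StatusCode != http.StatusOK {
            return fmt.Errorf("unexpected status %s for %s", res.Status, res.Request.URL)
        }
        _, err := io.Copy(ioutil.Discard, res.Body)
        return err
    }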
diff --git a/plugins/helper/connection.go b/plugins/helper/connection.go
index 30e9ab0d..26379be5 100644
--- a/plugins/helper/connection.go
+++ b/plugins/helper/connection.go
@@ -20,15 +20,15 @@ package helper
import (
"encoding/base64"
"fmt"
- "github.com/apache/incubator-devlake/config"
+ "reflect"
+ "strconv"
+
"github.com/apache/incubator-devlake/models/common"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/go-playground/validator/v10"
"github.com/mitchellh/mapstructure"
"gorm.io/gorm"
"gorm.io/gorm/clause"
- "reflect"
- "strconv"
)
type BaseConnection struct {
@@ -56,183 +56,173 @@ type RestConnection struct {
RateLimit int `comment:"api request rate limit per hour" json:"rateLimit"`
}
-// CreateConnection populate from request input into connection which come from REST functions to connection struct and save to DB
-// and only change value which `data` has
-// mergeFieldsToConnection merges fields from data
-// `connection` is the pointer of a plugin connection
-// `data` is http request input param
-func CreateConnection(data map[string]interface{}, connection interface{}, db *gorm.DB) error {
- var err error
- // update fields from request body
- err = mergeFieldsToConnection(connection, data)
- if err != nil {
- return err
+type ConnectionApiHelper struct {
+ encKey string
+ log core.Logger
+ db *gorm.DB
+ validator *validator.Validate
+}
+
+func NewConnectionHelper(
+ basicRes core.BasicRes,
+ vld *validator.Validate,
+) *ConnectionApiHelper {
+ if vld == nil {
+ vld = validator.New()
+ }
+ return &ConnectionApiHelper{
+ encKey: basicRes.GetConfig(core.EncodeKeyEnvStr),
+ log: basicRes.GetLogger(),
+ db: basicRes.GetDb(),
+ validator: vld,
}
- err = saveToDb(connection, db)
+}
+
+// Create creates a connection record based on the request body
+func (c *ConnectionApiHelper) Create(connection interface{}, input *core.ApiResourceInput) error {
+ // update fields from request body
+ err := c.merge(connection, input.Body)
if err != nil {
return err
}
- return nil
+ return c.save(connection)
}
-func PatchConnection(input *core.ApiResourceInput, connection interface{}, db *gorm.DB) error {
- err := GetConnection(input.Params, connection, db)
+func (c *ConnectionApiHelper) Patch(connection interface{}, input *core.ApiResourceInput) error {
+ err := c.First(connection, input.Params)
if err != nil {
return err
}
- err = CreateConnection(input.Body, connection, db)
+ err = c.merge(connection, input.Body)
if err != nil {
return err
}
-
- return nil
+ return c.save(connection)
}
-func saveToDb(connection interface{}, db *gorm.DB) error {
- err := EncryptConnection(connection)
- if err != nil {
- return err
+// First finds the connection in the db by parsing the request input and decrypts it
+func (c *ConnectionApiHelper) First(connection interface{}, params map[string]string) error {
+ connectionId := params["connectionId"]
+ if connectionId == "" {
+ return fmt.Errorf("missing connectionId")
+ }
+ id, err := strconv.ParseUint(connectionId, 10, 64)
+ if err != nil || id < 1 {
+ return fmt.Errorf("invalid connectionId")
}
- err = db.Clauses(clause.OnConflict{UpdateAll: true}).Save(connection).Error
+ return c.FirstById(connection, id)
+}
+
+func (c *ConnectionApiHelper) FirstById(connection interface{}, id uint64) error {
+ err := c.db.First(connection, "id = ?", id).Error
if err != nil {
return err
}
-
- return DecryptConnection(connection)
+ c.decrypt(connection)
+ return nil
}
-// mergeFieldsToConnection will populate all value in map to connection struct and validate the struct
-func mergeFieldsToConnection(specificConnection interface{}, connections ...map[string]interface{}) error {
- // decode
- for _, connection := range connections {
- err := mapstructure.Decode(connection, specificConnection)
- if err != nil {
- return err
- }
- }
- // validate
- vld := validator.New()
- err := vld.Struct(specificConnection)
+// List returns all connections with password/token decrypted
+func (c *ConnectionApiHelper) List(connections interface{}) error {
+ err := c.db.Find(connections).Error
if err != nil {
return err
}
-
+ conns := reflect.ValueOf(connections).Elem()
+ for i := 0; i < conns.Len(); i++ {
+ c.decrypt(conns.Index(i).Addr().Interface())
+ }
return nil
}
-func getEncKey() (string, error) {
- v := config.GetConfig()
- encKey := v.GetString(core.EncodeKeyEnvStr)
- if encKey == "" {
- // Randomly generate a bunch of encryption keys and set them to config
- encKey = core.RandomEncKey()
- v.Set(core.EncodeKeyEnvStr, encKey)
- err := config.WriteConfig(v)
- if err != nil {
- return encKey, err
- }
- }
- return encKey, nil
+// Delete connection
+func (c *ConnectionApiHelper) Delete(connection interface{}) error {
+ return c.db.Delete(connection).Error
}
-// GetConnection finds connection from db by parsing request input and decrypt it
-func GetConnection(data map[string]string, connection interface{}, db *gorm.DB) error {
- id, err := GetConnectionIdByInputParam(data)
+func (c *ConnectionApiHelper) merge(connection interface{}, body map[string]interface{}) error {
+ // merge
+ err := mapstructure.Decode(body, connection)
if err != nil {
- return fmt.Errorf("invalid connectionId")
+ return err
}
-
- err = db.First(connection, id).Error
+ // validate
+ err = c.validator.Struct(connection)
if err != nil {
return err
}
- return DecryptConnection(connection)
-
+ return nil
}
-// ListConnections returns all connections with password/token decrypted
-func ListConnections(connections interface{}, db *gorm.DB) error {
- err := db.Find(connections).Error
- connPtr := reflect.ValueOf(connections)
- connVal := reflect.Indirect(connPtr)
+func (c *ConnectionApiHelper) save(connection interface{}) error {
+ c.encrypt(connection)
+
+ err := c.db.Clauses(clause.OnConflict{UpdateAll: true}).Create(connection).Error
if err != nil {
return err
}
- for i := 0; i < connVal.Len(); i++ {
- //connVal.Index(i) returns value of ith elem in connections, .Elem() reutrns the original elem
- tmp := connVal.Index(i).Elem()
- err = DecryptConnection(tmp.Addr().Interface())
- if err != nil {
- return err
- }
- }
- return nil
-}
-// GetConnectionIdByInputParam gets connectionId by parsing request input
-func GetConnectionIdByInputParam(data map[string]string) (uint64, error) {
- connectionId := data["connectionId"]
- if connectionId == "" {
- return 0, fmt.Errorf("missing connectionId")
- }
- return strconv.ParseUint(connectionId, 10, 64)
+ c.decrypt(connection)
+ return nil
}
-func firstFieldNameWithTag(t reflect.Type, tag string) string {
- fieldName := ""
- for i := 0; i < t.NumField(); i++ {
- field := t.Field(i)
- if field.Type.Kind() == reflect.Struct {
- fieldName = firstFieldNameWithTag(field.Type, tag)
- } else {
- if field.Tag.Get(tag) == "yes" {
- fieldName = field.Name
- }
- }
+func (c *ConnectionApiHelper) decrypt(connection interface{}) {
+ err := UpdateEncryptFields(connection, func(encrypted string) (string, error) {
+ return core.Decrypt(c.encKey, encrypted)
+ })
+ if err != nil {
+ c.log.Error("failed to decrypt: %w", err)
}
- return fieldName
}
-// DecryptConnection decrypts password/token field for connection
-func DecryptConnection(connection interface{}) error {
- dataVal := reflect.ValueOf(connection)
- if dataVal.Kind() != reflect.Ptr {
- panic("connection is not a pointer")
- }
- encKey, err := getEncKey()
+func (c *ConnectionApiHelper) encrypt(connection interface{}) {
+ err := UpdateEncryptFields(connection, func(plaintext string) (string, error) {
+ return core.Encrypt(c.encKey, plaintext)
+ })
if err != nil {
- return nil
+ c.log.Error("failed to encrypt: %w", err)
}
- dataType := reflect.Indirect(dataVal).Type()
- fieldName := firstFieldNameWithTag(dataType, "encrypt")
- if len(fieldName) > 0 {
- decryptStr, _ := core.Decrypt(encKey, dataVal.Elem().FieldByName(fieldName).String())
- dataVal.Elem().FieldByName(fieldName).Set(reflect.ValueOf(decryptStr))
- }
- return nil
}
-func EncryptConnection(connection interface{}) error {
- dataVal := reflect.ValueOf(connection)
- if dataVal.Kind() != reflect.Ptr {
- panic("connection is not a pointer")
+// UpdateEncryptFields updates fields of val with tag `encrypt:"yes|true"`
+func UpdateEncryptFields(val interface{}, update func(in string) (string, error)) error {
+ v := reflect.ValueOf(val)
+ if v.Kind() != reflect.Ptr {
+ panic(fmt.Errorf("val is not a pointer: %v", val))
}
- encKey, err := getEncKey()
- if err != nil {
- return err
+ e := v.Elem()
+ if e.Kind() != reflect.Struct {
+ panic(fmt.Errorf("*val is not a struct: %v", val))
}
- dataType := reflect.Indirect(dataVal).Type()
- fieldName := firstFieldNameWithTag(dataType, "encrypt")
- if len(fieldName) > 0 {
- plainPwd := dataVal.Elem().FieldByName(fieldName).String()
- encyptedStr, err := core.Encrypt(encKey, plainPwd)
-
- if err != nil {
- return err
+ t := e.Type()
+ for i := 0; i < t.NumField(); i++ {
+ field := t.Field(i)
+ if !field.IsExported() {
+ continue
+ }
+ if field.Type.Kind() == reflect.Struct {
+ err := UpdateEncryptFields(e.Field(i).Addr().Interface(), update)
+ if err != nil {
+ return err
+ }
+ } else if field.Type.Kind() == reflect.Ptr && field.Type.Elem().Kind() == reflect.Struct {
+ err := UpdateEncryptFields(e.Field(i).Interface(), update)
+ if err != nil {
+ return err
+ }
+ } else if field.Type.Kind() == reflect.String {
+ tagValue := field.Tag.Get("encrypt")
+ if tagValue == "yes" || tagValue == "true" {
+ out, err := update(e.Field(i).String())
+ if err != nil {
+ return err
+ }
+ e.Field(i).Set(reflect.ValueOf(out))
+ }
}
- dataVal.Elem().FieldByName(fieldName).Set(reflect.ValueOf(encyptedStr))
}
return nil
}
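A minimal usage sketch for the new helper: every string field tagged encrypt:"yes" or encrypt:"true" is rewritten in place, recursing into embedded structs and pointers to structs. The types below are illustrative; encKey would come from GetConfig(core.EncodeKeyEnvStr):

    type ExampleToken struct {
        AccessToken string `encrypt:"true"`
    }

    type ExampleConn struct {
        Name  string // untouched: no encrypt tag
        Token ExampleToken
    }

    func encryptExample(encKey string, conn *ExampleConn) error {
        // after this call, conn.Token.AccessToken holds ciphertext
        return UpdateEncryptFields(conn, func(plaintext string) (string, error) {
            return core.Encrypt(encKey, plaintext)
        })
    }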
diff --git a/plugins/helper/connection_test.go b/plugins/helper/connection_test.go
index bd1c7cc2..f04f4a92 100644
--- a/plugins/helper/connection_test.go
+++ b/plugins/helper/connection_test.go
@@ -18,149 +18,92 @@ limitations under the License.
package helper
import (
- "github.com/stretchr/testify/assert"
- "reflect"
+ "fmt"
"testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
)
-type MockConnection struct {
- RestConnection `mapstructure:",squash"`
- BasicAuth `mapstructure:",squash"`
- StoryPointField string `gorm:"type:varchar(50);" json:"storyPointField"`
- RemotelinkCommitShaPattern string `gorm:"type:varchar(255);comment='golang regexp, the first group will be recognized as commit sha, ref https://github.com/google/re2/wiki/Syntax'" json:"remotelinkCommitShaPattern"`
+type MockAuth struct {
+ Username string
+ Password string `encrypt:"yes"`
}
-func (MockConnection) TableName() string {
- return "_tool_jira_connections"
+type MockConnection struct {
+ MockAuth
+ Name string `mapstructure:"name"`
+ BasicAuth string `encrypt:"true"`
+ BearToken struct {
+ AccessToken string `encrypt:"true"`
+ }
+ MockAuth2 *MockAuth
+ Age int
+ Since *time.Time
}
+/*
func TestMergeFieldsToConnection(t *testing.T) {
v := &MockConnection{
- RestConnection: RestConnection{
- BaseConnection: BaseConnection{
- Name: "1",
- },
- Endpoint: "2",
- Proxy: "3",
- RateLimit: 0,
+ Name: "1",
+ BearToken: struct {
+ AccessToken string "encrypt:\"true\""
+ }{
+ AccessToken: "2",
},
- BasicAuth: BasicAuth{
- Username: "4",
- Password: "5",
+ MockAuth: &MockAuth{
+ Username: "3",
+ Password: "4",
},
- RemotelinkCommitShaPattern: "8",
+ Age: 5,
}
data := make(map[string]interface{})
- data["Endpoint"] = "2-2"
- data["Username"] = "4-4"
- data["Password"] = "5-5"
+ data["name"] = "1a"
+ data["BasicAuth"] = map[string]interface{}{
+ "AccessToken": "2a",
+ }
+ data["Username"] = "3a"
err := mergeFieldsToConnection(v, data)
assert.Nil(t, err)
- assert.Equal(t, "4-4", v.Username)
- assert.Equal(t, "2-2", v.Endpoint)
- assert.Equal(t, "5-5", v.Password)
+ assert.Equal(t, "1a", v.Name)
+ assert.Equal(t, "2a", v.BearToken.AccessToken)
+ assert.Equal(t, "3a", v.Username)
+ assert.Equal(t, "4", v.Password)
+ assert.Equal(t, 5, v.Age)
}
+*/
-func TestDecryptAndEncrypt(t *testing.T) {
+func TestUpdateEncryptFields(t *testing.T) {
+ sinc := time.Now()
v := &MockConnection{
- RestConnection: RestConnection{
- BaseConnection: BaseConnection{
- Name: "1",
- },
- Endpoint: "2",
- Proxy: "3",
- RateLimit: 0,
+ MockAuth: MockAuth{
+ Username: "1",
+ Password: "2",
},
- BasicAuth: BasicAuth{
- Username: "4",
- Password: "5",
- },
- RemotelinkCommitShaPattern: "8",
- }
- err := EncryptConnection(v)
- assert.Nil(t, err)
-
- assert.NotEqual(t, "5", v.Password)
- err = DecryptConnection(v)
- assert.Nil(t, err)
-
- assert.Equal(t, "5", v.Password)
-
-}
-
-func TestDecryptConnection(t *testing.T) {
- v := &MockConnection{
- RestConnection: RestConnection{
- BaseConnection: BaseConnection{
- Name: "1",
- },
- Endpoint: "2",
- Proxy: "3",
- RateLimit: 0,
+ Name: "3",
+ BearToken: struct {
+ AccessToken string `encrypt:"true"`
+ }{
+ AccessToken: "4",
},
- BasicAuth: BasicAuth{
- Username: "4",
- Password: "5",
+ MockAuth2: &MockAuth{
+ Username: "5",
+ Password: "6",
},
- RemotelinkCommitShaPattern: "8",
+ Age: 7,
+ Since: &sinc,
}
- err := EncryptConnection(v)
- assert.Nil(t, err)
-
- encryptedPwd := v.Password
- err = DecryptConnection(v)
+ err := UpdateEncryptFields(v, func(in string) (string, error) {
+ return fmt.Sprintf("%s-asdf", in), nil
+ })
assert.Nil(t, err)
- assert.NotEqual(t, encryptedPwd, v.Password)
- assert.Equal(t, "5", v.Password)
+ assert.Equal(t, "1", v.Username)
+ assert.Equal(t, "2-asdf", v.Password)
+ assert.Equal(t, "3", v.Name)
+ assert.Equal(t, "4-asdf", v.BearToken.AccessToken)
+ assert.Equal(t, "5", v.MockAuth2.Username)
+ assert.Equal(t, "6-asdf", v.MockAuth2.Password)
+ assert.Equal(t, 7, v.Age)
}
-
-func TestFirstFieldNameWithTag(t *testing.T) {
- v := &MockConnection{
- RestConnection: RestConnection{
- BaseConnection: BaseConnection{
- Name: "1",
- },
- Endpoint: "2",
- Proxy: "3",
- RateLimit: 0,
- },
- BasicAuth: BasicAuth{
- Username: "4",
- Password: "5",
- },
- StoryPointField: "7",
- RemotelinkCommitShaPattern: "8",
- }
- dataVal := reflect.ValueOf(v)
- dataType := reflect.Indirect(dataVal).Type()
- fieldName := firstFieldNameWithTag(dataType, "encrypt")
- assert.Equal(t, "Password", fieldName)
-}
-
-//func TestListConnections(t *testing.T) {
-// jiraConnections := make([]*MockConnection, 0)
-// cfg := config.GetConfig()
-// dbUrl := cfg.GetString("DB_URL")
-// u, err := url.Parse(dbUrl)
-// dbUrl = fmt.Sprintf("%s@tcp(%s)%s?%s", u.User.String(), u.Host, u.Path, u.RawQuery)
-// dbConfig := &gorm.Config{
-// Logger: gormLogger.New(
-// log.Default(),
-// gormLogger.Config{
-// SlowThreshold: time.Second, // Slow SQL threshold
-// LogLevel: gormLogger.Error, // Log level
-// IgnoreRecordNotFoundError: true, // Ignore ErrRecordNotFound error for logger
-// Colorful: true, // Disable color
-// },
-// ),
-// // most of our operation are in batch, this can improve performance
-// PrepareStmt: true,
-// }
-// db, err := gorm.Open(mysql.Open(dbUrl), dbConfig)
-//
-// err = ListConnections(&jiraConnections, db)
-//
-// assert.Nil(t, err)
-//}
diff --git a/plugins/helper/data_convertor_test.go b/plugins/helper/data_convertor_test.go.old
similarity index 100%
rename from plugins/helper/data_convertor_test.go
rename to plugins/helper/data_convertor_test.go.old
diff --git a/plugins/helper/default_task_context.go b/plugins/helper/default_task_context.go
index 3b14b0d0..9f4b6747 100644
--- a/plugins/helper/default_task_context.go
+++ b/plugins/helper/default_task_context.go
@@ -24,7 +24,9 @@ import (
"sync/atomic"
"time"
+ "github.com/apache/incubator-devlake/impl/dalgorm"
"github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/spf13/viper"
"gorm.io/gorm"
)
@@ -32,11 +34,45 @@ import (
// bridge to current implementation at this point
// TODO: implement another TaskContext for distributed runner/worker
+type DefaultBasicRes struct {
+ cfg *viper.Viper
+ logger core.Logger
+ db *gorm.DB
+ dal dal.Dal
+}
+
+func (c *DefaultBasicRes) GetConfig(name string) string {
+ return c.cfg.GetString(name)
+}
+
+func (c *DefaultBasicRes) GetDb() *gorm.DB {
+ return c.db
+}
+
+func (c *DefaultBasicRes) GetDal() dal.Dal {
+ return c.dal
+}
+
+func (c *DefaultBasicRes) GetLogger() core.Logger {
+ return c.logger
+}
+
+func NewDefaultBasicRes(
+ cfg *viper.Viper,
+ logger core.Logger,
+ db *gorm.DB,
+) *DefaultBasicRes {
+ return &DefaultBasicRes{
+ cfg: cfg,
+ logger: logger,
+ db: db,
+ dal: dalgorm.NewDalgorm(db),
+ }
+}
+
// shared by TasContext and SubTaskContext
type defaultExecContext struct {
- cfg *viper.Viper
- logger core.Logger
- db *gorm.DB
+ *DefaultBasicRes
ctx context.Context
name string
data interface{}
@@ -56,9 +92,11 @@ func newDefaultExecContext(
progress chan core.RunningProgress,
) *defaultExecContext {
return &defaultExecContext{
- cfg: cfg,
- logger: logger,
- db: db,
+ DefaultBasicRes: NewDefaultBasicRes(
+ cfg,
+ logger,
+ db,
+ ),
ctx: ctx,
name: name,
data: data,
@@ -70,14 +108,6 @@ func (c *defaultExecContext) GetName() string {
return c.name
}
-func (c *defaultExecContext) GetConfig(name string) string {
- return c.cfg.GetString(name)
-}
-
-func (c *defaultExecContext) GetDb() *gorm.DB {
- return c.db
-}
-
func (c *defaultExecContext) GetContext() context.Context {
return c.ctx
}
@@ -86,10 +116,6 @@ func (c *defaultExecContext) GetData() interface{} {
return c.data
}
-func (c *defaultExecContext) GetLogger() core.Logger {
- return c.logger
-}
-
func (c *defaultExecContext) SetProgress(progressType core.ProgressType, current int, total int) {
c.current = int64(current)
c.total = total
@@ -160,7 +186,7 @@ type DefaultSubTaskContext struct {
func (c *DefaultSubTaskContext) SetProgress(current int, total int) {
c.defaultExecContext.SetProgress(core.SubTaskSetProgress, current, total)
if total > -1 {
- c.logger.Info("total records: %d", c.total)
+ c.logger.Info("total jobs: %d", c.total)
}
}
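A hypothetical construction of the new DefaultBasicRes, showing that both handles come from the same *gorm.DB (cfg, logger, and db are assumed to exist):

    basicRes := helper.NewDefaultBasicRes(cfg, logger, db)
    d := basicRes.GetDal() // dalgorm-backed dal.Dal for new code paths
    g := basicRes.GetDb()  // legacy *gorm.DB, still available during the migration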
diff --git a/plugins/helper/iterator.go b/plugins/helper/iterator.go
index d2cd5b24..b84b500b 100644
--- a/plugins/helper/iterator.go
+++ b/plugins/helper/iterator.go
@@ -22,6 +22,7 @@ import (
"reflect"
"time"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
"gorm.io/gorm"
)
@@ -31,12 +32,14 @@ type Iterator interface {
Close() error
}
+// Deprecated: use DalCursorIterator instead
type CursorIterator struct {
db *gorm.DB
cursor *sql.Rows
elemType reflect.Type
}
+// Deprecated: use NewDalCursorIterator instead
func NewCursorIterator(db *gorm.DB, cursor *sql.Rows, elemType reflect.Type) (*CursorIterator, error) {
return &CursorIterator{
db: db,
@@ -64,6 +67,41 @@ func (c *CursorIterator) Close() error {
var _ Iterator = (*CursorIterator)(nil)
+// DalCursorIterator iterates over a database cursor via the dal abstraction
+type DalCursorIterator struct {
+ db dal.Dal
+ cursor *sql.Rows
+ elemType reflect.Type
+}
+
+func NewDalCursorIterator(db dal.Dal, cursor *sql.Rows, elemType reflect.Type) (*DalCursorIterator, error) {
+ return &DalCursorIterator{
+ db: db,
+ cursor: cursor,
+ elemType: elemType,
+ }, nil
+}
+
+func (c *DalCursorIterator) HasNext() bool {
+ return c.cursor.Next()
+}
+
+func (c *DalCursorIterator) Fetch() (interface{}, error) {
+ elem := reflect.New(c.elemType).Interface()
+ err := c.db.Fetch(c.cursor, elem)
+ if err != nil {
+ return nil, err
+ }
+ return elem, nil
+}
+
+func (c *DalCursorIterator) Close() error {
+ return c.cursor.Close()
+}
+
+var _ Iterator = (*DalCursorIterator)(nil)
+
+// DateIterator
type DateIterator struct {
startTime time.Time
endTime time.Time
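A minimal sketch of feeding a DalCursorIterator into a collector as its Input; obtaining the *sql.Rows via gorm is illustrative, and ExampleIssue is a placeholder model:

    rows, err := gormDB.Table("_tool_example_issues").Rows()
    if err != nil {
        return err
    }
    iter, err := helper.NewDalCursorIterator(d, rows, reflect.TypeOf(ExampleIssue{}))
    if err != nil {
        return err
    }
    defer iter.Close()
    for iter.HasNext() {
        row, err := iter.Fetch() // yields a *ExampleIssue as interface{}
        if err != nil {
            return err
        }
        _ = row // hand off to the collector / processing logic
    }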
diff --git a/plugins/helper/worker_scheduler.go b/plugins/helper/worker_scheduler.go
index e42bfe33..71d7574e 100644
--- a/plugins/helper/worker_scheduler.go
+++ b/plugins/helper/worker_scheduler.go
@@ -22,133 +22,155 @@ import (
"fmt"
"os"
"sync"
+ "sync/atomic"
"time"
+ "github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/utils"
"github.com/panjf2000/ants/v2"
)
+// WorkerScheduler runs asynchronous tasks in parallel with throttling support
type WorkerScheduler struct {
- waitGroup *sync.WaitGroup
+ waitGroup sync.WaitGroup
pool *ants.Pool
- subPool *ants.Pool
ticker *time.Ticker
- workerErrors *[]error
+ workerErrors []error
ctx context.Context
+ mu sync.Mutex
+ counter int32
+ logger core.Logger
}
-// NewWorkerScheduler 创建一个并行执行的调度器,控制最大运行数和每秒最大运行数量
-// NewWorkerScheduler Create a parallel scheduler to control the maximum number of runs and the maximum number of runs per second
-// 注意: task执行是无序的
-// Warning: task execution is out of order
-func NewWorkerScheduler(workerNum int, maxWork int, maxWorkDuration time.Duration, ctx context.Context, maxRetry int) (*WorkerScheduler, error) {
- var waitGroup sync.WaitGroup
- workerErrors := make([]error, 0)
- pWorkerErrors := &workerErrors
- var mux sync.Mutex
- pool, err := ants.NewPool(workerNum, ants.WithPanicHandler(func(i interface{}) {
- mux.Lock()
- defer mux.Unlock()
- workerErrors = append(*pWorkerErrors, i.(error))
- pWorkerErrors = &workerErrors
- }))
- if err != nil {
- return nil, err
+var callframeEnabled = os.Getenv("ASYNC_CF") == "true"
+
+// NewWorkerScheduler creates a WorkerScheduler
+func NewWorkerScheduler(
+ workerNum int,
+ maxWork int,
+ maxWorkDuration time.Duration,
+ ctx context.Context,
+ maxRetry int,
+ logger core.Logger,
+) (*WorkerScheduler, error) {
+ if maxWork <= 0 {
+ return nil, fmt.Errorf("maxWork less than 1")
+ }
+ if maxWorkDuration <= 0 {
+ return nil, fmt.Errorf("maxWorkDuration less than 1")
}
- subPool, err := ants.NewPool(workerNum*maxRetry, ants.WithPanicHandler(func(i interface{}) {
- mux.Lock()
- defer mux.Unlock()
- workerErrors = append(*pWorkerErrors, i.(error))
- pWorkerErrors = &workerErrors
+ s := &WorkerScheduler{
+ ctx: ctx,
+ ticker: time.NewTicker(maxWorkDuration / time.Duration(maxWork)),
+ logger: logger,
+ }
+ pool, err := ants.NewPool(workerNum, ants.WithPanicHandler(func(i interface{}) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.workerErrors = append(s.workerErrors, i.(error))
}))
if err != nil {
return nil, err
}
- var ticker *time.Ticker
- if maxWork > 0 {
- ticker = time.NewTicker(maxWorkDuration / time.Duration(maxWork))
- }
- scheduler := &WorkerScheduler{
- waitGroup: &waitGroup,
- pool: pool,
- subPool: subPool,
- ticker: ticker,
- workerErrors: pWorkerErrors,
- ctx: ctx,
- }
- return scheduler, nil
+ s.pool = pool
+ return s, nil
}
-func (s *WorkerScheduler) Submit(task func() error, pool ...*ants.Pool) error {
- select {
- case <-s.ctx.Done():
- return s.ctx.Err()
- default:
- }
- s.waitGroup.Add(1)
+// SubmitBlocking enqueues an async task to ants; the task will be executed when the rate limit allows.
+// It doesn't return an error because, with blocking semantics, there wouldn't be one: a returned error would do
+// nothing but cause confusion, since people tend to assume it came from the task itself.
+// Because the task is asynchronous, callframes are not captured in production mode; export the Environment
+// Variable ASYNC_CF=true to enable callframe capturing when debugging.
+// IMPORTANT: do NOT call SubmitBlocking inside an async task, as it is likely to cause a deadlock; call
+// NextTick instead when the number of tasks is relatively small.
+func (s *WorkerScheduler) SubmitBlocking(task func() error) {
// gathering callframes is expensive, so it is gated behind an EnvVar
- cf := "set Environment Varaible ASYNC_CF=true to enable callframes information"
- if os.Getenv("ASYNC_CF") == "true" {
- cf = utils.GatherCallFrames()
- }
- var currentPool *ants.Pool
- if pool == nil {
- currentPool = s.pool
- } else {
- currentPool = pool[0]
+ cf := s.gatherCallFrames()
+ // short-circuit if a previous task already failed
+ if len(s.workerErrors) > 0 {
+ // no point in continuing
+ return
}
-
- return currentPool.Submit(func() {
- var err error
+ s.waitGroup.Add(1)
+ err := s.pool.Submit(func() {
defer s.waitGroup.Done()
- defer func() {
- if err == nil {
- r := recover()
- if r != nil {
- err = fmt.Errorf("%s", r)
- }
- }
- if err != nil {
- panic(fmt.Errorf("%s\n%s", err, cf))
- }
- }()
- if pool == nil && s.ticker != nil {
- for s.subPool.Running() != 0 {
- <-s.ticker.C
- }
- }
- if s.ticker != nil {
- <-s.ticker.C
+
+ id := atomic.AddInt32(&s.counter, 1)
+ s.logger.Debug("schedulerJob >>> %d started", id)
+ defer s.logger.Debug("schedulerJob <<< %d ended", id)
+
+ if len(s.workerErrors) > 0 {
+ // no point in continuing
+ return
}
+ // wait for rate limit throttling
+
+ var err error
+ // wrap in a closure: a deferred call's arguments are evaluated immediately,
+ // so passing err directly would always hand gatherError a nil error
+ defer func() { s.gatherError(err, cf) }()
select {
case <-s.ctx.Done():
err = s.ctx.Err()
- default:
- err = task()
+ case <-s.ticker.C:
+ err = task()
}
})
+ // failed to submit; note that this is a scheduling error, not a task error
+ if err != nil {
+ s.gatherError(err, cf)
+ }
+}
+
+func (s *WorkerScheduler) gatherCallFrames() string {
+ cf := "set Environment Varaible ASYNC_CF=true to enable callframes capturing"
+ if callframeEnabled {
+ cf = utils.GatherCallFrames(1)
+ }
+ return cf
+}
+
+func (s *WorkerScheduler) gatherError(err error, callframes string) {
+ if err == nil {
+ r := recover()
+ if r != nil {
+ err = fmt.Errorf("%s\n%s", r, callframes)
+ }
+ }
+ if err != nil {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.workerErrors = append(s.workerErrors, err)
+ }
}
-func (s *WorkerScheduler) WaitUntilFinish() error {
+// NextTick enqueues a task in a non-blocking manner; call this method only from within a task submitted via
+// SubmitBlocking.
+// IMPORTANT: do NOT call this method with a huge number of tasks, as it is likely to eat up all available memory.
+func (s *WorkerScheduler) NextTick(task func() error) {
+ cf := s.gatherCallFrames()
+ // count the task up front so Wait will not miss it
+ s.waitGroup.Add(1)
+ go func() {
+ var err error
+ defer s.waitGroup.Done()
+ // closure again: let the deferred call observe the final value of err
+ defer func() { s.gatherError(err, cf) }()
+ err = task()
+ }()
+}
+
+// Wait blocks the current goroutine until all workers have returned
+func (s *WorkerScheduler) Wait() error {
s.waitGroup.Wait()
- if s.workerErrors != nil && len(*s.workerErrors) > 0 {
- return fmt.Errorf("%s", *s.workerErrors)
+ if len(s.workerErrors) > 0 {
+ return fmt.Errorf("%s", s.workerErrors)
}
return nil
}
+// Release frees the worker pool and stops the ticker
func (s *WorkerScheduler) Release() {
+ s.waitGroup.Wait()
s.pool.Release()
- s.subPool.Release()
if s.ticker != nil {
s.ticker.Stop()
}
}
-
-func (s *WorkerScheduler) Add(delta int) {
- s.waitGroup.Add(delta)
-}
-
-func (s *WorkerScheduler) Done() {
- s.waitGroup.Done()
-}
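
The new contract in one place: SubmitBlocking for top-level jobs, NextTick for follow-ups scheduled from inside a running job, Wait to join and surface gathered errors, Release to tear down. A usage sketch with a hypothetical workload:

    package example

    import (
        "context"
        "time"

        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/apache/incubator-devlake/plugins/helper"
    )

    // crawl schedules ten jobs across 5 workers, throttled to 2 per second.
    func crawl(ctx context.Context, logger core.Logger) error {
        s, err := helper.NewWorkerScheduler(5, 2, time.Second, ctx, 3, logger)
        if err != nil {
            return err
        }
        defer s.Release()
        for page := 1; page <= 10; page++ {
            page := page // capture the loop variable
            s.SubmitBlocking(func() error {
                logger.Debug("fetching page %d", page)
                return nil // perform the real request here
            })
        }
        return s.Wait() // returns any errors the workers gathered
    }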
diff --git a/plugins/helper/worker_scheduler_test.go b/plugins/helper/worker_scheduler_test.go
index 4f65e7a0..aa1ad22c 100644
--- a/plugins/helper/worker_scheduler_test.go
+++ b/plugins/helper/worker_scheduler_test.go
@@ -22,21 +22,24 @@ import (
"testing"
"time"
+ "github.com/apache/incubator-devlake/helpers/unithelper"
"github.com/stretchr/testify/assert"
)
-func TestNewWorkerScheduler(t *testing.T) {
+func TestWorkerSchedulerQpsControl(t *testing.T) {
+ // assuming we want 2 requests per second
testChannel := make(chan int, 100)
ctx, cancel := context.WithCancel(context.Background())
- s, _ := NewWorkerScheduler(5, 2, 1*time.Second, ctx, 0)
+ s, _ := NewWorkerScheduler(5, 2, 1*time.Second, ctx, 0, unithelper.DummyLogger())
defer s.Release()
for i := 1; i <= 5; i++ {
t := i
- _ = s.Submit(func() error {
+ s.SubmitBlocking(func() error {
testChannel <- t
return nil
})
}
+ // after 1 second, 2 requests should be issued
time.Sleep(1200 * time.Millisecond)
if len(testChannel) < 2 {
t.Fatal(`worker not start`)
@@ -44,6 +47,7 @@ func TestNewWorkerScheduler(t *testing.T) {
if len(testChannel) > 2 {
t.Fatal(`worker run too fast`)
}
+ // after 2 seconds, 4 requests should be issued
time.Sleep(time.Second)
if len(testChannel) < 4 {
t.Fatal(`worker not run after a second`)
@@ -51,53 +55,9 @@ func TestNewWorkerScheduler(t *testing.T) {
if len(testChannel) > 4 {
t.Fatal(`worker run too fast after a second`)
}
- assert.Nil(t, s.WaitUntilFinish())
- if len(*s.workerErrors) != 0 {
- t.Fatal(`worker got panic`)
- }
+ assert.Nil(t, s.Wait())
if len(testChannel) != 5 {
t.Fatal(`worker not wait until finish`)
}
cancel()
}
-
-func TestNewWorkerSchedulerWithoutSecond(t *testing.T) {
- testChannel := make(chan int, 100)
- ctx, cancel := context.WithCancel(context.Background())
- s, _ := NewWorkerScheduler(5, 0, 1*time.Second, ctx, 0)
- defer s.Release()
- for i := 1; i <= 5; i++ {
- t := i
- _ = s.Submit(func() error {
- testChannel <- t
- return nil
- })
- }
- time.Sleep(5 * time.Millisecond)
- if len(testChannel) != 5 {
- t.Fatal(`worker not finish`)
- }
- assert.Nil(t, s.WaitUntilFinish())
- if len(testChannel) != 5 {
- t.Fatal(`worker not finish`)
- }
- cancel()
-}
-
-/*
-func TestNewWorkerSchedulerWithPanic(t *testing.T) {
- testChannel := make(chan int, 100)
- ctx, cancel := context.WithCancel(context.Background())
- s,_ := NewWorkerScheduler(1, 1, ctx)
- defer s.Release()
- _ = s.Submit(func() error {
- testChannel <- 1
- return errors.New(`error message`)
- })
- s.WaitUntilFinish()
- if len(*s.workerErrors) != 1 {
- t.Fatal(`worker not got panic`)
- }
- cancel()
-}
-*/
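
The timing assertions above follow directly from the ticker setup in NewWorkerScheduler: the tick interval is maxWorkDuration/maxWork, so 2 tasks per second means one task every 500ms, hence roughly 2 completions after 1.2s and 4 after 2.2s:

    // the interval the scheduler derives from maxWork=2, maxWorkDuration=1s
    interval := time.Second / time.Duration(2) // = 500ms, i.e. 2 tasks per second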
diff --git a/plugins/jira/api/connection.go b/plugins/jira/api/connection.go
index 0c422bbe..f11c5a26 100644
--- a/plugins/jira/api/connection.go
+++ b/plugins/jira/api/connection.go
@@ -28,12 +28,9 @@ import (
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/apache/incubator-devlake/plugins/jira/models"
- "github.com/go-playground/validator/v10"
"github.com/mitchellh/mapstructure"
)
-var vld = validator.New()
-
func TestConnection(input *core.ApiResourceInput) (*core.ApiResourceOutput, error) {
// decode
@@ -107,16 +104,13 @@ POST /plugins/jira/connections
}
*/
func PostConnections(input *core.ApiResourceInput) (*core.ApiResourceOutput, error) {
- // create a new connection
- jiraConnection := &models.JiraConnection{}
-
// update from request and save to database
- err := helper.CreateConnection(input.Body, jiraConnection, db)
+ connection := &models.JiraConnection{}
+ err := connectionHelper.Create(connection, input)
if err != nil {
return nil, err
}
-
- return &core.ApiResourceOutput{Body: jiraConnection, Status: http.StatusOK}, nil
+ return &core.ApiResourceOutput{Body: connection, Status: http.StatusOK}, nil
}
/*
@@ -129,43 +123,37 @@ PATCH /plugins/jira/connections/:connectionId
}
*/
func PatchConnection(input *core.ApiResourceInput) (*core.ApiResourceOutput, error) {
- jiraConnection := &models.JiraConnection{}
- err := helper.PatchConnection(input, jiraConnection, db)
+ connection := &models.JiraConnection{}
+ err := connectionHelper.Patch(connection, input)
if err != nil {
return nil, err
}
-
- return &core.ApiResourceOutput{Body: jiraConnection}, nil
+ return &core.ApiResourceOutput{Body: connection}, nil
}
/*
DELETE /plugins/jira/connections/:connectionId
*/
func DeleteConnection(input *core.ApiResourceInput) (*core.ApiResourceOutput, error) {
- // load from db
- jiraConnectionID, err := helper.GetConnectionIdByInputParam(input.Params)
- if err != nil {
- return nil, err
- }
- // cascading delete
- err = db.Where("id = ?", jiraConnectionID).Delete(&models.JiraConnection{}).Error
+ connection := &models.JiraConnection{}
+ err := connectionHelper.First(connection, input.Params)
if err != nil {
return nil, err
}
- return &core.ApiResourceOutput{Body: jiraConnectionID}, nil
+ err = connectionHelper.Delete(connection)
+ return &core.ApiResourceOutput{Body: connection}, err
}
/*
GET /plugins/jira/connections
*/
func ListConnections(input *core.ApiResourceInput) (*core.ApiResourceOutput, error) {
- jiraConnections := make([]*models.JiraConnection, 0)
-
- err := helper.ListConnections(&jiraConnections, db)
+ var connections []models.JiraConnection
+ err := connectionHelper.List(&connections)
if err != nil {
return nil, err
}
- return &core.ApiResourceOutput{Body: jiraConnections, Status: http.StatusOK}, nil
+ return &core.ApiResourceOutput{Body: connections, Status: http.StatusOK}, nil
}
/*
@@ -180,11 +168,7 @@ GET /plugins/jira/connections/:connectionId
}
*/
func GetConnection(input *core.ApiResourceInput) (*core.ApiResourceOutput, error) {
- jiraConnection := &models.JiraConnection{}
- err := helper.GetConnection(input.Params, jiraConnection, db)
- if err != nil {
- return nil, err
- }
-
- return &core.ApiResourceOutput{Body: jiraConnection}, nil
+ connection := &models.JiraConnection{}
+ err := connectionHelper.First(connection, input.Params)
+ return &core.ApiResourceOutput{Body: connection}, err
}
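
Every handler now reduces to the same helper-driven shape. A hedged sketch of the pattern applied to a hypothetical plugin (MyConnection and the package-level connectionHelper are illustrative stand-ins, mirroring the Jira handlers above):

    func GetMyConnection(input *core.ApiResourceInput) (*core.ApiResourceOutput, error) {
        connection := &MyConnection{}
        err := connectionHelper.First(connection, input.Params)
        return &core.ApiResourceOutput{Body: connection}, err
    }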
diff --git a/plugins/jira/api/init.go b/plugins/jira/api/init.go
index acaa495e..6774e148 100644
--- a/plugins/jira/api/init.go
+++ b/plugins/jira/api/init.go
@@ -19,12 +19,21 @@ package api
import (
"github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/helper"
+ "github.com/go-playground/validator/v10"
"github.com/spf13/viper"
"gorm.io/gorm"
)
-var db *gorm.DB
+var vld *validator.Validate
+var connectionHelper *helper.ConnectionApiHelper
+var basicRes core.BasicRes
func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
- db = database
+ basicRes = helper.NewDefaultBasicRes(config, logger, database)
+ vld = validator.New()
+ connectionHelper = helper.NewConnectionHelper(
+ basicRes,
+ vld,
+ )
}
diff --git a/plugins/jira/api/proxy.go b/plugins/jira/api/proxy.go
index 398f4f1f..f477ff26 100644
--- a/plugins/jira/api/proxy.go
+++ b/plugins/jira/api/proxy.go
@@ -20,7 +20,6 @@ package api
import (
"encoding/json"
"fmt"
- "github.com/apache/incubator-devlake/utils"
"io/ioutil"
"time"
@@ -34,23 +33,18 @@ const (
)
func Proxy(input *core.ApiResourceInput) (*core.ApiResourceOutput, error) {
- jiraConnection := &models.JiraConnection{}
- err := helper.GetConnection(input.Params, jiraConnection, db)
- if err != nil {
- return nil, err
- }
- basicAuth := utils.GetEncodedToken(jiraConnection.Username, jiraConnection.Password)
-
+ connection := &models.JiraConnection{}
+ err := connectionHelper.First(connection, input.Params)
if err != nil {
return nil, err
}
apiClient, err := helper.NewApiClient(
- jiraConnection.Endpoint,
+ connection.Endpoint,
map[string]string{
- "Authorization": fmt.Sprintf("Basic %v", basicAuth),
+ "Authorization": fmt.Sprintf("Basic %v", connection.GetEncodedToken()),
},
30*time.Second,
- jiraConnection.Proxy,
+ connection.Proxy,
nil,
)
if err != nil {
diff --git a/plugins/jira/jira.go b/plugins/jira/jira.go
index 1e0d23fb..0dc791ff 100644
--- a/plugins/jira/jira.go
+++ b/plugins/jira/jira.go
@@ -24,6 +24,7 @@ import (
"github.com/apache/incubator-devlake/migration"
"github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/helper"
"github.com/apache/incubator-devlake/plugins/jira/api"
"github.com/apache/incubator-devlake/plugins/jira/models"
"github.com/apache/incubator-devlake/plugins/jira/models/migrationscripts"
@@ -99,7 +100,8 @@ func (plugin Jira) SubTaskMetas() []core.SubTaskMeta {
func (plugin Jira) PrepareTaskData(taskCtx core.TaskContext, options map[string]interface{}) (interface{}, error) {
var op tasks.JiraOptions
var err error
- db := taskCtx.GetDb()
+ logger := taskCtx.GetLogger()
+ logger.Debug("%v", options)
err = mapstructure.Decode(options, &op)
if err != nil {
return nil, err
@@ -108,7 +110,14 @@ func (plugin Jira) PrepareTaskData(taskCtx core.TaskContext, options map[string]
return nil, fmt.Errorf("connectionId is invalid")
}
connection := &models.JiraConnection{}
- err = db.First(connection, op.ConnectionId).Error
+ connectionHelper := helper.NewConnectionHelper(
+ taskCtx,
+ nil,
+ )
+ err = connectionHelper.FirstById(connection, op.ConnectionId)
if err != nil {
return nil, err
}
@@ -136,6 +145,7 @@ func (plugin Jira) PrepareTaskData(taskCtx core.TaskContext, options map[string]
}
if !since.IsZero() {
taskData.Since = &since
+ logger.Debug("collect data updated since %s", since)
}
return taskData, nil
}
@@ -188,14 +198,16 @@ var PluginEntry Jira //nolint
// standalone mode for debugging
func main() {
cmd := &cobra.Command{Use: "jira"}
- connectionId := cmd.Flags().Uint64P("connection", "s", 0, "jira connection id")
+ connectionId := cmd.Flags().Uint64P("connection", "c", 0, "jira connection id")
boardId := cmd.Flags().Uint64P("board", "b", 0, "jira board id")
_ = cmd.MarkFlagRequired("connection")
_ = cmd.MarkFlagRequired("board")
+ since := cmd.Flags().StringP("since", "s", "", "collect data updated after the specified time, e.g. 2006-05-06T07:08:09Z")
cmd.Run = func(c *cobra.Command, args []string) {
runner.DirectRun(c, args, PluginEntry, map[string]interface{}{
"connectionId": *connectionId,
"boardId": *boardId,
+ "since": *since,
})
}
runner.RunCmd(cmd)
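
With the reshuffled flags, a standalone debugging run would look something like this (illustrative invocation, assuming the plugin is built as a standalone binary named jira):

    jira -c 1 -b 8 -s 2006-05-06T07:08:09Z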
diff --git a/plugins/jira/models/migrationscripts/updateSchemas20220601.go b/plugins/jira/models/migrationscripts/updateSchemas20220601.go
index 5043cfca..ba9e9554 100644
--- a/plugins/jira/models/migrationscripts/updateSchemas20220601.go
+++ b/plugins/jira/models/migrationscripts/updateSchemas20220601.go
@@ -20,10 +20,11 @@ package migrationscripts
import (
"context"
"encoding/base64"
+ "strings"
+
+ "github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
"gorm.io/gorm"
- "gorm.io/gorm/clause"
- "strings"
)
type JiraConnection20220601 struct {
@@ -38,9 +39,20 @@ func (JiraConnection20220601) TableName() string {
return "_tool_jira_connections"
}
-type UpdateSchemas20220601 struct{}
+type UpdateSchemas20220601 struct {
+ config core.ConfigGetter
+ logger core.Logger
+}
+
+func (u *UpdateSchemas20220601) SetConfigGetter(getter core.ConfigGetter) {
+ u.config = getter
+}
-func (*UpdateSchemas20220601) Up(ctx context.Context, db *gorm.DB) error {
+func (u *UpdateSchemas20220601) SetLogger(logger core.Logger) {
+ u.logger = logger
+}
+
+func (u *UpdateSchemas20220601) Up(ctx context.Context, db *gorm.DB) error {
var err error
if !db.Migrator().HasColumn(&JiraConnection20220505{}, "password") {
err = db.Migrator().AddColumn(&JiraConnection20220601{}, "password")
@@ -62,39 +74,42 @@ func (*UpdateSchemas20220601) Up(ctx context.Context, db *gorm.DB) error {
if err != nil {
return err
}
- for i := range connections {
- err = helper.DecryptConnection(connections[i])
+ encKey := u.config.GetString(core.EncodeKeyEnvStr)
+ for _, connection := range connections {
+ basicAuthEncoded, err := core.Decrypt(encKey, connection.BasicAuthEncoded)
if err != nil {
- return err
+ u.logger.Warn("failed to decrypt basicAuth for connection %d", connection.ID)
+ continue
}
- decodedStr, err := base64.StdEncoding.DecodeString(connections[i].BasicAuthEncoded)
+ basicAuth, err := base64.StdEncoding.DecodeString(basicAuthEncoded)
if err != nil {
return err
}
- strList := strings.Split(string(decodedStr), ":")
+ strList := strings.Split(string(basicAuth), ":")
if len(strList) > 1 {
newConnection := JiraConnection20220601{
RestConnection: helper.RestConnection{
BaseConnection: helper.BaseConnection{
- Name: connections[i].Name,
- Model: connections[i].Model,
+ Name: connection.Name,
+ Model: connection.Model,
},
- Endpoint: connections[i].Endpoint,
- Proxy: connections[i].Proxy,
- RateLimit: connections[i].RateLimit,
+ Endpoint: connection.Endpoint,
+ Proxy: connection.Proxy,
+ RateLimit: connection.RateLimit,
},
BasicAuth: helper.BasicAuth{
Username: strList[0],
Password: strList[1],
},
- EpicKeyField: connections[i].EpicKeyField,
- StoryPointField: connections[i].StoryPointField,
- RemotelinkCommitShaPattern: connections[i].RemotelinkCommitShaPattern,
+ EpicKeyField: connection.EpicKeyField,
+ StoryPointField: connection.StoryPointField,
+ RemotelinkCommitShaPattern: connection.RemotelinkCommitShaPattern,
+ }
+ err := db.Save(&newConnection).Error
+ if err != nil {
+ return err
}
- db.Clauses(clause.OnConflict{UpdateAll: true}).Create(newConnection)
-
}
- connections[i].Name = strList[0]
}
err = db.Migrator().DropColumn(&JiraConnection20220505{}, "basic_auth_encoded")
if err != nil {
diff --git a/plugins/jira/tasks/api_client.go b/plugins/jira/tasks/api_client.go
index b44b024b..b6eddd8e 100644
--- a/plugins/jira/tasks/api_client.go
+++ b/plugins/jira/tasks/api_client.go
@@ -27,12 +27,6 @@ import (
)
func NewJiraApiClient(taskCtx core.TaskContext, connection *models.JiraConnection) (*helper.ApiAsyncClient, error) {
- // decrypt connection first
- err := helper.DecryptConnection(connection)
- if err != nil {
- return nil, fmt.Errorf("Failed to decrypt Auth AccessToken: %w", err)
- }
-
// create synchronize api client so we can calculate api rate limit dynamically
headers := map[string]string{
"Authorization": fmt.Sprintf("Basic %v", connection.GetEncodedToken()),
diff --git a/plugins/jira/tasks/changelog_collector.go b/plugins/jira/tasks/changelog_collector.go
index bd3f37bf..b4150735 100644
--- a/plugins/jira/tasks/changelog_collector.go
+++ b/plugins/jira/tasks/changelog_collector.go
@@ -25,6 +25,7 @@ import (
"reflect"
"github.com/apache/incubator-devlake/plugins/core"
+ . "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/apache/incubator-devlake/plugins/jira/models"
"github.com/apache/incubator-devlake/plugins/jira/tasks/apiv2models"
@@ -39,28 +40,35 @@ func CollectChangelogs(taskCtx core.SubTaskContext) error {
if data.JiraServerInfo.DeploymentType == models.DeploymentServer {
return nil
}
- db := taskCtx.GetDb()
+ db := taskCtx.GetDal()
// figure out the time range
since := data.Since
// filter out issue_ids that need collection
- tx := db.Table("_tool_jira_board_issues bi").
- Select("bi.issue_id, NOW() AS update_time").
- Joins("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)").
- Where("bi.connection_id = ? AND bi.board_id = ? AND (i.changelog_updated IS NULL OR i.changelog_updated < i.updated)", data.Options.ConnectionId, data.Options.BoardId)
-
+ clauses := []interface{}{
+ Select("bi.issue_id, NOW() AS update_time"),
+ From("_tool_jira_board_issues bi"),
+ Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)"),
+ Where(
+ `bi.connection_id = ?
+ AND bi.board_id = ?
+ AND (i.changelog_updated IS NULL OR i.changelog_updated < i.updated)`,
+ data.Options.ConnectionId,
+ data.Options.BoardId,
+ ),
+ }
// apply time range if any
if since != nil {
- tx = tx.Where("i.updated > ?", *since)
+ clauses = append(clauses, Where("i.updated > ?", *since))
}
// construct the input iterator
- cursor, err := tx.Rows()
+ cursor, err := db.Cursor(clauses...)
if err != nil {
return err
}
// smaller struct can reduce memory footprint, we should try to avoid using big struct
- iterator, err := helper.NewCursorIterator(db, cursor, reflect.TypeOf(apiv2models.Input{}))
+ iterator, err := helper.NewDalCursorIterator(db, cursor, reflect.TypeOf(apiv2models.Input{}))
if err != nil {
return err
}
@@ -76,7 +84,7 @@ func CollectChangelogs(taskCtx core.SubTaskContext) error {
Table: RAW_CHANGELOG_TABLE,
},
ApiClient: data.ApiClient,
- PageSize: 50,
+ PageSize: 100,
Incremental: true,
GetTotalPages: GetTotalPagesFromResponse,
Input: iterator,
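
For reference, the clause list above composes roughly the following SQL (illustrative rendering; the last predicate is appended only when since is set):

    SELECT bi.issue_id, NOW() AS update_time
    FROM _tool_jira_board_issues bi
    LEFT JOIN _tool_jira_issues i
      ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)
    WHERE bi.connection_id = ? AND bi.board_id = ?
      AND (i.changelog_updated IS NULL OR i.changelog_updated < i.updated)
      AND i.updated > ?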
diff --git a/plugins/jira/tasks/remotelink_collector.go b/plugins/jira/tasks/remotelink_collector.go
index a38d8ba1..3c6ac045 100644
--- a/plugins/jira/tasks/remotelink_collector.go
+++ b/plugins/jira/tasks/remotelink_collector.go
@@ -23,8 +23,8 @@ import (
"reflect"
"github.com/apache/incubator-devlake/plugins/core"
+ . "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/plugins/helper"
- "github.com/apache/incubator-devlake/plugins/jira/models"
"github.com/apache/incubator-devlake/plugins/jira/tasks/apiv2models"
)
@@ -34,10 +34,9 @@ var _ core.SubTaskEntryPoint = CollectRemotelinks
func CollectRemotelinks(taskCtx core.SubTaskContext) error {
data := taskCtx.GetData().(*JiraTaskData)
- db := taskCtx.GetDb()
+ db := taskCtx.GetDal()
logger := taskCtx.GetLogger()
logger.Info("collect remotelink")
- jiraIssue := &models.JiraIssue{}
/*
`CollectIssues` will take into account of `since` option and set the `updated` field for issues that have
@@ -45,29 +44,29 @@ func CollectRemotelinks(taskCtx core.SubTaskContext) error {
`remotelink_updated` field. If `remotelink_updated` is older, then we'll collect remotelinks for this issue and
set its `remotelink_updated` to `updated` at the end.
*/
- cursor, err := db.Model(jiraIssue).
- Select("_tool_jira_issues.issue_id", "NOW() AS update_time").
- Joins(`LEFT JOIN _tool_jira_board_issues ON (
- _tool_jira_board_issues.connection_id = _tool_jira_issues.connection_id AND
- _tool_jira_board_issues.issue_id = _tool_jira_issues.issue_id
- )`).
+ cursor, err := db.Cursor(
+ Select("i.issue_id, NOW() AS update_time"),
+ From("_tool_jira_remotelinks"),
+ Join(`LEFT JOIN bi ON (
+ bi.connection_id = i.connection_id AND
+ bi.issue_id = i.issue_id
+ )`),
Where(`
- _tool_jira_board_issues.connection_id = ? AND
- _tool_jira_board_issues.board_id = ? AND
- (_tool_jira_issues.remotelink_updated IS NULL OR _tool_jira_issues.remotelink_updated < _tool_jira_issues.updated)
+ bi.connection_id = ? AND
+ bi.board_id = ? AND
+ (i.remotelink_updated IS NULL OR i.remotelink_updated < i.updated)
`,
data.Options.ConnectionId,
data.Options.BoardId,
- ).
- Rows()
+ ),
+ )
if err != nil {
logger.Error("collect remotelink error:%v", err)
return err
}
- defer cursor.Close()
// smaller struct can reduce memory footprint, we should try to avoid using big struct
- iterator, err := helper.NewCursorIterator(db, cursor, reflect.TypeOf(apiv2models.Input{}))
+ iterator, err := helper.NewDalCursorIterator(db, cursor, reflect.TypeOf(apiv2models.Input{}))
if err != nil {
return err
}
@@ -84,7 +83,6 @@ func CollectRemotelinks(taskCtx core.SubTaskContext) error {
ApiClient: data.ApiClient,
Input: iterator,
UrlTemplate: "api/2/issue/{{ .Input.IssueId }}/remotelink",
- Concurrency: 10,
ResponseParser: func(res *http.Response) ([]json.RawMessage, error) {
if res.StatusCode == http.StatusNotFound {
return nil, nil
diff --git a/plugins/jira/tasks/user_collector.go b/plugins/jira/tasks/user_collector.go
index 051a62b9..6c9e62c2 100644
--- a/plugins/jira/tasks/user_collector.go
+++ b/plugins/jira/tasks/user_collector.go
@@ -24,6 +24,7 @@ import (
"reflect"
"github.com/apache/incubator-devlake/plugins/core"
+ . "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/apache/incubator-devlake/plugins/jira/models"
)
@@ -32,14 +33,18 @@ const RAW_USERS_TABLE = "jira_api_users"
func CollectUsers(taskCtx core.SubTaskContext) error {
data := taskCtx.GetData().(*JiraTaskData)
- db := taskCtx.GetDb()
+ db := taskCtx.GetDal()
logger := taskCtx.GetLogger()
logger.Info("collect user")
- cursor, err := db.Model(&models.JiraUser{}).Where("connection_id = ?", data.Options.ConnectionId).Rows()
+ cursor, err := db.Cursor(
+ Select("account_id"),
+ From("_tool_jira_users"),
+ Where("connection_id = ?", data.Options.ConnectionId),
+ )
if err != nil {
return err
}
- iterator, err := helper.NewCursorIterator(db, cursor, reflect.TypeOf(models.JiraUser{}))
+ iterator, err := helper.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.JiraUser{}))
if err != nil {
return err
}
@@ -65,7 +70,6 @@ func CollectUsers(taskCtx core.SubTaskContext) error {
query.Set(queryKey, user.AccountId)
return query, nil
},
- Concurrency: 10,
ResponseParser: func(res *http.Response) ([]json.RawMessage, error) {
var result json.RawMessage
err := helper.UnmarshalResponse(res, &result)
diff --git a/plugins/jira/tasks/worklog_collector.go b/plugins/jira/tasks/worklog_collector.go
index 70e873d7..2ec8fea2 100644
--- a/plugins/jira/tasks/worklog_collector.go
+++ b/plugins/jira/tasks/worklog_collector.go
@@ -23,6 +23,7 @@ import (
"reflect"
"github.com/apache/incubator-devlake/plugins/core"
+ . "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/apache/incubator-devlake/plugins/jira/tasks/apiv2models"
)
@@ -30,26 +31,36 @@ import (
const RAW_WORKLOGS_TABLE = "jira_api_worklogs"
func CollectWorklogs(taskCtx core.SubTaskContext) error {
- db := taskCtx.GetDb()
+ db := taskCtx.GetDal()
data := taskCtx.GetData().(*JiraTaskData)
since := data.Since
logger := taskCtx.GetLogger()
- connectionId := data.Connection.ID
- boardId := data.Options.BoardId
- tx := db.Table("_tool_jira_board_issues bi").
- Select("bi.issue_id, NOW() AS update_time").
- Joins("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)").
- Where("bi.connection_id = ? AND bi.board_id = ? AND (i.worklog_updated IS NULL OR i.worklog_updated < i.updated)", connectionId, boardId)
+ // filter out issue_ids that need collection
+ clauses := []interface{}{
+ Select("bi.issue_id, NOW() AS update_time"),
+ From("_tool_jira_board_issues bi"),
+ Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id = i.connection_id AND bi.issue_id = i.issue_id)"),
+ Where(
+ `bi.connection_id = ?
+ AND bi.board_id = ?
+ AND (i.worklog_updated IS NULL OR i.worklog_updated < i.updated)`,
+ data.Options.ConnectionId,
+ data.Options.BoardId,
+ ),
+ }
+ // apply time range if any
if since != nil {
- tx = tx.Where("i.updated > ?", since)
+ clauses = append(clauses, Where("i.updated > ?", *since))
}
- cursor, err := tx.Rows()
+
+ // construct the input iterator
+ cursor, err := db.Cursor(clauses...)
if err != nil {
return err
}
- iterator, err := helper.NewCursorIterator(db, cursor, reflect.TypeOf(apiv2models.Input{}))
+ iterator, err := helper.NewDalCursorIterator(db, cursor, reflect.TypeOf(apiv2models.Input{}))
if err != nil {
return err
}
diff --git a/runner/directrun.go b/runner/directrun.go
index 86d4609c..8c0802ef 100644
--- a/runner/directrun.go
+++ b/runner/directrun.go
@@ -68,7 +68,7 @@ func DirectRun(cmd *cobra.Command, args []string, pluginTask core.PluginTask, op
// collect migration and run
migration.Init(db)
if migratable, ok := pluginTask.(core.Migratable); ok {
- migration.Register(migratable.MigrationScripts(), cmd.Use)
+ RegisterMigrationScripts(migratable.MigrationScripts(), cmd.Use, cfg, log)
}
err = migration.Execute(context.Background())
if err != nil {
diff --git a/runner/loader.go b/runner/loader.go
index c801fc12..e5da95ca 100644
--- a/runner/loader.go
+++ b/runner/loader.go
@@ -19,7 +19,6 @@ package runner
import (
"fmt"
- "github.com/apache/incubator-devlake/migration"
"io/fs"
"path/filepath"
"plugin"
@@ -58,7 +57,7 @@ func LoadPlugins(pluginsDir string, config *viper.Viper, logger core.Logger, db
}
}
if migratable, ok := symPluginEntry.(core.Migratable); ok {
- migration.Register(migratable.MigrationScripts(), pluginName)
+ RegisterMigrationScripts(migratable.MigrationScripts(), pluginName, config, logger)
}
err = core.RegisterPlugin(pluginName, pluginMeta)
if err != nil {
diff --git a/plugins/jira/api/init.go b/runner/migration.go
similarity index 66%
copy from plugins/jira/api/init.go
copy to runner/migration.go
index acaa495e..d189f57a 100644
--- a/plugins/jira/api/init.go
+++ b/runner/migration.go
@@ -15,16 +15,21 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package api
+package runner
import (
+ "github.com/apache/incubator-devlake/migration"
"github.com/apache/incubator-devlake/plugins/core"
- "github.com/spf13/viper"
- "gorm.io/gorm"
)
-var db *gorm.DB
-
-func Init(config *viper.Viper, logger core.Logger, database *gorm.DB) {
- db = database
+func RegisterMigrationScripts(scripts []migration.Script, comment string, config core.ConfigGetter, logger core.Logger) {
+ for _, script := range scripts {
+ if s, ok := script.(core.InjectConfigGetter); ok {
+ s.SetConfigGetter(config)
+ }
+ if s, ok := script.(core.InjectLogger); ok {
+ s.SetLogger(logger)
+ }
+ }
+ migration.Register(scripts, comment)
}
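
A script opts into injection simply by implementing the setter interfaces, as UpdateSchemas20220601 does above. A minimal hedged sketch (struct name hypothetical; other migration.Script methods omitted for brevity):

    type addExampleColumn struct {
        config core.ConfigGetter
        logger core.Logger
    }

    func (s *addExampleColumn) SetConfigGetter(getter core.ConfigGetter) { s.config = getter }
    func (s *addExampleColumn) SetLogger(logger core.Logger)             { s.logger = logger }

    func (s *addExampleColumn) Up(ctx context.Context, db *gorm.DB) error {
        // the injected getter exposes configuration such as the encryption key
        s.logger.Info("ENCODE_KEY configured: %v", s.config.GetString(core.EncodeKeyEnvStr) != "")
        return nil
    }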
diff --git a/runner/run_task.go b/runner/run_task.go
index 568d47b6..24c8fd24 100644
--- a/runner/run_task.go
+++ b/runner/run_task.go
@@ -56,7 +56,7 @@ func RunTask(
// make sure task status always correct even if it panicked
defer func() {
if r := recover(); r != nil {
- err = fmt.Errorf("run task failed with panic (%s): %v", utils.GatherCallFrames(), r)
+ err = fmt.Errorf("run task failed with panic (%s): %v", utils.GatherCallFrames(0), r)
}
finishedAt := time.Now()
spentSeconds := finishedAt.Unix() - beganAt.Unix()
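
RunTask captures frames at its own call depth, so it passes delta=0; a wrapper that adds one intermediate frame of its own would pass delta=1 instead (hypothetical wrapper; see the utils/callframes.go change below):

    func captureForWrapper() string {
        return utils.GatherCallFrames(1) // skip this wrapper's own frame
    }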
diff --git a/services/init.go b/services/init.go
index 995d1aec..4843e887 100644
--- a/services/init.go
+++ b/services/init.go
@@ -19,8 +19,11 @@ package services
import (
"context"
+ "time"
+
"github.com/apache/incubator-devlake/models/migrationscripts"
+
"github.com/apache/incubator-devlake/config"
"github.com/apache/incubator-devlake/logger"
"github.com/apache/incubator-devlake/migration"
@@ -28,7 +31,6 @@ import (
"github.com/robfig/cron/v3"
"github.com/spf13/viper"
"gorm.io/gorm"
- "time"
)
var cfg *viper.Viper
@@ -45,7 +47,7 @@ func init() {
panic(err)
}
migration.Init(db)
- migrationscripts.RegisterAll()
+ runner.RegisterMigrationScripts(migrationscripts.All(), "Framework", cfg, logger.Global)
// load plugins
err = runner.LoadPlugins(
cfg.GetString("PLUGIN_DIR"),
diff --git a/utils/callframes.go b/utils/callframes.go
index 44cda2d6..611ad346 100644
--- a/utils/callframes.go
+++ b/utils/callframes.go
@@ -23,12 +23,12 @@ import (
"strings"
)
-func GatherCallFrames() string {
+func GatherCallFrames(delta int) string {
var name, file string
var line int
var pc [16]uintptr
- n := runtime.Callers(3, pc[:])
+ n := runtime.Callers(3+delta, pc[:])
for _, pc := range pc[:n] {
fn := runtime.FuncForPC(pc)
if fn == nil {