You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ka...@apache.org on 2023/05/19 02:10:53 UTC
[incubator-devlake] 01/01: feat: Rework scope_helper for compatibility with Python and adapt DeleteScope to it
This is an automated email from the ASF dual-hosted git repository.
ka94 pushed a commit to branch issues/4762
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit 26f5a7d2fbba57d037a57641620e788d1ee172fe
Author: Keon Amini <ke...@merico.dev>
AuthorDate: Wed May 10 21:36:21 2023 -0500
feat: Rework scope_helper for compatibility with Python and adapt DeleteScope to it
---
backend/core/dal/dal.go | 4 +-
backend/core/models/blueprint.go | 2 +-
backend/core/models/dynamic_tabler.go | 15 +-
.../pluginhelper/api/reflection_helper.go} | 45 +-
.../helpers/pluginhelper/api/scope_db_helper.go | 115 ++++
.../{scope_helper.go => scope_generic_helper.go} | 396 +++++-------
backend/helpers/pluginhelper/api/scope_helper.go | 669 ++-------------------
.../helpers/pluginhelper/api/scope_helper_test.go | 5 +-
.../pluginhelper/services/blueprint_helper.go | 2 +-
backend/impls/dalgorm/dalgorm.go | 70 ++-
.../dalgorm/db_mapper.go} | 35 +-
backend/plugins/pagerduty/api/init.go | 11 +-
backend/plugins/pagerduty/api/scope.go | 6 +-
backend/python/pydevlake/pydevlake/message.py | 1 +
backend/python/pydevlake/pydevlake/plugin.py | 9 +-
backend/python/pydevlake/pydevlake/stream.py | 6 +-
backend/python/pydevlake/pydevlake/subtasks.py | 2 +-
backend/python/pydevlake/tests/stream_test.py | 4 +-
backend/resources/swagger/open_api_spec.json.tmpl | 45 ++
.../server/services/remote/models/conversion.go | 30 +
backend/server/services/remote/models/models.go | 20 +-
.../server/services/remote/models/plugin_remote.go | 1 +
.../server/services/remote/plugin/default_api.go | 24 +-
backend/server/services/remote/plugin/init.go | 1 +
.../services/remote/plugin/plugin_extensions.go | 3 +-
.../server/services/remote/plugin/plugin_impl.go | 8 +
backend/server/services/remote/plugin/scope_api.go | 240 ++------
.../services/remote/plugin/scope_db_helper.go | 147 +++++
backend/test/e2e/remote/python_plugin_test.go | 8 +
29 files changed, 799 insertions(+), 1125 deletions(-)
diff --git a/backend/core/dal/dal.go b/backend/core/dal/dal.go
index e1b8478bf..78a07a9fe 100644
--- a/backend/core/dal/dal.go
+++ b/backend/core/dal/dal.go
@@ -174,9 +174,9 @@ type Dal interface {
// Begin create a new transaction
Begin() Transaction
// IsErrorNotFound returns true if error is record-not-found
- IsErrorNotFound(err errors.Error) bool
+ IsErrorNotFound(err error) bool
// IsDuplicationError returns true if error is duplicate-error
- IsDuplicationError(err errors.Error) bool
+ IsDuplicationError(err error) bool
// RawCursor (Deprecated) executes raw sql query and returns a database cursor.
RawCursor(query string, params ...interface{}) (*sql.Rows, errors.Error)
}
diff --git a/backend/core/models/blueprint.go b/backend/core/models/blueprint.go
index f09b39d29..e8b63c44e 100644
--- a/backend/core/models/blueprint.go
+++ b/backend/core/models/blueprint.go
@@ -132,7 +132,7 @@ func (bp *Blueprint) UpdateSettings(settings *BlueprintSettings) errors.Error {
return nil
}
-// GetScopes Gets all the scopes across all the connections for this blueprint
+// GetScopes Gets all the scopes for a given connection for this blueprint. Returns an empty slice if none found.
func (bp *Blueprint) GetScopes(connectionId uint64) ([]*plugin.BlueprintScopeV200, errors.Error) {
conns, err := bp.GetConnections()
if err != nil {
diff --git a/backend/core/models/dynamic_tabler.go b/backend/core/models/dynamic_tabler.go
index ab58b77ef..2efd836ac 100644
--- a/backend/core/models/dynamic_tabler.go
+++ b/backend/core/models/dynamic_tabler.go
@@ -41,10 +41,14 @@ func NewDynamicTabler(tableName string, objType reflect.Type) *DynamicTabler {
}
}
+func (d *DynamicTabler) NewValue() any {
+ return reflect.New(d.objType).Interface()
+}
+
func (d *DynamicTabler) New() *DynamicTabler {
return &DynamicTabler{
objType: d.objType,
- wrapped: reflect.New(d.objType).Interface(),
+ wrapped: d.NewValue(),
table: d.table,
}
}
@@ -82,6 +86,15 @@ func (d *DynamicTabler) Unwrap() any {
return d.wrapped
}
+func (d *DynamicTabler) UnwrapSlice() []any {
+ var arr []any
+ slice := reflect.ValueOf(d.wrapped).Elem()
+ for i := 0; i < slice.Len(); i++ {
+ arr = append(arr, slice.Index(i).Interface())
+ }
+ return arr
+}
+
func (d *DynamicTabler) TableName() string {
return d.table
}
diff --git a/backend/server/services/remote/models/plugin_remote.go b/backend/helpers/pluginhelper/api/reflection_helper.go
similarity index 54%
copy from backend/server/services/remote/models/plugin_remote.go
copy to backend/helpers/pluginhelper/api/reflection_helper.go
index a8984ea56..e413372f2 100644
--- a/backend/server/services/remote/models/plugin_remote.go
+++ b/backend/helpers/pluginhelper/api/reflection_helper.go
@@ -15,18 +15,35 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package models
-
-import (
- "github.com/apache/incubator-devlake/core/errors"
- "github.com/apache/incubator-devlake/core/plugin"
-)
-
-// RemotePlugin API supported by plugins running in different/remote processes
-type RemotePlugin interface {
- plugin.PluginApi
- plugin.PluginTask
- plugin.PluginMeta
- plugin.PluginOpenApiSpec
- RunMigrations(forceMigrate bool) errors.Error
+package api
+
+import "reflect"
+
+func reflectField(obj any, fieldName string) reflect.Value {
+ return reflectValue(obj).FieldByName(fieldName)
+}
+
+func hasField(obj any, fieldName string) bool {
+ _, ok := reflectType(obj).FieldByName(fieldName)
+ return ok
+}
+
+func reflectValue(obj any) reflect.Value {
+ val := reflect.ValueOf(obj)
+ kind := val.Kind()
+ for kind == reflect.Ptr || kind == reflect.Interface {
+ val = val.Elem()
+ kind = val.Kind()
+ }
+ return val
+}
+
+func reflectType(obj any) reflect.Type {
+ typ := reflect.TypeOf(obj)
+ kind := typ.Kind()
+ for kind == reflect.Ptr {
+ typ = typ.Elem()
+ kind = typ.Kind()
+ }
+ return typ
}
diff --git a/backend/helpers/pluginhelper/api/scope_db_helper.go b/backend/helpers/pluginhelper/api/scope_db_helper.go
new file mode 100644
index 000000000..6e6bbe2ab
--- /dev/null
+++ b/backend/helpers/pluginhelper/api/scope_db_helper.go
@@ -0,0 +1,115 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package api
+
+import (
+ "fmt"
+ "github.com/apache/incubator-devlake/core/context"
+ "github.com/apache/incubator-devlake/core/dal"
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/plugin"
+)
+
+type ScopeDatabaseHelper[Conn any, Scope any, Tr any] interface {
+ VerifyConnection(connectionId uint64) errors.Error
+ SaveScope(scopes []*Scope) errors.Error
+ UpdateScope(connectionId uint64, scopeId string, scope *Scope) errors.Error
+ GetScope(connectionId uint64, scopeId string) (Scope, errors.Error)
+ ListScopes(input *plugin.ApiResourceInput, connectionId uint64) ([]*Scope, errors.Error)
+ DeleteScope(connectionId uint64, scopeId string) errors.Error
+ GetTransformationRule(ruleId uint64) (Tr, errors.Error)
+ ListTransformationRules(ruleIds []uint64) ([]*Tr, errors.Error)
+}
+
+type ScopeDatabaseHelperImpl[Conn any, Scope any, Tr any] struct {
+ ScopeDatabaseHelper[Conn, Scope, Tr]
+ db dal.Dal
+ connHelper *ConnectionApiHelper
+ params *ReflectionParameters
+}
+
+func NewScopeDatabaseHelperImpl[Conn any, Scope any, Tr any](
+ basicRes context.BasicRes, connHelper *ConnectionApiHelper, params *ReflectionParameters) *ScopeDatabaseHelperImpl[Conn, Scope, Tr] {
+ return &ScopeDatabaseHelperImpl[Conn, Scope, Tr]{
+ db: basicRes.GetDal(),
+ connHelper: connHelper,
+ params: params,
+ }
+}
+
+func (s *ScopeDatabaseHelperImpl[Conn, Scope, Tr]) VerifyConnection(connectionId uint64) errors.Error {
+ var conn Conn
+ err := s.connHelper.FirstById(&conn, connectionId)
+ if err != nil {
+ if s.db.IsErrorNotFound(err) {
+ return errors.BadInput.New("Invalid Connection Id")
+ }
+ return err
+ }
+ return nil
+}
+
+func (s *ScopeDatabaseHelperImpl[Conn, Scope, Tr]) SaveScope(scopes []*Scope) errors.Error {
+ err := s.db.CreateOrUpdate(&scopes)
+ if err != nil {
+ if s.db.IsDuplicationError(err) {
+ return errors.BadInput.New("the scope already exists")
+ }
+ return err
+ }
+ return nil
+}
+
+func (s *ScopeDatabaseHelperImpl[Conn, Scope, Tr]) UpdateScope(connectionId uint64, scopeId string, scope *Scope) errors.Error {
+ return s.db.Update(&scope)
+}
+
+func (s *ScopeDatabaseHelperImpl[Conn, Scope, Tr]) GetScope(connectionId uint64, scopeId string) (Scope, errors.Error) {
+ query := dal.Where(fmt.Sprintf("connection_id = ? AND %s = ?", s.params.ScopeIdColumnName), connectionId, scopeId)
+ var scope Scope
+ err := s.db.First(&scope, query)
+ return scope, err
+}
+
+func (s *ScopeDatabaseHelperImpl[Conn, Scope, Tr]) ListScopes(input *plugin.ApiResourceInput, connectionId uint64) ([]*Scope, errors.Error) {
+ limit, offset := GetLimitOffset(input.Query, "pageSize", "page")
+ var scopes []*Scope
+ err := s.db.All(&scopes, dal.Where("connection_id = ?", connectionId), dal.Limit(limit), dal.Offset(offset))
+ return scopes, err
+}
+
+func (s *ScopeDatabaseHelperImpl[Conn, Scope, Tr]) DeleteScope(connectionId uint64, scopeId string) errors.Error {
+ scope := new(Scope)
+ err := s.db.Delete(&scope, dal.Where(fmt.Sprintf("connection_id = ? AND %s = ?", s.params.ScopeIdColumnName),
+ connectionId, scopeId))
+ return err
+}
+
+func (s *ScopeDatabaseHelperImpl[Conn, Scope, Tr]) GetTransformationRule(ruleId uint64) (Tr, errors.Error) {
+ var rule Tr
+ err := s.db.First(&rule, dal.Where("id = ?", ruleId))
+ return rule, err
+}
+
+func (s *ScopeDatabaseHelperImpl[Conn, Scope, Tr]) ListTransformationRules(ruleIds []uint64) ([]*Tr, errors.Error) {
+ var rules []*Tr
+ err := s.db.All(&rules, dal.Where("id IN (?)", ruleIds))
+ return rules, err
+}
+
+var _ ScopeDatabaseHelper[any, any, any] = &ScopeDatabaseHelperImpl[any, any, any]{}
diff --git a/backend/helpers/pluginhelper/api/scope_helper.go b/backend/helpers/pluginhelper/api/scope_generic_helper.go
similarity index 59%
copy from backend/helpers/pluginhelper/api/scope_helper.go
copy to backend/helpers/pluginhelper/api/scope_generic_helper.go
index a2edbcf7c..71b37fa04 100644
--- a/backend/helpers/pluginhelper/api/scope_helper.go
+++ b/backend/helpers/pluginhelper/api/scope_generic_helper.go
@@ -20,25 +20,21 @@ package api
import (
"encoding/json"
"fmt"
- "github.com/apache/incubator-devlake/core/models"
- "github.com/apache/incubator-devlake/core/models/domainlayer/domaininfo"
- serviceHelper "github.com/apache/incubator-devlake/helpers/pluginhelper/services"
- "net/http"
- "strconv"
- "strings"
- "sync"
- "time"
-
"github.com/apache/incubator-devlake/core/context"
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/log"
+ "github.com/apache/incubator-devlake/core/models"
+ "github.com/apache/incubator-devlake/core/models/domainlayer/domaininfo"
"github.com/apache/incubator-devlake/core/plugin"
+ serviceHelper "github.com/apache/incubator-devlake/helpers/pluginhelper/services"
"github.com/go-playground/validator/v10"
"github.com/mitchellh/mapstructure"
- "gorm.io/gorm"
-
"reflect"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
)
var (
@@ -48,14 +44,26 @@ var (
type NoTransformation struct{}
-// ScopeApiHelper is used to write the CURD of scopes
-type ScopeApiHelper[Conn any, Scope any, Tr any] struct {
- log log.Logger
- db dal.Dal
- validator *validator.Validate
- bpManager *serviceHelper.BlueprintManager
- connHelper *ConnectionApiHelper
-}
+type (
+ GenericScopeApiHelper[Conn any, Scope any, Tr any] struct {
+ log log.Logger
+ db dal.Dal
+ validator *validator.Validate
+ reflectionParams *ReflectionParameters
+ dbHelper ScopeDatabaseHelper[Conn, Scope, Tr]
+ bpManager *serviceHelper.BlueprintManager
+ connHelper *ConnectionApiHelper
+ opts *ScopeHelperOptions
+ }
+ ReflectionParameters struct {
+ ScopeIdFieldName string
+ ScopeIdColumnName string
+ RawScopeParamName string
+ }
+ ScopeHelperOptions struct {
+ GetScopeParamValue func(db dal.Dal, scopeId string) (string, errors.Error)
+ }
+)
type (
requestParams struct {
@@ -74,17 +82,22 @@ type (
}
)
-// NewScopeHelper creates a ScopeHelper for scopes management
-func NewScopeHelper[Conn any, Scope any, Tr any](
+func NewGenericScopeHelper[Conn any, Scope any, Tr any](
basicRes context.BasicRes,
vld *validator.Validate,
connHelper *ConnectionApiHelper,
-) *ScopeApiHelper[Conn, Scope, Tr] {
- if vld == nil {
- vld = validator.New()
- }
+ dbHelper ScopeDatabaseHelper[Conn, Scope, Tr],
+ params *ReflectionParameters,
+ opts *ScopeHelperOptions,
+) *GenericScopeApiHelper[Conn, Scope, Tr] {
if connHelper == nil {
- return nil
+ panic("nil connHelper")
+ }
+ if params == nil {
+ panic("reflection params not provided")
+ }
+ if opts == nil {
+ opts = &ScopeHelperOptions{}
}
tablesCacheLoader.Do(func() {
var err errors.Error
@@ -93,84 +106,55 @@ func NewScopeHelper[Conn any, Scope any, Tr any](
panic(err)
}
})
- return &ScopeApiHelper[Conn, Scope, Tr]{
- log: basicRes.GetLogger(),
- db: basicRes.GetDal(),
- validator: vld,
- bpManager: serviceHelper.NewBlueprintManager(basicRes.GetDal()),
- connHelper: connHelper,
+ return &GenericScopeApiHelper[Conn, Scope, Tr]{
+ log: basicRes.GetLogger(),
+ db: basicRes.GetDal(),
+ validator: vld,
+ reflectionParams: params,
+ dbHelper: dbHelper,
+ bpManager: serviceHelper.NewBlueprintManager(basicRes.GetDal()),
+ connHelper: connHelper,
+ opts: opts,
}
}
-type ScopeRes[T any] struct {
- Scope T `mapstructure:",squash"`
- TransformationRuleName string `mapstructure:"transformationRuleName,omitempty"`
- Blueprints []*models.Blueprint
-}
-
-type ScopeReq[T any] struct {
- Data []*T `json:"data"`
-}
-
-// Put saves the given scopes to the database. It expects a slice of struct pointers
-// as the scopes argument. It also expects a fieldName argument, which is used to extract
-// the connection ID from the input.Params map.
-func (c *ScopeApiHelper[Conn, Scope, Tr]) Put(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
- var req struct {
- Data []*Scope `json:"data"`
- }
- err := errors.Convert(DecodeMapStruct(input.Body, &req, true))
- if err != nil {
- return nil, errors.BadInput.Wrap(err, "decoding scope error")
- }
+func (c *GenericScopeApiHelper[Conn, Scope, Tr]) PutScopes(input *plugin.ApiResourceInput, scopes []*Scope) ([]*ScopeRes[Scope], errors.Error) {
params := c.extractFromReqParam(input)
if params.connectionId == 0 {
return nil, errors.BadInput.New("invalid connectionId")
}
- err = c.VerifyConnection(params.connectionId)
+ err := c.dbHelper.VerifyConnection(params.connectionId)
+ if err != nil {
+ return nil, errors.Default.Wrap(err, fmt.Sprintf("error verifying connection for connection ID %d", params.connectionId))
+ }
+ err = c.validatePrimaryKeys(scopes)
if err != nil {
return nil, err
}
- // Create a map to keep track of primary key values
- keeper := make(map[string]struct{})
-
- // Set the CreatedDate and UpdatedDate fields to the current time for each scope
now := time.Now()
- for _, v := range req.Data {
- // Ensure that the primary key value is unique
- primaryValueStr := returnPrimaryKeyValue(*v)
- if _, ok := keeper[primaryValueStr]; ok {
- return nil, errors.BadInput.New("duplicated item")
- } else {
- keeper[primaryValueStr] = struct{}{}
- }
-
+ for _, scope := range scopes {
// Set the connection ID, CreatedDate, and UpdatedDate fields
- setScopeFields(v, params.connectionId, &now, &now)
-
- // Verify that the primary key value is valid
- err = VerifyScope(v, c.validator)
+ setScopeFields(scope, params.connectionId, &now, &now)
+ err = VerifyScope(scope, c.validator)
if err != nil {
- return nil, err
+ return nil, errors.Default.Wrap(err, "error verifying scope")
}
}
// Save the scopes to the database
- if req.Data != nil && len(req.Data) > 0 {
- err = c.save(&req.Data)
+ if len(scopes) > 0 {
+ err = c.dbHelper.SaveScope(scopes)
if err != nil {
- return nil, err
+ return nil, errors.Default.Wrap(err, "error saving scope")
}
}
-
- apiScopes, err := c.addTransformationName(req.Data)
+ apiScopes, err := c.addTransformationName(scopes...)
if err != nil {
- return nil, err
+ return nil, errors.Default.Wrap(err, "error associating transformation to scope")
}
-
- return &plugin.ApiResourceOutput{Body: apiScopes, Status: http.StatusOK}, nil
+ return apiScopes, nil
}
-func (c *ScopeApiHelper[Conn, Scope, Tr]) Update(input *plugin.ApiResourceInput, fieldName string) (*plugin.ApiResourceOutput, errors.Error) {
+func (c *GenericScopeApiHelper[Conn, Scope, Tr]) UpdateScope(input *plugin.ApiResourceInput) (*ScopeRes[Scope], errors.Error) {
params := c.extractFromReqParam(input)
if params.connectionId == 0 {
return nil, errors.BadInput.New("invalid connectionId")
@@ -178,76 +162,52 @@ func (c *ScopeApiHelper[Conn, Scope, Tr]) Update(input *plugin.ApiResourceInput,
if len(params.scopeId) == 0 {
return nil, errors.BadInput.New("invalid scopeId")
}
- err := c.VerifyConnection(params.connectionId)
+ err := c.dbHelper.VerifyConnection(params.connectionId)
if err != nil {
- return &plugin.ApiResourceOutput{Body: nil, Status: http.StatusInternalServerError}, err
+ return nil, err
}
- var scope Scope
- err = c.db.First(&scope, dal.Where(fmt.Sprintf("connection_id = ? AND %s = ?", fieldName), params.connectionId, params.scopeId))
+ scope, err := c.dbHelper.GetScope(params.connectionId, params.scopeId)
if err != nil {
- return &plugin.ApiResourceOutput{Body: nil, Status: http.StatusInternalServerError}, errors.Default.New("getting Scope error")
+ return nil, err
}
- err = DecodeMapStruct(input.Body, &scope, true)
+ err = DecodeMapStruct(input.Body, &scope, false)
if err != nil {
- return &plugin.ApiResourceOutput{Body: nil, Status: http.StatusInternalServerError}, errors.Default.Wrap(err, "patch scope error")
+ return nil, errors.Default.Wrap(err, "patch scope error")
}
err = VerifyScope(&scope, c.validator)
if err != nil {
- return &plugin.ApiResourceOutput{Body: nil, Status: http.StatusInternalServerError}, errors.Default.Wrap(err, "Invalid scope")
+ return nil, errors.Default.Wrap(err, "Invalid scope")
}
-
- err = c.db.Update(scope)
+ err = c.dbHelper.UpdateScope(params.connectionId, params.scopeId, &scope)
if err != nil {
- return &plugin.ApiResourceOutput{Body: nil, Status: http.StatusInternalServerError}, errors.Default.Wrap(err, "error on saving Scope")
- }
- valueRepoRuleId := reflect.ValueOf(scope).FieldByName("TransformationRuleId")
- if !valueRepoRuleId.IsValid() {
- return &plugin.ApiResourceOutput{Body: scope, Status: http.StatusOK}, nil
+ return nil, errors.Default.Wrap(err, "error on saving Scope")
}
- repoRuleId := reflect.ValueOf(scope).FieldByName("TransformationRuleId").Uint()
- var rule Tr
- if repoRuleId > 0 {
- err = c.db.First(&rule, dal.Where("id = ?", repoRuleId))
- if err != nil {
- return nil, errors.NotFound.New("transformationRule not found")
- }
+ scopeRes, err := c.addTransformationName(&scope)
+ if err != nil {
+ return nil, err
}
- scopeRes := &ScopeRes[Scope]{
- Scope: scope,
- TransformationRuleName: reflect.ValueOf(rule).FieldByName("Name").String()}
-
- return &plugin.ApiResourceOutput{Body: scopeRes, Status: http.StatusOK}, nil
+ return scopeRes[0], nil
}
-// GetScopeList returns a list of scopes. It expects a fieldName argument, which is used
-// to extract the connection ID from the input.Params map.
-
-func (c *ScopeApiHelper[Conn, Scope, Tr]) GetScopeList(input *plugin.ApiResourceInput, scopeIdFieldName ...string) (*plugin.ApiResourceOutput, errors.Error) {
- // Extract the connection ID from the input.Params map
+func (c *GenericScopeApiHelper[Conn, Scope, Tr]) GetScopes(input *plugin.ApiResourceInput) ([]*ScopeRes[Scope], errors.Error) {
params := c.extractFromGetReqParam(input)
if params.connectionId == 0 {
return nil, errors.BadInput.New("invalid path params: \"connectionId\" not set")
}
- err := c.VerifyConnection(params.connectionId)
+ err := c.dbHelper.VerifyConnection(params.connectionId)
if err != nil {
- return nil, err
+ return nil, errors.Default.Wrap(err, fmt.Sprintf("error verifying connection for connection ID %d", params.connectionId))
}
- limit, offset := GetLimitOffset(input.Query, "pageSize", "page")
- var scopes []*Scope
- err = c.db.All(&scopes, dal.Where("connection_id = ?", params.connectionId), dal.Limit(limit), dal.Offset(offset))
+ scopes, err := c.dbHelper.ListScopes(input, params.connectionId)
if err != nil {
- return nil, err
+ return nil, errors.Default.Wrap(err, fmt.Sprintf("error verifying connection for connection ID %d", params.connectionId))
}
-
- apiScopes, err := c.addTransformationName(scopes)
+ apiScopes, err := c.addTransformationName(scopes...)
if err != nil {
- return nil, err
+ return nil, errors.Default.Wrap(err, "error associating transformations with scopes")
}
if params.loadBlueprints {
- if len(scopeIdFieldName) == 0 {
- return nil, errors.Default.New("scope Id field name is not known") //temporary, limited solution until I properly refactor all of this in another PR
- }
- scopesById := c.mapByScopeId(apiScopes, scopeIdFieldName[0])
+ scopesById := c.mapByScopeId(apiScopes)
var scopeIds []string
for id := range scopesById {
scopeIds = append(scopeIds, id)
@@ -272,10 +232,10 @@ func (c *ScopeApiHelper[Conn, Scope, Tr]) GetScopeList(input *plugin.ApiResource
c.log.Warn(nil, "The following dangling scopes were found: %v", danglingIds)
}
}
- return &plugin.ApiResourceOutput{Body: apiScopes, Status: http.StatusOK}, nil
+ return apiScopes, nil
}
-func (c *ScopeApiHelper[Conn, Scope, Tr]) GetScope(input *plugin.ApiResourceInput, scopeIdColumnName string) (*plugin.ApiResourceOutput, errors.Error) {
+func (c *GenericScopeApiHelper[Conn, Scope, Tr]) GetScope(input *plugin.ApiResourceInput) (*ScopeRes[Scope], errors.Error) {
params := c.extractFromGetReqParam(input)
if params == nil || params.connectionId == 0 {
return nil, errors.BadInput.New("invalid path params: \"connectionId\" not set")
@@ -283,41 +243,34 @@ func (c *ScopeApiHelper[Conn, Scope, Tr]) GetScope(input *plugin.ApiResourceInpu
if len(params.scopeId) == 0 || params.scopeId == "0" {
return nil, errors.BadInput.New("invalid path params: \"scopeId\" not set/invalid")
}
- err := c.VerifyConnection(params.connectionId)
+ err := c.dbHelper.VerifyConnection(params.connectionId)
if err != nil {
- return nil, err
- }
- db := c.db
-
- query := dal.Where(fmt.Sprintf("connection_id = ? AND %s = ?", scopeIdColumnName), params.connectionId, params.scopeId)
- var scope Scope
- err = db.First(&scope, query)
- if db.IsErrorNotFound(err) {
- return nil, errors.NotFound.New("Scope not found")
+ return nil, errors.Default.Wrap(err, fmt.Sprintf("error verifying connection for connection ID %d", params.connectionId))
}
+ scope, err := c.dbHelper.GetScope(params.connectionId, params.scopeId)
if err != nil {
- return nil, err
+ return nil, errors.Default.Wrap(err, fmt.Sprintf("error retrieving scope with scope ID %s", params.scopeId))
}
- valueRepoRuleId := reflect.ValueOf(scope).FieldByName("TransformationRuleId")
- if !valueRepoRuleId.IsValid() {
- return &plugin.ApiResourceOutput{Body: scope, Status: http.StatusOK}, nil
+ apiScopes, err := c.addTransformationName(&scope)
+ if err != nil {
+ return nil, errors.Default.Wrap(err, fmt.Sprintf("error associating transformation with scope %s", params.scopeId))
}
- repoRuleId := reflect.ValueOf(scope).FieldByName("TransformationRuleId").Uint()
- var rule Tr
- if repoRuleId > 0 {
- err = db.First(&rule, dal.Where("id = ?", repoRuleId))
+ scopeRes := apiScopes[0]
+ var blueprints []*models.Blueprint
+ if params.loadBlueprints {
+ blueprintMap, err := c.bpManager.GetBlueprintsByScopes(params.connectionId, params.scopeId)
if err != nil {
- return nil, errors.NotFound.New("transformationRule not found")
+ return nil, errors.Default.Wrap(err, fmt.Sprintf("error getting blueprints for scope with scope ID %s", params.scopeId))
+ }
+ if len(blueprintMap) == 1 {
+ blueprints = blueprintMap[params.scopeId]
}
}
- scopeRes := &ScopeRes[Scope]{
- Scope: scope,
- TransformationRuleName: reflect.ValueOf(rule).FieldByName("Name").String(),
- }
- return &plugin.ApiResourceOutput{Body: scopeRes, Status: http.StatusOK}, nil
+ scopeRes.Blueprints = blueprints
+ return scopeRes, nil
}
-func (c *ScopeApiHelper[Conn, Scope, Tr]) DeleteScope(input *plugin.ApiResourceInput, scopeIdFieldName string, rawScopeParamName string,
- getScopeParamValue func(db dal.Dal, scopeId string) (string, errors.Error)) (*plugin.ApiResourceOutput, errors.Error) {
+
+func (c *GenericScopeApiHelper[Conn, Scope, Tr]) DeleteScope(input *plugin.ApiResourceInput) ([]*models.Blueprint, errors.Error) {
params := c.extractFromDeleteReqParam(input)
if params == nil || params.connectionId == 0 {
return nil, errors.BadInput.New("invalid path params: \"connectionId\" not set")
@@ -325,7 +278,7 @@ func (c *ScopeApiHelper[Conn, Scope, Tr]) DeleteScope(input *plugin.ApiResourceI
if len(params.scopeId) == 0 || params.scopeId == "0" {
return nil, errors.BadInput.New("invalid path params: \"scopeId\" not set/invalid")
}
- err := c.VerifyConnection(params.connectionId)
+ err := c.dbHelper.VerifyConnection(params.connectionId)
if err != nil {
return nil, errors.Default.Wrap(err, fmt.Sprintf("error verifying connection for connection ID %d", params.connectionId))
}
@@ -341,16 +294,16 @@ func (c *ScopeApiHelper[Conn, Scope, Tr]) DeleteScope(input *plugin.ApiResourceI
return nil, errors.Default.Wrap(err, fmt.Sprintf("error getting database tables managed by plugin %s", params.plugin))
}
// delete all the plugin records referencing this scope
- if rawScopeParamName != "" {
+ if c.reflectionParams.RawScopeParamName != "" {
scopeParamValue := params.scopeId
- if getScopeParamValue != nil {
- scopeParamValue, err = getScopeParamValue(c.db, params.scopeId) // this function is optional - use it if API data params stores a value different to the scope id (e.g. github plugin)
+ if c.opts.GetScopeParamValue != nil {
+ scopeParamValue, err = c.opts.GetScopeParamValue(c.db, params.scopeId)
if err != nil {
return nil, errors.Default.Wrap(err, fmt.Sprintf("error extracting scope parameter name for scope %s", params.scopeId))
}
}
for _, table := range tables {
- err = db.Exec(createDeleteQuery(table, rawScopeParamName, scopeParamValue))
+ err = db.Exec(createDeleteQuery(table, c.reflectionParams.RawScopeParamName, scopeParamValue))
if err != nil {
return nil, errors.Default.Wrap(err, fmt.Sprintf("error deleting data bound to scope %s for plugin %s", params.scopeId, params.plugin))
}
@@ -359,9 +312,7 @@ func (c *ScopeApiHelper[Conn, Scope, Tr]) DeleteScope(input *plugin.ApiResourceI
var impactedBlueprints []*models.Blueprint
if !params.deleteDataOnly {
// Delete the scope itself
- scope := new(Scope)
- err = c.db.Delete(&scope, dal.Where(fmt.Sprintf("connection_id = ? AND %s = ?", scopeIdFieldName),
- params.connectionId, params.scopeId))
+ err = c.dbHelper.DeleteScope(params.connectionId, params.scopeId)
if err != nil {
return nil, errors.Default.Wrap(err, fmt.Sprintf("error deleting scope %s", params.scopeId))
}
@@ -397,38 +348,25 @@ func (c *ScopeApiHelper[Conn, Scope, Tr]) DeleteScope(input *plugin.ApiResourceI
}
}
}
- return &plugin.ApiResourceOutput{Body: impactedBlueprints, Status: http.StatusOK}, nil
-}
-
-func (c *ScopeApiHelper[Conn, Scope, Tr]) VerifyConnection(connId uint64) errors.Error {
- var conn Conn
- err := c.connHelper.FirstById(&conn, connId)
- if err != nil {
- if errors.Is(err, gorm.ErrRecordNotFound) {
- return errors.BadInput.New("Invalid Connection Id")
- }
- return err
- }
- return nil
+ return impactedBlueprints, nil
}
-func (c *ScopeApiHelper[Conn, Scope, Tr]) addTransformationName(scopes []*Scope) ([]*ScopeRes[Scope], errors.Error) {
+func (c *GenericScopeApiHelper[Conn, Scope, Tr]) addTransformationName(scopes ...*Scope) ([]*ScopeRes[Scope], errors.Error) {
var ruleIds []uint64
-
- apiScopes := make([]*ScopeRes[Scope], 0)
for _, scope := range scopes {
- valueRepoRuleId := reflect.ValueOf(scope).Elem().FieldByName("TransformationRuleId")
+ valueRepoRuleId := reflectField(scope, "TransformationRuleId")
if !valueRepoRuleId.IsValid() {
break
}
- ruleId := valueRepoRuleId.Uint()
+ ruleId := reflectField(scope, "TransformationRuleId").Uint()
if ruleId > 0 {
ruleIds = append(ruleIds, ruleId)
}
}
var rules []*Tr
+ var err errors.Error
if len(ruleIds) > 0 {
- err := c.db.All(&rules, dal.Where("id IN (?)", ruleIds))
+ rules, err = c.dbHelper.ListTransformationRules(ruleIds)
if err != nil {
return nil, err
}
@@ -436,46 +374,33 @@ func (c *ScopeApiHelper[Conn, Scope, Tr]) addTransformationName(scopes []*Scope)
names := make(map[uint64]string)
for _, rule := range rules {
// Get the reflect.Value of the i-th struct pointer in the slice
- names[reflect.ValueOf(rule).Elem().FieldByName("ID").Uint()] = reflect.ValueOf(rule).Elem().FieldByName("Name").String()
+ names[reflectField(rule, "ID").Uint()] = reflectField(rule, "Name").String()
}
-
+ apiScopes := make([]*ScopeRes[Scope], 0)
for _, scope := range scopes {
- field := reflect.ValueOf(scope).Elem().FieldByName("TransformationRuleId")
- if field.IsValid() {
- apiScopes = append(apiScopes, &ScopeRes[Scope]{
- Scope: *scope,
- TransformationRuleName: names[field.Uint()],
- })
- } else {
- apiScopes = append(apiScopes, &ScopeRes[Scope]{Scope: *scope, TransformationRuleName: ""})
+ txRuleField := reflectField(scope, "TransformationRuleId")
+ txRuleName := ""
+ if txRuleField.IsValid() {
+ txRuleName = names[txRuleField.Uint()]
}
-
+ apiScopes = append(apiScopes, &ScopeRes[Scope]{
+ Scope: *scope,
+ TransformationRuleName: txRuleName,
+ })
}
-
return apiScopes, nil
}
-func (c *ScopeApiHelper[Conn, Scope, Tr]) save(scope interface{}) errors.Error {
- err := c.db.CreateOrUpdate(scope)
- if err != nil {
- if c.db.IsDuplicationError(err) {
- return errors.BadInput.New("the scope already exists")
- }
- return err
- }
- return nil
-}
-
-func (c *ScopeApiHelper[Conn, Scope, Tr]) mapByScopeId(scopes []*ScopeRes[Scope], scopeIdFieldName string) map[string]*ScopeRes[Scope] {
+func (c *GenericScopeApiHelper[Conn, Scope, Tr]) mapByScopeId(scopes []*ScopeRes[Scope]) map[string]*ScopeRes[Scope] {
scopeMap := map[string]*ScopeRes[Scope]{}
for _, scope := range scopes {
- scopeId := fmt.Sprintf("%v", reflectField(scope.Scope, scopeIdFieldName).Interface())
+ scopeId := fmt.Sprintf("%v", reflectField(scope.Scope, c.reflectionParams.ScopeIdFieldName).Interface())
scopeMap[scopeId] = scope
}
return scopeMap
}
-func (c *ScopeApiHelper[Conn, Scope, Tr]) extractFromReqParam(input *plugin.ApiResourceInput) *requestParams {
+func (c *GenericScopeApiHelper[Conn, Scope, Tr]) extractFromReqParam(input *plugin.ApiResourceInput) *requestParams {
connectionId, err := strconv.ParseUint(input.Params["connectionId"], 10, 64)
if err != nil || connectionId == 0 {
connectionId = 0
@@ -489,7 +414,7 @@ func (c *ScopeApiHelper[Conn, Scope, Tr]) extractFromReqParam(input *plugin.ApiR
}
}
-func (c *ScopeApiHelper[Conn, Scope, Tr]) extractFromDeleteReqParam(input *plugin.ApiResourceInput) *deleteRequestParams {
+func (c *GenericScopeApiHelper[Conn, Scope, Tr]) extractFromDeleteReqParam(input *plugin.ApiResourceInput) *deleteRequestParams {
params := c.extractFromReqParam(input)
var err errors.Error
var deleteDataOnly bool
@@ -508,7 +433,7 @@ func (c *ScopeApiHelper[Conn, Scope, Tr]) extractFromDeleteReqParam(input *plugi
}
}
-func (c *ScopeApiHelper[Conn, Scope, Tr]) extractFromGetReqParam(input *plugin.ApiResourceInput) *getRequestParams {
+func (c *GenericScopeApiHelper[Conn, Scope, Tr]) extractFromGetReqParam(input *plugin.ApiResourceInput) *getRequestParams {
params := c.extractFromReqParam(input)
var err errors.Error
var loadBlueprints bool
@@ -532,8 +457,7 @@ func setScopeFields(p interface{}, connectionId uint64, createdDate *time.Time,
if pType.Kind() != reflect.Ptr {
panic("expected a pointer to a struct")
}
- pValue := reflect.ValueOf(p).Elem()
-
+ pValue := reflectValue(p)
// set connectionId
connIdField := pValue.FieldByName("ConnectionId")
connIdField.SetUint(connectionId)
@@ -565,8 +489,8 @@ func setScopeFields(p interface{}, connectionId uint64, createdDate *time.Time,
func returnPrimaryKeyValue(p interface{}) string {
result := ""
// get the type and value of the input interface using reflection
- t := reflect.TypeOf(p)
- v := reflect.ValueOf(p)
+ t := reflectType(p)
+ v := reflectValue(p)
// iterate over each field in the struct type
for i := 0; i < t.NumField(); i++ {
// get the i-th field
@@ -601,6 +525,23 @@ func VerifyScope(scope interface{}, vld *validator.Validate) errors.Error {
return nil
}
+func (c *GenericScopeApiHelper[Conn, Scope, Tr]) validatePrimaryKeys(scopes []*Scope) errors.Error {
+ if c.validator == nil {
+ return nil
+ }
+ keeper := make(map[string]struct{})
+ for _, scope := range scopes {
+ // Ensure that the primary key value is unique
+ primaryValueStr := returnPrimaryKeyValue(scope)
+ if _, ok := keeper[primaryValueStr]; ok {
+ return errors.BadInput.New("duplicate scope was requested")
+ } else {
+ keeper[primaryValueStr] = struct{}{}
+ }
+ }
+ return nil
+}
+
// Implement MarshalJSON method to flatten all fields
func (sr *ScopeRes[T]) MarshalJSON() ([]byte, error) {
var flatMap map[string]interface{}
@@ -665,32 +606,3 @@ func getAffectedTables(pluginName string) ([]string, errors.Error) {
}
return tables, nil
}
-
-func reflectField(obj any, fieldName string) reflect.Value {
- return reflectValue(obj).FieldByName(fieldName)
-}
-
-func hasField(obj any, fieldName string) bool {
- _, ok := reflectType(obj).FieldByName(fieldName)
- return ok
-}
-
-func reflectValue(obj any) reflect.Value {
- val := reflect.ValueOf(obj)
- kind := val.Kind()
- for kind == reflect.Ptr || kind == reflect.Interface {
- val = val.Elem()
- kind = val.Kind()
- }
- return val
-}
-
-func reflectType(obj any) reflect.Type {
- typ := reflect.TypeOf(obj)
- kind := typ.Kind()
- for kind == reflect.Ptr {
- typ = typ.Elem()
- kind = typ.Kind()
- }
- return typ
-}
diff --git a/backend/helpers/pluginhelper/api/scope_helper.go b/backend/helpers/pluginhelper/api/scope_helper.go
index a2edbcf7c..51439f2d3 100644
--- a/backend/helpers/pluginhelper/api/scope_helper.go
+++ b/backend/helpers/pluginhelper/api/scope_helper.go
@@ -18,98 +18,59 @@ limitations under the License.
package api
import (
- "encoding/json"
- "fmt"
- "github.com/apache/incubator-devlake/core/models"
- "github.com/apache/incubator-devlake/core/models/domainlayer/domaininfo"
- serviceHelper "github.com/apache/incubator-devlake/helpers/pluginhelper/services"
- "net/http"
- "strconv"
- "strings"
- "sync"
- "time"
-
"github.com/apache/incubator-devlake/core/context"
- "github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
- "github.com/apache/incubator-devlake/core/log"
+ "github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/go-playground/validator/v10"
- "github.com/mitchellh/mapstructure"
- "gorm.io/gorm"
-
- "reflect"
-)
-
-var (
- tablesCache []string // these cached vars can probably be moved somewhere more centralized later
- tablesCacheLoader = new(sync.Once)
+ "net/http"
)
-type NoTransformation struct{}
-
-// ScopeApiHelper is used to write the CURD of scopes
-type ScopeApiHelper[Conn any, Scope any, Tr any] struct {
- log log.Logger
- db dal.Dal
- validator *validator.Validate
- bpManager *serviceHelper.BlueprintManager
- connHelper *ConnectionApiHelper
-}
-
type (
- requestParams struct {
- connectionId uint64
- scopeId string
- plugin string
+ // ScopeApiHelper is used to write the CRUD of scopes
+ ScopeApiHelper[Conn any, Scope any, Tr any] struct {
+ *GenericScopeApiHelper[Conn, Scope, Tr]
}
- deleteRequestParams struct {
- requestParams
- deleteDataOnly bool
+ ScopeRes[T any] struct {
+ Scope T `mapstructure:",squash"`
+ TransformationRuleName string `mapstructure:"transformationRuleName,omitempty"`
+ Blueprints []*models.Blueprint `mapstructure:"blueprints,omitempty"`
}
-
- getRequestParams struct {
- requestParams
- loadBlueprints bool
+ ScopeReq[T any] struct {
+ Data []*T `json:"data"`
}
)
-// NewScopeHelper creates a ScopeHelper for scopes management
+// NewScopeHelper is kept for backward compatibility. Use NewScopeHelper2 instead until we do a mass refactor
func NewScopeHelper[Conn any, Scope any, Tr any](
basicRes context.BasicRes,
vld *validator.Validate,
connHelper *ConnectionApiHelper,
) *ScopeApiHelper[Conn, Scope, Tr] {
- if vld == nil {
- vld = validator.New()
- }
- if connHelper == nil {
- return nil
- }
- tablesCacheLoader.Do(func() {
- var err errors.Error
- tablesCache, err = basicRes.GetDal().AllTables()
- if err != nil {
- panic(err)
- }
- })
- return &ScopeApiHelper[Conn, Scope, Tr]{
- log: basicRes.GetLogger(),
- db: basicRes.GetDal(),
- validator: vld,
- bpManager: serviceHelper.NewBlueprintManager(basicRes.GetDal()),
- connHelper: connHelper,
- }
+ reflectionParams := ReflectionParameters{}
+ return NewScopeHelper2[Conn, Scope, Tr](
+ basicRes,
+ vld,
+ connHelper,
+ NewScopeDatabaseHelperImpl[Conn, Scope, Tr](basicRes, connHelper, &reflectionParams),
+ &reflectionParams,
+ nil,
+ )
}
-type ScopeRes[T any] struct {
- Scope T `mapstructure:",squash"`
- TransformationRuleName string `mapstructure:"transformationRuleName,omitempty"`
- Blueprints []*models.Blueprint
-}
-
-type ScopeReq[T any] struct {
- Data []*T `json:"data"`
+// NewScopeHelper2 creates a ScopeApiHelper for scopes management
+func NewScopeHelper2[Conn any, Scope any, Tr any](
+ basicRes context.BasicRes,
+ vld *validator.Validate,
+ connHelper *ConnectionApiHelper,
+ dbHelper ScopeDatabaseHelper[Conn, Scope, Tr],
+ params *ReflectionParameters,
+ opts *ScopeHelperOptions,
+) *ScopeApiHelper[Conn, Scope, Tr] {
+ return &ScopeApiHelper[Conn, Scope, Tr]{
+ NewGenericScopeHelper[Conn, Scope, Tr](
+ basicRes, vld, connHelper, dbHelper, params, opts),
+ }
}
// Put saves the given scopes to the database. It expects a slice of struct pointers
@@ -123,574 +84,50 @@ func (c *ScopeApiHelper[Conn, Scope, Tr]) Put(input *plugin.ApiResourceInput) (*
if err != nil {
return nil, errors.BadInput.Wrap(err, "decoding scope error")
}
- params := c.extractFromReqParam(input)
- if params.connectionId == 0 {
- return nil, errors.BadInput.New("invalid connectionId")
- }
- err = c.VerifyConnection(params.connectionId)
- if err != nil {
- return nil, err
- }
- // Create a map to keep track of primary key values
- keeper := make(map[string]struct{})
-
- // Set the CreatedDate and UpdatedDate fields to the current time for each scope
- now := time.Now()
- for _, v := range req.Data {
- // Ensure that the primary key value is unique
- primaryValueStr := returnPrimaryKeyValue(*v)
- if _, ok := keeper[primaryValueStr]; ok {
- return nil, errors.BadInput.New("duplicated item")
- } else {
- keeper[primaryValueStr] = struct{}{}
- }
-
- // Set the connection ID, CreatedDate, and UpdatedDate fields
- setScopeFields(v, params.connectionId, &now, &now)
-
- // Verify that the primary key value is valid
- err = VerifyScope(v, c.validator)
- if err != nil {
- return nil, err
- }
- }
- // Save the scopes to the database
- if req.Data != nil && len(req.Data) > 0 {
- err = c.save(&req.Data)
- if err != nil {
- return nil, err
- }
- }
-
- apiScopes, err := c.addTransformationName(req.Data)
+ // Pass the decoded scopes, together with the request input, to PutScopes
+ apiScopes, err := c.PutScopes(input, req.Data)
if err != nil {
return nil, err
}
-
return &plugin.ApiResourceOutput{Body: apiScopes, Status: http.StatusOK}, nil
}
+// TODO remove fieldName param in the future and adjust plugins to use reflection params on init
func (c *ScopeApiHelper[Conn, Scope, Tr]) Update(input *plugin.ApiResourceInput, fieldName string) (*plugin.ApiResourceOutput, errors.Error) {
- params := c.extractFromReqParam(input)
- if params.connectionId == 0 {
- return nil, errors.BadInput.New("invalid connectionId")
- }
- if len(params.scopeId) == 0 {
- return nil, errors.BadInput.New("invalid scopeId")
+ if fieldName != "" {
+ c.reflectionParams.ScopeIdColumnName = fieldName //for backward compatibility
}
- err := c.VerifyConnection(params.connectionId)
- if err != nil {
- return &plugin.ApiResourceOutput{Body: nil, Status: http.StatusInternalServerError}, err
- }
- var scope Scope
- err = c.db.First(&scope, dal.Where(fmt.Sprintf("connection_id = ? AND %s = ?", fieldName), params.connectionId, params.scopeId))
- if err != nil {
- return &plugin.ApiResourceOutput{Body: nil, Status: http.StatusInternalServerError}, errors.Default.New("getting Scope error")
- }
- err = DecodeMapStruct(input.Body, &scope, true)
- if err != nil {
- return &plugin.ApiResourceOutput{Body: nil, Status: http.StatusInternalServerError}, errors.Default.Wrap(err, "patch scope error")
- }
- err = VerifyScope(&scope, c.validator)
- if err != nil {
- return &plugin.ApiResourceOutput{Body: nil, Status: http.StatusInternalServerError}, errors.Default.Wrap(err, "Invalid scope")
- }
-
- err = c.db.Update(scope)
- if err != nil {
- return &plugin.ApiResourceOutput{Body: nil, Status: http.StatusInternalServerError}, errors.Default.Wrap(err, "error on saving Scope")
- }
- valueRepoRuleId := reflect.ValueOf(scope).FieldByName("TransformationRuleId")
- if !valueRepoRuleId.IsValid() {
- return &plugin.ApiResourceOutput{Body: scope, Status: http.StatusOK}, nil
- }
- repoRuleId := reflect.ValueOf(scope).FieldByName("TransformationRuleId").Uint()
- var rule Tr
- if repoRuleId > 0 {
- err = c.db.First(&rule, dal.Where("id = ?", repoRuleId))
- if err != nil {
- return nil, errors.NotFound.New("transformationRule not found")
- }
- }
- scopeRes := &ScopeRes[Scope]{
- Scope: scope,
- TransformationRuleName: reflect.ValueOf(rule).FieldByName("Name").String()}
-
- return &plugin.ApiResourceOutput{Body: scopeRes, Status: http.StatusOK}, nil
-}
-
-// GetScopeList returns a list of scopes. It expects a fieldName argument, which is used
-// to extract the connection ID from the input.Params map.
-
-func (c *ScopeApiHelper[Conn, Scope, Tr]) GetScopeList(input *plugin.ApiResourceInput, scopeIdFieldName ...string) (*plugin.ApiResourceOutput, errors.Error) {
- // Extract the connection ID from the input.Params map
- params := c.extractFromGetReqParam(input)
- if params.connectionId == 0 {
- return nil, errors.BadInput.New("invalid path params: \"connectionId\" not set")
- }
- err := c.VerifyConnection(params.connectionId)
+ apiScope, err := c.GenericScopeApiHelper.UpdateScope(input)
if err != nil {
return nil, err
}
- limit, offset := GetLimitOffset(input.Query, "pageSize", "page")
- var scopes []*Scope
- err = c.db.All(&scopes, dal.Where("connection_id = ?", params.connectionId), dal.Limit(limit), dal.Offset(offset))
- if err != nil {
- return nil, err
- }
-
- apiScopes, err := c.addTransformationName(scopes)
- if err != nil {
- return nil, err
- }
- if params.loadBlueprints {
- if len(scopeIdFieldName) == 0 {
- return nil, errors.Default.New("scope Id field name is not known") //temporary, limited solution until I properly refactor all of this in another PR
- }
- scopesById := c.mapByScopeId(apiScopes, scopeIdFieldName[0])
- var scopeIds []string
- for id := range scopesById {
- scopeIds = append(scopeIds, id)
- }
- blueprintMap, err := c.bpManager.GetBlueprintsByScopes(params.connectionId, scopeIds...)
- if err != nil {
- return nil, errors.Default.Wrap(err, fmt.Sprintf("error getting blueprints for scopes from connection %d", params.connectionId))
- }
- apiScopes = nil
- for scopeId, scope := range scopesById {
- if bps, ok := blueprintMap[scopeId]; ok {
- scope.Blueprints = bps
- delete(blueprintMap, scopeId)
- }
- apiScopes = append(apiScopes, scope)
- }
- if len(blueprintMap) > 0 {
- var danglingIds []string
- for bpId := range blueprintMap {
- danglingIds = append(danglingIds, bpId)
- }
- c.log.Warn(nil, "The following dangling scopes were found: %v", danglingIds)
- }
- }
- return &plugin.ApiResourceOutput{Body: apiScopes, Status: http.StatusOK}, nil
+ return &plugin.ApiResourceOutput{Body: apiScope, Status: http.StatusOK}, nil
}
-func (c *ScopeApiHelper[Conn, Scope, Tr]) GetScope(input *plugin.ApiResourceInput, scopeIdColumnName string) (*plugin.ApiResourceOutput, errors.Error) {
- params := c.extractFromGetReqParam(input)
- if params == nil || params.connectionId == 0 {
- return nil, errors.BadInput.New("invalid path params: \"connectionId\" not set")
- }
- if len(params.scopeId) == 0 || params.scopeId == "0" {
- return nil, errors.BadInput.New("invalid path params: \"scopeId\" not set/invalid")
- }
- err := c.VerifyConnection(params.connectionId)
+func (c *ScopeApiHelper[Conn, Scope, Tr]) GetScopeList(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+ scopes, err := c.GetScopes(input)
if err != nil {
return nil, err
}
- db := c.db
-
- query := dal.Where(fmt.Sprintf("connection_id = ? AND %s = ?", scopeIdColumnName), params.connectionId, params.scopeId)
- var scope Scope
- err = db.First(&scope, query)
- if db.IsErrorNotFound(err) {
- return nil, errors.NotFound.New("Scope not found")
- }
- if err != nil {
- return nil, err
- }
- valueRepoRuleId := reflect.ValueOf(scope).FieldByName("TransformationRuleId")
- if !valueRepoRuleId.IsValid() {
- return &plugin.ApiResourceOutput{Body: scope, Status: http.StatusOK}, nil
- }
- repoRuleId := reflect.ValueOf(scope).FieldByName("TransformationRuleId").Uint()
- var rule Tr
- if repoRuleId > 0 {
- err = db.First(&rule, dal.Where("id = ?", repoRuleId))
- if err != nil {
- return nil, errors.NotFound.New("transformationRule not found")
- }
- }
- scopeRes := &ScopeRes[Scope]{
- Scope: scope,
- TransformationRuleName: reflect.ValueOf(rule).FieldByName("Name").String(),
- }
- return &plugin.ApiResourceOutput{Body: scopeRes, Status: http.StatusOK}, nil
+ return &plugin.ApiResourceOutput{Body: scopes, Status: http.StatusOK}, nil
}
-func (c *ScopeApiHelper[Conn, Scope, Tr]) DeleteScope(input *plugin.ApiResourceInput, scopeIdFieldName string, rawScopeParamName string,
- getScopeParamValue func(db dal.Dal, scopeId string) (string, errors.Error)) (*plugin.ApiResourceOutput, errors.Error) {
- params := c.extractFromDeleteReqParam(input)
- if params == nil || params.connectionId == 0 {
- return nil, errors.BadInput.New("invalid path params: \"connectionId\" not set")
- }
- if len(params.scopeId) == 0 || params.scopeId == "0" {
- return nil, errors.BadInput.New("invalid path params: \"scopeId\" not set/invalid")
- }
- err := c.VerifyConnection(params.connectionId)
- if err != nil {
- return nil, errors.Default.Wrap(err, fmt.Sprintf("error verifying connection for connection ID %d", params.connectionId))
- }
- db := c.db
- blueprintsMap, err := c.bpManager.GetBlueprintsByScopes(params.connectionId, params.scopeId)
- if err != nil {
- return nil, errors.Default.Wrap(err, fmt.Sprintf("error retrieving scope with scope ID %s", params.scopeId))
- }
- blueprints := blueprintsMap[params.scopeId]
- // find all tables for this plugin
- tables, err := getAffectedTables(params.plugin)
- if err != nil {
- return nil, errors.Default.Wrap(err, fmt.Sprintf("error getting database tables managed by plugin %s", params.plugin))
- }
- // delete all the plugin records referencing this scope
- if rawScopeParamName != "" {
- scopeParamValue := params.scopeId
- if getScopeParamValue != nil {
- scopeParamValue, err = getScopeParamValue(c.db, params.scopeId) // this function is optional - use it if API data params stores a value different to the scope id (e.g. github plugin)
- if err != nil {
- return nil, errors.Default.Wrap(err, fmt.Sprintf("error extracting scope parameter name for scope %s", params.scopeId))
- }
- }
- for _, table := range tables {
- err = db.Exec(createDeleteQuery(table, rawScopeParamName, scopeParamValue))
- if err != nil {
- return nil, errors.Default.Wrap(err, fmt.Sprintf("error deleting data bound to scope %s for plugin %s", params.scopeId, params.plugin))
- }
- }
- }
- var impactedBlueprints []*models.Blueprint
- if !params.deleteDataOnly {
- // Delete the scope itself
- scope := new(Scope)
- err = c.db.Delete(&scope, dal.Where(fmt.Sprintf("connection_id = ? AND %s = ?", scopeIdFieldName),
- params.connectionId, params.scopeId))
- if err != nil {
- return nil, errors.Default.Wrap(err, fmt.Sprintf("error deleting scope %s", params.scopeId))
- }
- // update the blueprints (remove scope reference from them)
- for _, blueprint := range blueprints {
- settings, _ := blueprint.UnmarshalSettings()
- var changed bool
- err = settings.UpdateConnections(func(c *plugin.BlueprintConnectionV200) errors.Error {
- var retainedScopes []*plugin.BlueprintScopeV200
- for _, bpScope := range c.Scopes {
- if bpScope.Id == params.scopeId { // we'll be removing this one
- changed = true
- } else {
- retainedScopes = append(retainedScopes, bpScope)
- }
- }
- c.Scopes = retainedScopes
- return nil
- })
- if err != nil {
- return nil, errors.Default.Wrap(err, fmt.Sprintf("error removing scope %s from blueprint %d", params.scopeId, blueprint.ID))
- }
- if changed {
- err = blueprint.UpdateSettings(&settings)
- if err != nil {
- return nil, errors.Default.Wrap(err, fmt.Sprintf("error writing new settings into blueprint %s", blueprint.Name))
- }
- err = c.bpManager.SaveDbBlueprint(blueprint)
- if err != nil {
- return nil, errors.Default.Wrap(err, fmt.Sprintf("error saving the updated blueprint %s", blueprint.Name))
- }
- impactedBlueprints = append(impactedBlueprints, blueprint)
- }
- }
- }
- return &plugin.ApiResourceOutput{Body: impactedBlueprints, Status: http.StatusOK}, nil
-}
-
-func (c *ScopeApiHelper[Conn, Scope, Tr]) VerifyConnection(connId uint64) errors.Error {
- var conn Conn
- err := c.connHelper.FirstById(&conn, connId)
- if err != nil {
- if errors.Is(err, gorm.ErrRecordNotFound) {
- return errors.BadInput.New("Invalid Connection Id")
- }
- return err
- }
- return nil
-}
-
-func (c *ScopeApiHelper[Conn, Scope, Tr]) addTransformationName(scopes []*Scope) ([]*ScopeRes[Scope], errors.Error) {
- var ruleIds []uint64
-
- apiScopes := make([]*ScopeRes[Scope], 0)
- for _, scope := range scopes {
- valueRepoRuleId := reflect.ValueOf(scope).Elem().FieldByName("TransformationRuleId")
- if !valueRepoRuleId.IsValid() {
- break
- }
- ruleId := valueRepoRuleId.Uint()
- if ruleId > 0 {
- ruleIds = append(ruleIds, ruleId)
- }
- }
- var rules []*Tr
- if len(ruleIds) > 0 {
- err := c.db.All(&rules, dal.Where("id IN (?)", ruleIds))
- if err != nil {
- return nil, err
- }
- }
- names := make(map[uint64]string)
- for _, rule := range rules {
- // Get the reflect.Value of the i-th struct pointer in the slice
- names[reflect.ValueOf(rule).Elem().FieldByName("ID").Uint()] = reflect.ValueOf(rule).Elem().FieldByName("Name").String()
- }
-
- for _, scope := range scopes {
- field := reflect.ValueOf(scope).Elem().FieldByName("TransformationRuleId")
- if field.IsValid() {
- apiScopes = append(apiScopes, &ScopeRes[Scope]{
- Scope: *scope,
- TransformationRuleName: names[field.Uint()],
- })
- } else {
- apiScopes = append(apiScopes, &ScopeRes[Scope]{Scope: *scope, TransformationRuleName: ""})
- }
+// TODO remove fieldName param in the future and adjust plugins to use reflection params on init
+func (c *ScopeApiHelper[Conn, Scope, Tr]) GetScope(input *plugin.ApiResourceInput, fieldName string) (*plugin.ApiResourceOutput, errors.Error) {
+ if fieldName != "" {
+ c.reflectionParams.ScopeIdColumnName = fieldName //for backward compatibility
}
-
- return apiScopes, nil
-}
-
-func (c *ScopeApiHelper[Conn, Scope, Tr]) save(scope interface{}) errors.Error {
- err := c.db.CreateOrUpdate(scope)
- if err != nil {
- if c.db.IsDuplicationError(err) {
- return errors.BadInput.New("the scope already exists")
- }
- return err
- }
- return nil
-}
-
-func (c *ScopeApiHelper[Conn, Scope, Tr]) mapByScopeId(scopes []*ScopeRes[Scope], scopeIdFieldName string) map[string]*ScopeRes[Scope] {
- scopeMap := map[string]*ScopeRes[Scope]{}
- for _, scope := range scopes {
- scopeId := fmt.Sprintf("%v", reflectField(scope.Scope, scopeIdFieldName).Interface())
- scopeMap[scopeId] = scope
- }
- return scopeMap
-}
-
-func (c *ScopeApiHelper[Conn, Scope, Tr]) extractFromReqParam(input *plugin.ApiResourceInput) *requestParams {
- connectionId, err := strconv.ParseUint(input.Params["connectionId"], 10, 64)
- if err != nil || connectionId == 0 {
- connectionId = 0
- }
- scopeId := input.Params["scopeId"]
- pluginName := input.Params["plugin"]
- return &requestParams{
- connectionId: connectionId,
- scopeId: scopeId,
- plugin: pluginName,
- }
-}
-
-func (c *ScopeApiHelper[Conn, Scope, Tr]) extractFromDeleteReqParam(input *plugin.ApiResourceInput) *deleteRequestParams {
- params := c.extractFromReqParam(input)
- var err errors.Error
- var deleteDataOnly bool
- {
- ddo, ok := input.Query["delete_data_only"]
- if ok {
- deleteDataOnly, err = errors.Convert01(strconv.ParseBool(ddo[0]))
- }
- if err != nil {
- deleteDataOnly = false
- }
- }
- return &deleteRequestParams{
- requestParams: *params,
- deleteDataOnly: deleteDataOnly,
- }
-}
-
-func (c *ScopeApiHelper[Conn, Scope, Tr]) extractFromGetReqParam(input *plugin.ApiResourceInput) *getRequestParams {
- params := c.extractFromReqParam(input)
- var err errors.Error
- var loadBlueprints bool
- {
- lbps, ok := input.Query["blueprints"]
- if ok {
- loadBlueprints, err = errors.Convert01(strconv.ParseBool(lbps[0]))
- }
- if err != nil {
- loadBlueprints = false
- }
- }
- return &getRequestParams{
- requestParams: *params,
- loadBlueprints: loadBlueprints,
- }
-}
-
-func setScopeFields(p interface{}, connectionId uint64, createdDate *time.Time, updatedDate *time.Time) {
- pType := reflect.TypeOf(p)
- if pType.Kind() != reflect.Ptr {
- panic("expected a pointer to a struct")
- }
- pValue := reflect.ValueOf(p).Elem()
-
- // set connectionId
- connIdField := pValue.FieldByName("ConnectionId")
- connIdField.SetUint(connectionId)
-
- // set CreatedDate
- createdDateField := pValue.FieldByName("CreatedDate")
- if createdDateField.IsValid() && createdDateField.Type().AssignableTo(reflect.TypeOf(createdDate)) {
- createdDateField.Set(reflect.ValueOf(createdDate))
- }
-
- // set UpdatedDate
- updatedDateField := pValue.FieldByName("UpdatedDate")
- if !updatedDateField.IsValid() || (updatedDate != nil && !updatedDateField.Type().AssignableTo(reflect.TypeOf(updatedDate))) {
- return
- }
- if updatedDate == nil {
- // if updatedDate is nil, set UpdatedDate to be nil
- updatedDateField.Set(reflect.Zero(updatedDateField.Type()))
- } else {
- // if updatedDate is not nil, set UpdatedDate to be the value
- updatedDateFieldValue := reflect.ValueOf(updatedDate)
- updatedDateField.Set(updatedDateFieldValue)
- }
-}
-
-// returnPrimaryKeyValue returns a string containing the primary key value(s) of a struct, concatenated with "-" between them.
-// This function receives an interface{} type argument p, which can be a pointer to any struct.
-// The function uses reflection to iterate through the fields of the struct, and checks if each field is tagged as "primaryKey".
-func returnPrimaryKeyValue(p interface{}) string {
- result := ""
- // get the type and value of the input interface using reflection
- t := reflect.TypeOf(p)
- v := reflect.ValueOf(p)
- // iterate over each field in the struct type
- for i := 0; i < t.NumField(); i++ {
- // get the i-th field
- field := t.Field(i)
-
- // check if the field is marked as "primaryKey" in the struct tag
- if strings.Contains(string(field.Tag), "primaryKey") {
- // if this is the first primaryKey field encountered, set the result to be its value
- if result == "" {
- result = fmt.Sprintf("%v", v.Field(i).Interface())
- } else {
- // if this is not the first primaryKey field, append its value to the result with a "-" separator
- result = fmt.Sprintf("%s-%v", result, v.Field(i).Interface())
- }
- }
- }
-
- // return the final primary key value as a string
- return result
-}
-
-func VerifyScope(scope interface{}, vld *validator.Validate) errors.Error {
- if vld != nil {
- pType := reflect.TypeOf(scope)
- if pType.Kind() != reflect.Ptr {
- panic("expected a pointer to a struct")
- }
- if err := vld.Struct(scope); err != nil {
- return errors.Default.Wrap(err, "error validating target")
- }
- }
- return nil
-}
-
-// Implement MarshalJSON method to flatten all fields
-func (sr *ScopeRes[T]) MarshalJSON() ([]byte, error) {
- var flatMap map[string]interface{}
- err := mapstructure.Decode(sr, &flatMap)
+ scope, err := c.GenericScopeApiHelper.GetScope(input)
if err != nil {
return nil, err
}
- // Encode the flattened map to JSON
- result, err := json.Marshal(flatMap)
- if err != nil {
- return nil, err
- }
-
- return result, nil
-}
-
-func createDeleteQuery(tableName string, scopeIdKey string, scopeId string) string {
- column := "_raw_data_params"
- if tableName == (models.CollectorLatestState{}.TableName()) {
- column = "raw_data_params"
- } else if strings.HasPrefix(tableName, "_raw_") {
- column = "params"
- }
- query := `DELETE FROM ` + tableName + ` WHERE ` + column + ` LIKE '%"` + scopeIdKey + `":"` + scopeId + `"%'`
- return query
+ return &plugin.ApiResourceOutput{Body: scope, Status: http.StatusOK}, nil
}
-func getAffectedTables(pluginName string) ([]string, errors.Error) {
- var tables []string
- meta, err := plugin.GetPlugin(pluginName)
+func (c *ScopeApiHelper[Conn, Scope, Tr]) Delete(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+ bps, err := c.DeleteScope(input)
if err != nil {
return nil, err
}
- if pluginModel, ok := meta.(plugin.PluginModel); !ok {
- return nil, errors.Default.New(fmt.Sprintf("plugin \"%s\" does not implement listing its tables", pluginName))
- } else {
- // collect raw tables
- for _, table := range tablesCache {
- if strings.HasPrefix(table, "_raw_"+pluginName) {
- tables = append(tables, table)
- }
- }
- // collect tool tables
- tablesInfo := pluginModel.GetTablesInfo()
- for _, table := range tablesInfo {
- // we only care about tables with RawOrigin
- ok = hasField(table, "RawDataParams")
- if ok {
- tables = append(tables, table.TableName())
- }
- }
- // collect domain tables
- for _, domainTable := range domaininfo.GetDomainTablesInfo() {
- // we only care about tables with RawOrigin
- ok = hasField(domainTable, "RawDataParams")
- if ok {
- tables = append(tables, domainTable.TableName())
- }
- }
- // additional tables
- tables = append(tables, models.CollectorLatestState{}.TableName())
- }
- return tables, nil
-}
-
-func reflectField(obj any, fieldName string) reflect.Value {
- return reflectValue(obj).FieldByName(fieldName)
-}
-
-func hasField(obj any, fieldName string) bool {
- _, ok := reflectType(obj).FieldByName(fieldName)
- return ok
-}
-
-func reflectValue(obj any) reflect.Value {
- val := reflect.ValueOf(obj)
- kind := val.Kind()
- for kind == reflect.Ptr || kind == reflect.Interface {
- val = val.Elem()
- kind = val.Kind()
- }
- return val
-}
-
-func reflectType(obj any) reflect.Type {
- typ := reflect.TypeOf(obj)
- kind := typ.Kind()
- for kind == reflect.Ptr {
- typ = typ.Elem()
- kind = typ.Kind()
- }
- return typ
+ return &plugin.ApiResourceOutput{Body: bps, Status: http.StatusOK}, nil
}
diff --git a/backend/helpers/pluginhelper/api/scope_helper_test.go b/backend/helpers/pluginhelper/api/scope_helper_test.go
index a7f405267..e812d1ab2 100644
--- a/backend/helpers/pluginhelper/api/scope_helper_test.go
+++ b/backend/helpers/pluginhelper/api/scope_helper_test.go
@@ -294,8 +294,11 @@ func TestScopeApiHelper_Put(t *testing.T) {
"updatedAt": "string",
"updatedDate": "string",
}}}}
+
+ params := &ReflectionParameters{}
+ dbHelper := NewScopeDatabaseHelperImpl[TestConnection, TestRepo, TestTransformationRule](mockRes, connHelper, params)
// create a mock ScopeApiHelper with a mock database connection
- apiHelper := NewScopeHelper[TestConnection, TestRepo, TestTransformationRule](mockRes, nil, connHelper)
+ apiHelper := NewScopeHelper2[TestConnection, TestRepo, TestTransformationRule](mockRes, nil, connHelper, dbHelper, params, nil)
// test a successful call to Put
_, err := apiHelper.Put(input)
assert.NoError(t, err)
diff --git a/backend/helpers/pluginhelper/services/blueprint_helper.go b/backend/helpers/pluginhelper/services/blueprint_helper.go
index d784be53c..6fa01e0d7 100644
--- a/backend/helpers/pluginhelper/services/blueprint_helper.go
+++ b/backend/helpers/pluginhelper/services/blueprint_helper.go
@@ -138,7 +138,7 @@ func (b *BlueprintManager) GetDbBlueprint(blueprintId uint64) (*models.Blueprint
return blueprint, nil
}
-// GetBlueprintsByScopes returns all blueprints that have these scopeIds
+// GetBlueprintsByScopes returns all blueprints that have these scopeIds and this connection Id
func (b *BlueprintManager) GetBlueprintsByScopes(connectionId uint64, scopeIds ...string) (map[string][]*models.Blueprint, errors.Error) {
bps, _, err := b.GetDbBlueprints(&GetBlueprintQuery{})
if err != nil {
diff --git a/backend/impls/dalgorm/dalgorm.go b/backend/impls/dalgorm/dalgorm.go
index 2e670d6b8..caa7f3a37 100644
--- a/backend/impls/dalgorm/dalgorm.go
+++ b/backend/impls/dalgorm/dalgorm.go
@@ -136,22 +136,23 @@ var _ dal.Dal = (*Dalgorm)(nil)
// Exec executes raw sql query
func (d *Dalgorm) Exec(query string, params ...interface{}) errors.Error {
- return errors.Convert(d.db.Exec(query, transformParams(params)...).Error)
+ return d.convertGormError(d.db.Exec(query, transformParams(params)...).Error)
}
// AutoMigrate runs auto migration for given models
func (d *Dalgorm) AutoMigrate(entity interface{}, clauses ...dal.Clause) errors.Error {
- err := errors.Convert(buildTx(d.db, clauses).AutoMigrate(entity))
+ err := buildTx(d.db, clauses).AutoMigrate(entity)
if err == nil {
// fix pg cache plan error
_ = d.First(entity, clauses...)
}
- return err
+ return d.convertGormError(err)
}
// Cursor returns a database cursor, cursor is especially useful when handling big amount of rows of data
func (d *Dalgorm) Cursor(clauses ...dal.Clause) (dal.Rows, errors.Error) {
- return errors.Convert01(buildTx(d.db, clauses).Rows())
+ rows, err := buildTx(d.db, clauses).Rows()
+ return rows, d.convertGormError(err)
}
// CursorTx FIXME ...
@@ -162,7 +163,7 @@ func (d *Dalgorm) CursorTx(clauses ...dal.Clause) *gorm.DB {
// Fetch loads row data from `cursor` into `dst`
func (d *Dalgorm) Fetch(cursor dal.Rows, dst interface{}) errors.Error {
if rows, ok := cursor.(*sql.Rows); ok {
- return errors.Convert(d.db.ScanRows(rows, dst))
+ return d.convertGormError(d.db.ScanRows(rows, dst))
} else {
return errors.Default.New(fmt.Sprintf("can not support type %s to be a dal.Rows interface", reflect.TypeOf(cursor).String()))
}
@@ -170,12 +171,12 @@ func (d *Dalgorm) Fetch(cursor dal.Rows, dst interface{}) errors.Error {
// All loads matched rows from database to `dst`, USE IT WITH CAUTION!!
func (d *Dalgorm) All(dst interface{}, clauses ...dal.Clause) errors.Error {
- return errors.Convert(buildTx(d.db, clauses).Find(dst).Error)
+ return d.convertGormError(buildTx(d.db, clauses).Find(dst).Error)
}
// First loads first matched row from database to `dst`, error will be returned if no records were found
func (d *Dalgorm) First(dst interface{}, clauses ...dal.Clause) errors.Error {
- return errors.Convert(buildTx(d.db, clauses).First(dst).Error)
+ return d.convertGormError(buildTx(d.db, clauses).First(dst).Error)
}
// Count total records
@@ -187,37 +188,37 @@ func (d *Dalgorm) Count(clauses ...dal.Clause) (int64, errors.Error) {
// Pluck used to query single column
func (d *Dalgorm) Pluck(column string, dest interface{}, clauses ...dal.Clause) errors.Error {
- return errors.Convert(buildTx(d.db, clauses).Pluck(column, dest).Error)
+ return d.convertGormError(buildTx(d.db, clauses).Pluck(column, dest).Error)
}
// Create insert record to database
func (d *Dalgorm) Create(entity interface{}, clauses ...dal.Clause) errors.Error {
- return errors.Convert(buildTx(d.db, clauses).Create(entity).Error)
+ return d.convertGormError(buildTx(d.db, clauses).Create(entity).Error)
}
// CreateWithMap insert record to database
func (d *Dalgorm) CreateWithMap(entity interface{}, record map[string]interface{}) errors.Error {
- return errors.Convert(buildTx(d.db, nil).Model(entity).Clauses(clause.OnConflict{UpdateAll: true}).Create(record).Error)
+ return d.convertGormError(buildTx(d.db, nil).Model(entity).Clauses(clause.OnConflict{UpdateAll: true}).Create(record).Error)
}
// Update updates record
func (d *Dalgorm) Update(entity interface{}, clauses ...dal.Clause) errors.Error {
- return errors.Convert(buildTx(d.db, clauses).Save(entity).Error)
+ return d.convertGormError(buildTx(d.db, clauses).Save(entity).Error)
}
// CreateOrUpdate tries to create the record, or fallback to update all if failed
func (d *Dalgorm) CreateOrUpdate(entity interface{}, clauses ...dal.Clause) errors.Error {
- return errors.Convert(buildTx(d.db, clauses).Clauses(clause.OnConflict{UpdateAll: true}).Create(entity).Error)
+ return d.convertGormError(buildTx(d.db, clauses).Clauses(clause.OnConflict{UpdateAll: true}).Create(entity).Error)
}
// CreateIfNotExist tries to create the record if not exist
func (d *Dalgorm) CreateIfNotExist(entity interface{}, clauses ...dal.Clause) errors.Error {
- return errors.Convert(buildTx(d.db, clauses).Clauses(clause.OnConflict{DoNothing: true}).Create(entity).Error)
+ return d.convertGormError(buildTx(d.db, clauses).Clauses(clause.OnConflict{DoNothing: true}).Create(entity).Error)
}
// Delete records from database
func (d *Dalgorm) Delete(entity interface{}, clauses ...dal.Clause) errors.Error {
- return errors.Convert(buildTx(d.db, clauses).Delete(entity).Error)
+ return d.convertGormError(buildTx(d.db, clauses).Delete(entity).Error)
}
// UpdateColumn allows you to update multiple records
@@ -226,7 +227,7 @@ func (d *Dalgorm) UpdateColumn(entityOrTable interface{}, columnName string, val
value = gorm.Expr(expr.Expr, transformParams(expr.Params)...)
}
clauses = append(clauses, dal.From(entityOrTable))
- return errors.Convert(buildTx(d.db, clauses).Update(columnName, value).Error)
+ return d.convertGormError(buildTx(d.db, clauses).Update(columnName, value).Error)
}
// UpdateColumns allows you to update multiple columns of multiple records
@@ -241,19 +242,19 @@ func (d *Dalgorm) UpdateColumns(entityOrTable interface{}, set []dal.DalSet, cla
}
clauses = append(clauses, dal.From(entityOrTable))
- return errors.Convert(buildTx(d.db, clauses).Updates(updatesSet).Error)
+ return d.convertGormError(buildTx(d.db, clauses).Updates(updatesSet).Error)
}
// UpdateAllColumn updated all Columns of entity
func (d *Dalgorm) UpdateAllColumn(entity interface{}, clauses ...dal.Clause) errors.Error {
- return errors.Convert(buildTx(d.db, clauses).UpdateColumns(entity).Error)
+ return d.convertGormError(buildTx(d.db, clauses).UpdateColumns(entity).Error)
}
// GetColumns FIXME ...
func (d *Dalgorm) GetColumns(dst dal.Tabler, filter func(columnMeta dal.ColumnMeta) bool) (cms []dal.ColumnMeta, _ errors.Error) {
columnTypes, err := d.db.Migrator().ColumnTypes(dst.TableName())
if err != nil {
- return nil, errors.Convert(err)
+ return nil, d.convertGormError(err)
}
for _, columnType := range columnTypes {
if filter == nil {
@@ -262,7 +263,7 @@ func (d *Dalgorm) GetColumns(dst dal.Tabler, filter func(columnMeta dal.ColumnMe
cms = append(cms, columnType)
}
}
- return errors.Convert01(cms, nil)
+ return cms, nil
}
// AddColumn add one column for the table
@@ -286,7 +287,7 @@ func (d *Dalgorm) DropColumns(table string, columnNames ...string) errors.Error
err := d.Exec("ALTER TABLE ? DROP COLUMN ?", clause.Table{Name: table}, clause.Column{Name: columnName})
// err := d.db.Migrator().DropColumn(table, columnName)
if err != nil {
- return errors.Convert(err)
+ return d.convertGormError(err)
}
}
return nil
@@ -325,7 +326,7 @@ func (d *Dalgorm) AllTables() ([]string, errors.Error) {
var tables []string
err := d.db.Raw(tableSql).Scan(&tables).Error
if err != nil {
- return nil, errors.Convert(err)
+ return nil, d.convertGormError(err)
}
var filteredTables []string
for _, table := range tables {
@@ -338,7 +339,7 @@ func (d *Dalgorm) AllTables() ([]string, errors.Error) {
// DropTables drop multiple tables by Model Pointer or Table Name
func (d *Dalgorm) DropTables(dst ...interface{}) errors.Error {
- return errors.Convert(d.db.Migrator().DropTable(dst...))
+ return d.convertGormError(d.db.Migrator().DropTable(dst...))
}
// HasTable checks if table exists
@@ -348,7 +349,8 @@ func (d *Dalgorm) HasTable(table interface{}) bool {
// RenameTable renames table name
func (d *Dalgorm) RenameTable(oldName, newName string) errors.Error {
- return errors.Convert(d.db.Migrator().RenameTable(oldName, newName))
+ err := d.db.Migrator().RenameTable(oldName, newName)
+ return d.convertGormError(err)
}
// DropIndexes drops indexes for specified table
@@ -356,7 +358,7 @@ func (d *Dalgorm) DropIndexes(table string, indexNames ...string) errors.Error {
for _, indexName := range indexNames {
err := d.db.Migrator().DropIndex(table, indexName)
if err != nil {
- return errors.Convert(err)
+ return d.convertGormError(err)
}
}
return nil
@@ -382,21 +384,35 @@ func (d *Dalgorm) Begin() dal.Transaction {
}
// IsErrorNotFound checking if the sql error is not found.
-func (d *Dalgorm) IsErrorNotFound(err errors.Error) bool {
+func (d *Dalgorm) IsErrorNotFound(err error) bool {
return errors.Is(err, gorm.ErrRecordNotFound)
}
// IsDuplicationError checks whether the sql error message reports a duplicate entry.
-func (d *Dalgorm) IsDuplicationError(err errors.Error) bool {
+func (d *Dalgorm) IsDuplicationError(err error) bool {
return strings.Contains(strings.ToLower(err.Error()), "duplicate")
}
// RawCursor (Deprecated) executes raw sql query and returns a database cursor
func (d *Dalgorm) RawCursor(query string, params ...interface{}) (*sql.Rows, errors.Error) {
- return errors.Convert01(d.db.Raw(query, params...).Rows())
+ rows, err := d.db.Raw(query, params...).Rows()
+ return rows, d.convertGormError(err)
}
// NewDalgorm creates a *Dalgorm
func NewDalgorm(db *gorm.DB) *Dalgorm {
return &Dalgorm{db}
}
+
+func (d *Dalgorm) convertGormError(err error) errors.Error {
+ if err == nil {
+ return nil
+ }
+ if d.IsErrorNotFound(err) {
+ return errors.NotFound.WrapRaw(err)
+ }
+ if d.IsDuplicationError(err) {
+ return errors.BadInput.WrapRaw(err)
+ }
+ return errors.Default.WrapRaw(err)
+}
diff --git a/backend/server/services/remote/models/plugin_remote.go b/backend/impls/dalgorm/db_mapper.go
similarity index 54%
copy from backend/server/services/remote/models/plugin_remote.go
copy to backend/impls/dalgorm/db_mapper.go
index a8984ea56..01c953231 100644
--- a/backend/server/services/remote/models/plugin_remote.go
+++ b/backend/impls/dalgorm/db_mapper.go
@@ -15,18 +15,33 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package models
+package dalgorm
import (
- "github.com/apache/incubator-devlake/core/errors"
- "github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+ "gorm.io/gorm/schema"
+ "reflect"
)
-// RemotePlugin API supported by plugins running in different/remote processes
-type RemotePlugin interface {
- plugin.PluginApi
- plugin.PluginTask
- plugin.PluginMeta
- plugin.PluginOpenApiSpec
- RunMigrations(forceMigrate bool) errors.Error
+// ToDatabaseMap converts the map to a format that can be inserted into a SQL database
+func ToDatabaseMap(tableName string, m map[string]any) map[string]any {
+ strategy := schema.NamingStrategy{}
+ newMap := map[string]any{}
+ for k, v := range m {
+ k = strategy.ColumnName(tableName, k)
+ if reflect.ValueOf(v).IsZero() {
+ continue
+ }
+ if str, ok := v.(string); ok {
+ t, err := api.ConvertStringToTime(str)
+ if err == nil {
+ if t.Second() == 0 {
+ continue
+ }
+ v = t
+ }
+ }
+ newMap[k] = v
+ }
+ return newMap
}
diff --git a/backend/plugins/pagerduty/api/init.go b/backend/plugins/pagerduty/api/init.go
index 52568e174..80c2fbcdd 100644
--- a/backend/plugins/pagerduty/api/init.go
+++ b/backend/plugins/pagerduty/api/init.go
@@ -39,10 +39,19 @@ func Init(br context.BasicRes) {
basicRes,
vld,
)
- scopeHelper = api.NewScopeHelper[models.PagerDutyConnection, models.Service, models.PagerdutyTransformationRule](
+ params := &api.ReflectionParameters{
+ ScopeIdFieldName: "Id",
+ ScopeIdColumnName: "id",
+ RawScopeParamName: "ScopeId",
+ }
+ scopeHelper = api.NewScopeHelper2[models.PagerDutyConnection, models.Service, models.PagerdutyTransformationRule](
basicRes,
vld,
connectionHelper,
+ api.NewScopeDatabaseHelperImpl[models.PagerDutyConnection, models.Service, models.PagerdutyTransformationRule](
+ basicRes, connectionHelper, params),
+ params,
+ &api.ScopeHelperOptions{},
)
trHelper = api.NewTransformationRuleHelper[models.PagerdutyTransformationRule](
basicRes,
diff --git a/backend/plugins/pagerduty/api/scope.go b/backend/plugins/pagerduty/api/scope.go
index ec5996c77..7560fa570 100644
--- a/backend/plugins/pagerduty/api/scope.go
+++ b/backend/plugins/pagerduty/api/scope.go
@@ -74,7 +74,7 @@ func UpdateScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, err
// @Failure 500 {object} shared.ApiBody "Internal Error"
// @Router /plugins/pagerduty/connections/{connectionId}/scopes/ [GET]
func GetScopeList(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
- return scopeHelper.GetScopeList(input, "Id")
+ return scopeHelper.GetScopeList(input)
}
// GetScope get one PagerDuty service
@@ -89,7 +89,7 @@ func GetScopeList(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, er
// @Failure 500 {object} shared.ApiBody "Internal Error"
// @Router /plugins/pagerduty/connections/{connectionId}/scopes/{serviceId} [GET]
func GetScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
- return scopeHelper.GetScope(input, "id")
+ return scopeHelper.GetScope(input, "")
}
// DeleteScope delete plugin data associated with the scope and optionally the scope itself
@@ -104,5 +104,5 @@ func GetScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors
// @Failure 500 {object} shared.ApiBody "Internal Error"
// @Router /plugins/pagerduty/connections/{connectionId}/scopes/{serviceId} [DELETE]
func DeleteScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
- return scopeHelper.DeleteScope(input, "Id", "ScopeId", nil)
+ return scopeHelper.Delete(input)
}
diff --git a/backend/python/pydevlake/pydevlake/message.py b/backend/python/pydevlake/pydevlake/message.py
index 7bdda2658..0cb7c23cd 100644
--- a/backend/python/pydevlake/pydevlake/message.py
+++ b/backend/python/pydevlake/pydevlake/message.py
@@ -64,6 +64,7 @@ class PluginInfo(Message):
subtask_metas: list[SubtaskMeta]
extension: str = "datasource"
type: str = "python-poetry"
+ tables: list[str]
class RemoteProgress(Message):
diff --git a/backend/python/pydevlake/pydevlake/plugin.py b/backend/python/pydevlake/pydevlake/plugin.py
index 4958e729e..d543fe791 100644
--- a/backend/python/pydevlake/pydevlake/plugin.py
+++ b/backend/python/pydevlake/pydevlake/plugin.py
@@ -217,7 +217,7 @@ class Plugin(ABC):
def get_stream(self, stream_name: str):
stream = self._streams.get(stream_name)
if stream is None:
- raise Exception(f'Unkown stream {stream_name}')
+ raise Exception(f'Unknown stream {stream_name}')
return stream
def plugin_info(self) -> msg.PluginInfo:
@@ -233,12 +233,12 @@ class Plugin(ABC):
)
for subtask in self.subtasks
]
-
if self.transformation_rule_type:
tx_rule_model_info = msg.DynamicModelInfo.from_model(self.transformation_rule_type)
else:
tx_rule_model_info = None
-
+ plugin_tables = [stream(self.name).raw_model_table for stream in self.streams] + \
+ [stream.tool_model.__tablename__ for stream in self.streams]
return msg.PluginInfo(
name=self.name,
description=self.description,
@@ -247,7 +247,8 @@ class Plugin(ABC):
connection_model_info=msg.DynamicModelInfo.from_model(self.connection_type),
transformation_rule_model_info=tx_rule_model_info,
scope_model_info=msg.DynamicModelInfo.from_model(self.tool_scope_type),
- subtask_metas=subtask_metas
+ subtask_metas=subtask_metas,
+ tables=plugin_tables,
)
def _plugin_path(self):
diff --git a/backend/python/pydevlake/pydevlake/stream.py b/backend/python/pydevlake/pydevlake/stream.py
index ae7e421bd..91591bd6c 100644
--- a/backend/python/pydevlake/pydevlake/stream.py
+++ b/backend/python/pydevlake/pydevlake/stream.py
@@ -66,7 +66,7 @@ class Stream:
if self._raw_model is not None:
return self._raw_model
- table_name = f'_raw_{self.plugin_name}_{self.name}'
+ table_name = self.raw_model_table
# Look for existing raw model
for mapper in RawModel._sa_registry.mappers:
@@ -84,6 +84,10 @@ class Stream:
table.create(session.get_bind(), checkfirst=True)
return self._raw_model
+ @property
+ def raw_model_table(self):
+ return f'_raw_{self.plugin_name}_{self.name}'
+
def collect(self, state, context) -> Iterable[tuple[object, dict]]:
pass
diff --git a/backend/python/pydevlake/pydevlake/subtasks.py b/backend/python/pydevlake/pydevlake/subtasks.py
index c343c359f..3193bbad5 100644
--- a/backend/python/pydevlake/pydevlake/subtasks.py
+++ b/backend/python/pydevlake/pydevlake/subtasks.py
@@ -130,7 +130,7 @@ class Subtask:
return json.dumps({
"connection_id": ctx.connection.id,
"scope_id": ctx.scope.id
- })
+ }, separators=(',', ':'))
class Collector(Subtask):
diff --git a/backend/python/pydevlake/tests/stream_test.py b/backend/python/pydevlake/tests/stream_test.py
index d1caee668..362d0da6b 100644
--- a/backend/python/pydevlake/tests/stream_test.py
+++ b/backend/python/pydevlake/tests/stream_test.py
@@ -111,7 +111,7 @@ def test_extract_data(stream, raw_data, ctx):
with Session(ctx.engine) as session:
for each in raw_data:
raw_model = stream.raw_model(session)
- raw_model.params = json.dumps({"connection_id": ctx.connection.id, "scope_id": ctx.scope.id})
+ raw_model.params = json.dumps({"connection_id": ctx.connection.id, "scope_id": ctx.scope.id}, separators=(',', ':'))
session.add(raw_model(data=json.dumps(each)))
session.commit()
@@ -137,7 +137,7 @@ def test_convert_data(stream, raw_data, ctx):
connection_id=ctx.connection.id,
name=each["n"],
raw_data_table="_raw_dummy_model",
- raw_data_params=json.dumps({"connection_id": ctx.connection.id, "scope_id": ctx.scope.id})
+ raw_data_params=json.dumps({"connection_id": ctx.connection.id, "scope_id": ctx.scope.id}, separators=(',', ':'))
)
)
session.commit()
diff --git a/backend/resources/swagger/open_api_spec.json.tmpl b/backend/resources/swagger/open_api_spec.json.tmpl
index eda7eb40a..ad37b235e 100644
--- a/backend/resources/swagger/open_api_spec.json.tmpl
+++ b/backend/resources/swagger/open_api_spec.json.tmpl
@@ -147,6 +147,15 @@
},
{
"$$ref": "#/components/parameters/scopeId"
+ },
+ {
+ "name": "blueprints",
+ "required": false,
+ "description": "return blueprints using these scopes in the payload",
+ "in": "query",
+ "schema": {
+ "$$ref": "bool"
+ }
}
],
"responses": {
@@ -190,6 +199,33 @@
}
}
}
+ },
+ "delete": {
+ "description": "Delete a scope and its associated data",
+ "parameters": [
+ {
+ "$$ref": "#/components/parameters/connectionId"
+ },
+ {
+ "$$ref": "#/components/parameters/scopeId"
+ },
+ {
+ "name": "scope",
+ "required": true,
+ "in": "body",
+ "schema": {
+ "$$ref": "#/components/schemas/scope"
+ }
+ },
+ {
+ "name": "delete_data_only",
+ "required": false,
+ "in": "query",
+ "schema": {
+ "$$ref": "bool"
+ }
+ }
+ ]
}
},
"/plugins/{{.PluginName}}/connections/{connectionId}/scopes": {
@@ -201,6 +237,15 @@
},
{
"$$ref": "#/components/parameters/page"
+ },
+ {
+ "name": "blueprints",
+ "required": false,
+ "description": "return blueprints using these scopes in the payload",
+ "in": "query",
+ "schema": {
+ "$$ref": "bool"
+ }
}
],
"responses": {
diff --git a/backend/server/services/remote/models/conversion.go b/backend/server/services/remote/models/conversion.go
index 651947236..bd221ecaa 100644
--- a/backend/server/services/remote/models/conversion.go
+++ b/backend/server/services/remote/models/conversion.go
@@ -18,9 +18,12 @@ limitations under the License.
package models
import (
+ "encoding/json"
"fmt"
+ "github.com/apache/incubator-devlake/impls/dalgorm"
"reflect"
"strings"
+ "time"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
@@ -68,6 +71,33 @@ func GenerateStructType(schema map[string]any, encrypt bool, baseType reflect.Ty
return reflect.StructOf(structFields), nil
}
+func MapTo(x any, y any) errors.Error {
+ b, err := json.Marshal(x)
+ if err != nil {
+ return errors.Convert(err)
+ }
+ if err = json.Unmarshal(b, y); err != nil {
+ return errors.Convert(err)
+ }
+ return nil
+}
+
+func ToDatabaseMap(tableName string, ifc any, createdAt *time.Time, updatedAt *time.Time) (map[string]any, errors.Error) {
+ m := map[string]any{}
+ err := MapTo(ifc, &m)
+ if err != nil {
+ return nil, err
+ }
+ if createdAt != nil {
+ m["createdAt"] = createdAt
+ }
+ if updatedAt != nil {
+ m["updatedAt"] = updatedAt
+ }
+ m = dalgorm.ToDatabaseMap(tableName, m)
+ return m, nil
+}
+
func isBaseTypeField(fieldName string, baseType reflect.Type) bool {
fieldName = canonicalFieldName(fieldName)
for i := 0; i < baseType.NumField(); i++ {
diff --git a/backend/server/services/remote/models/models.go b/backend/server/services/remote/models/models.go
index a12553ed6..b220114e1 100644
--- a/backend/server/services/remote/models/models.go
+++ b/backend/server/services/remote/models/models.go
@@ -18,8 +18,6 @@ limitations under the License.
package models
import (
- "time"
-
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/core/models/common"
@@ -49,8 +47,16 @@ type PluginInfo struct {
Description string `json:"description"`
PluginPath string `json:"plugin_path" validate:"required"`
SubtaskMetas []SubtaskMeta `json:"subtask_metas" validate:"dive"`
+ Tables []string `json:"tables"`
}
+// Type aliases used by the API helper for better readability
+type (
+ RemoteScope any
+ RemoteTransformation any
+ RemoteConnection any
+)
+
type DynamicModelInfo struct {
JsonSchema map[string]any `json:"json_schema" validate:"required"`
TableName string `json:"table_name" validate:"required"`
@@ -61,7 +67,7 @@ func (d DynamicModelInfo) LoadDynamicTabler(encrypt bool, parentModel any) (*mod
}
type ScopeModel struct {
- common.NoPKModel `json:"-"`
+ common.NoPKModel `swaggerignore:"true"`
Id string `gorm:"primarykey;type:varchar(255)" json:"id"`
ConnectionId uint64 `gorm:"primaryKey" json:"connectionId"`
Name string `json:"name" validate:"required"`
@@ -69,11 +75,9 @@ type ScopeModel struct {
}
type TransformationModel struct {
- Id uint64 `gorm:"primaryKey" json:"id"`
- ConnectionId uint64 `json:"connectionId"`
- Name string `json:"name"`
- CreatedAt time.Time `json:"createdAt"`
- UpdatedAt time.Time `json:"updatedAt"`
+ common.Model
+ ConnectionId uint64 `json:"connectionId"`
+ Name string `json:"name"`
}
type SubtaskMeta struct {
diff --git a/backend/server/services/remote/models/plugin_remote.go b/backend/server/services/remote/models/plugin_remote.go
index a8984ea56..e039912a6 100644
--- a/backend/server/services/remote/models/plugin_remote.go
+++ b/backend/server/services/remote/models/plugin_remote.go
@@ -28,5 +28,6 @@ type RemotePlugin interface {
plugin.PluginTask
plugin.PluginMeta
plugin.PluginOpenApiSpec
+ plugin.PluginModel
RunMigrations(forceMigrate bool) errors.Error
}
diff --git a/backend/server/services/remote/plugin/default_api.go b/backend/server/services/remote/plugin/default_api.go
index d5044b251..31fc4a569 100644
--- a/backend/server/services/remote/plugin/default_api.go
+++ b/backend/server/services/remote/plugin/default_api.go
@@ -22,6 +22,7 @@ import (
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
"github.com/apache/incubator-devlake/server/services/remote/bridge"
+ remoteModel "github.com/apache/incubator-devlake/server/services/remote/models"
)
type pluginAPI struct {
@@ -65,8 +66,9 @@ func GetDefaultAPI(
"GET": papi.ListScopes,
},
"connections/:connectionId/scopes/:scopeId": {
- "GET": papi.GetScope,
- "PATCH": papi.PatchScope,
+ "GET": papi.GetScope,
+ "PATCH": papi.UpdateScope,
+ "DELETE": papi.DeleteScope,
},
"connections/:connectionId/remote-scopes": {
"GET": papi.GetRemoteScopes,
@@ -86,6 +88,22 @@ func GetDefaultAPI(
"PATCH": papi.PatchTransformationRule,
}
}
-
+ scopeHelper = createScopeHelper(papi)
return resources
}
+
+func createScopeHelper(pa *pluginAPI) *api.GenericScopeApiHelper[remoteModel.RemoteConnection, remoteModel.RemoteScope, remoteModel.RemoteTransformation] {
+ params := &api.ReflectionParameters{
+ ScopeIdFieldName: "Id",
+ ScopeIdColumnName: "id",
+ RawScopeParamName: "scope_id",
+ }
+ return api.NewGenericScopeHelper[remoteModel.RemoteConnection, remoteModel.RemoteScope, remoteModel.RemoteTransformation](
+ basicRes,
+ nil,
+ connectionHelper,
+ NewScopeDatabaseHelperImpl(pa, basicRes, params),
+ params,
+ &api.ScopeHelperOptions{},
+ )
+}
diff --git a/backend/server/services/remote/plugin/init.go b/backend/server/services/remote/plugin/init.go
index 6698a83d1..3401acba6 100644
--- a/backend/server/services/remote/plugin/init.go
+++ b/backend/server/services/remote/plugin/init.go
@@ -28,6 +28,7 @@ import (
var (
connectionHelper *api.ConnectionApiHelper
+ scopeHelper *api.GenericScopeApiHelper[models.RemoteConnection, models.RemoteScope, models.RemoteTransformation]
basicRes context.BasicRes
vld *validator.Validate
)
diff --git a/backend/server/services/remote/plugin/plugin_extensions.go b/backend/server/services/remote/plugin/plugin_extensions.go
index 17c241a3a..538e50f38 100644
--- a/backend/server/services/remote/plugin/plugin_extensions.go
+++ b/backend/server/services/remote/plugin/plugin_extensions.go
@@ -19,6 +19,7 @@ package plugin
import (
"encoding/json"
+ "fmt"
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
@@ -54,7 +55,7 @@ func (p remoteDatasourcePlugin) MakeDataSourcePipelinePlanV200(connectionId uint
wrappedToolScope := p.scopeTabler.New()
err = api.CallDB(db.First, wrappedToolScope, dal.Where("id = ?", bpScope.Id))
if err != nil {
- return nil, nil, errors.NotFound.New("record not found")
+ return nil, nil, errors.Default.Wrap(err, fmt.Sprintf("error getting scope %s", bpScope.Name))
}
toolScope := models.ScopeModel{}
err := wrappedToolScope.To(&toolScope)
diff --git a/backend/server/services/remote/plugin/plugin_impl.go b/backend/server/services/remote/plugin/plugin_impl.go
index e46bd3157..259d2300c 100644
--- a/backend/server/services/remote/plugin/plugin_impl.go
+++ b/backend/server/services/remote/plugin/plugin_impl.go
@@ -43,6 +43,7 @@ type (
transformationRuleTabler *coreModels.DynamicTabler
resources map[string]map[string]plugin.ApiResourceHandler
openApiSpec string
+ tables []dal.Tabler
}
RemotePluginTaskData struct {
DbUrl string `json:"db_url"`
@@ -96,6 +97,9 @@ func newPlugin(info *models.PluginInfo, invoker bridge.Invoker) (*remotePluginIm
DomainTypes: subtask.DomainTypes,
})
}
+ for _, tableName := range info.Tables {
+ p.tables = append(p.tables, coreModels.NewDynamicTabler(tableName, nil))
+ }
return &p, nil
}
@@ -103,6 +107,10 @@ func (p *remotePluginImpl) SubTaskMetas() []plugin.SubTaskMeta {
return p.subtaskMetas
}
+func (p *remotePluginImpl) GetTablesInfo() []dal.Tabler {
+ return p.tables
+}
+
func (p *remotePluginImpl) PrepareTaskData(taskCtx plugin.TaskContext, options map[string]interface{}) (interface{}, errors.Error) {
dbUrl := taskCtx.GetConfig("db_url")
connectionId := uint64(options["connectionId"].(float64))
diff --git a/backend/server/services/remote/plugin/scope_api.go b/backend/server/services/remote/plugin/scope_api.go
index dada4db9b..77fd71a11 100644
--- a/backend/server/services/remote/plugin/scope_api.go
+++ b/backend/server/services/remote/plugin/scope_api.go
@@ -18,244 +18,112 @@ limitations under the License.
package plugin
import (
- "encoding/json"
- "net/http"
- "strconv"
-
"github.com/apache/incubator-devlake/server/services/remote/models"
-
"github.com/mitchellh/mapstructure"
+ "net/http"
- "github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
)
-// DTO that includes the transformation rule name
-type apiScopeResponse struct {
- Scope any `json:"-"`
- TransformationRuleName string `json:"transformationRuleName,omitempty"`
-}
-
-// MarshalJSON make Scope display inline
-func (r apiScopeResponse) MarshalJSON() ([]byte, error) {
- // encode scope to map
- scopeBytes, err := json.Marshal(r.Scope)
- if err != nil {
- return nil, err
- }
- var scopeMap map[string]interface{}
- err = json.Unmarshal(scopeBytes, &scopeMap)
- if err != nil {
- return nil, err
- }
-
- // encode other column (transformationRuleName) to map
- otherBytes, err := json.Marshal(struct {
- TransformationRuleName string `json:"transformationRuleName,omitempty"`
- }{
- TransformationRuleName: r.TransformationRuleName,
- })
- if err != nil {
- return nil, err
- }
-
- // merge the two maps
- var merged map[string]interface{}
- err = json.Unmarshal(otherBytes, &merged)
- if err != nil {
- return nil, err
- }
- for k, v := range scopeMap {
- merged[k] = v
- }
-
- // encode the merged map to JSON
- return json.Marshal(merged)
-}
-
type request struct {
Data []map[string]any `json:"data"`
}
func (pa *pluginAPI) PutScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
- connectionId, _ := extractParam(input.Params)
- if connectionId == 0 {
- return nil, errors.BadInput.New("invalid connectionId")
- }
var scopes request
err := errors.Convert(mapstructure.Decode(input.Body, &scopes))
if err != nil {
return nil, errors.BadInput.Wrap(err, "decoding scope error")
}
- keeper := make(map[string]struct{})
- var createdScopes []any
- for _, scopeRaw := range scopes.Data {
- err = verifyScope(scopeRaw)
+ var slice []*models.RemoteScope
+ for _, scope := range scopes.Data {
+ obj := pa.scopeType.NewValue().(models.RemoteScope)
+ err = models.MapTo(scope, obj)
if err != nil {
return nil, err
}
- scopeId := scopeRaw["id"].(string)
- if _, ok := keeper[scopeId]; ok {
- return nil, errors.BadInput.New("duplicated item")
- } else {
- keeper[scopeId] = struct{}{}
- }
- scope := pa.scopeType.New()
- err = scope.From(&scopeRaw)
- if err != nil {
- return nil, err
- }
- // I don't know the reflection logic to do this in a batch...
- err = api.CallDB(basicRes.GetDal().CreateOrUpdate, scope)
- if err != nil {
- return nil, errors.Default.Wrap(err, "error on saving scope")
- }
- createdScopes = append(createdScopes, scope.Unwrap())
- }
-
- return &plugin.ApiResourceOutput{Body: createdScopes, Status: http.StatusOK}, nil
-}
-
-func (pa *pluginAPI) PatchScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
- connectionId, scopeId := extractParam(input.Params)
- if connectionId == 0 {
- return nil, errors.BadInput.New("invalid connectionId")
+ slice = append(slice, &obj)
}
- db := basicRes.GetDal()
- scope := pa.scopeType.New()
- err := api.CallDB(db.First, scope, dal.Where("connection_id = ? AND id = ?", connectionId, scopeId))
+ apiScopes, err := scopeHelper.PutScopes(input, slice)
if err != nil {
- return nil, errors.Default.Wrap(err, "scope not found")
+ return nil, err
}
- err = verifyScope(input.Body)
+ response, err := convertScopeResponse(apiScopes...)
if err != nil {
return nil, err
}
- err = scope.From(&input.Body)
+ return &plugin.ApiResourceOutput{Body: response, Status: http.StatusOK}, nil
+}
+
+func (pa *pluginAPI) UpdateScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+ apiScopes, err := scopeHelper.UpdateScope(input)
if err != nil {
- return nil, errors.Default.Wrap(err, "patch scope error")
+ return nil, err
}
- err = api.CallDB(db.Update, scope)
+ response, err := convertScopeResponse(apiScopes)
if err != nil {
- return nil, errors.Default.Wrap(err, "error on saving scope")
+ return nil, err
}
- return &plugin.ApiResourceOutput{Body: scope.Unwrap(), Status: http.StatusOK}, nil
+ return &plugin.ApiResourceOutput{Body: response[0], Status: http.StatusOK}, nil
}
func (pa *pluginAPI) ListScopes(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
- connectionId, _ := extractParam(input.Params)
- if connectionId == 0 {
- return nil, errors.BadInput.New("invalid connectionId")
- }
- limit, offset := api.GetLimitOffset(input.Query, "pageSize", "page")
-
- if limit > 100 {
- return nil, errors.BadInput.New("Page limit cannot exceed 100")
- }
- db := basicRes.GetDal()
- scopes := pa.scopeType.NewSlice()
- err := api.CallDB(db.All, scopes, dal.Where("connection_id = ?", connectionId), dal.Limit(limit), dal.Offset(offset))
+ scopes, err := scopeHelper.GetScopes(input)
if err != nil {
return nil, err
}
- var scopeMap []map[string]any
- err = scopes.To(&scopeMap)
+ response, err := convertScopeResponse(scopes...)
if err != nil {
return nil, err
}
- if pa.txRuleType == nil {
- var apiScopes []apiScopeResponse
- for _, scope := range scopeMap {
- apiScopes = append(apiScopes, apiScopeResponse{Scope: scope})
- }
- return &plugin.ApiResourceOutput{Body: apiScopes, Status: http.StatusOK}, nil
- }
- var ruleIds []uint64
- for _, scopeModel := range scopeMap {
- if tid := uint64(scopeModel["transformationRuleId"].(float64)); tid > 0 {
- ruleIds = append(ruleIds, tid)
- }
- }
- rules := pa.txRuleType.NewSlice()
- if len(ruleIds) > 0 {
- err = api.CallDB(db.All, rules, dal.Select("id, name"),
- dal.Where("id IN (?)", ruleIds))
- if err != nil {
- return nil, err
- }
- }
- var transformationModels []models.TransformationModel
- err = rules.To(&transformationModels)
- if err != nil {
- return nil, err
- }
- names := make(map[uint64]string)
- for _, t := range transformationModels {
- names[t.Id] = t.Name
- }
- var apiScopes []apiScopeResponse
- for _, scope := range scopeMap {
- txRuleName, ok := names[uint64(scope["transformationRuleId"].(float64))]
- if ok {
- scopeRes := apiScopeResponse{
- Scope: scope,
- TransformationRuleName: txRuleName,
- }
- apiScopes = append(apiScopes, scopeRes)
- }
- }
-
- return &plugin.ApiResourceOutput{Body: apiScopes, Status: http.StatusOK}, nil
+ return &plugin.ApiResourceOutput{Body: response, Status: http.StatusOK}, nil
}
+// GetScope handles a single-scope lookup: it delegates the query to scopeHelper.GetScope,
+// flattens the result with convertScopeResponse and returns the first (only) entry with HTTP 200.
func (pa *pluginAPI) GetScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
- connectionId, scopeId := extractParam(input.Params)
- if connectionId == 0 {
- return nil, errors.BadInput.New("invalid connectionId")
- }
- if scopeId == `` {
- return nil, errors.BadInput.New("invalid scopeId")
- }
- rawScope := pa.scopeType.New()
- db := basicRes.GetDal()
- err := api.CallDB(db.First, rawScope, dal.Where("connection_id = ? AND id = ?", connectionId, scopeId))
- if db.IsErrorNotFound(err) {
- return nil, errors.NotFound.New("record not found")
- }
+ scope, err := scopeHelper.GetScope(input)
if err != nil {
return nil, err
}
- var scope models.ScopeModel
- err = rawScope.To(&scope)
+ response, err := convertScopeResponse(scope)
if err != nil {
return nil, err
}
- var rule models.TransformationModel
- if scope.TransformationRuleId > 0 {
- err = api.CallDB(db.First, &rule, dal.From(pa.txRuleType.TableName()), dal.Where("id = ?", scope.TransformationRuleId))
- if err != nil {
- return nil, errors.Default.Wrap(err, `no related transformationRule for scope`)
- }
- }
- return &plugin.ApiResourceOutput{Body: apiScopeResponse{rawScope.Unwrap(), rule.Name}, Status: http.StatusOK}, nil
-}
-
-func extractParam(params map[string]string) (uint64, string) {
- connectionId, _ := strconv.ParseUint(params["connectionId"], 10, 64)
- scopeId := params["scopeId"]
- return connectionId, scopeId
+ // convertScopeResponse appends one entry per input scope, so index 0 exists here
+ return &plugin.ApiResourceOutput{Body: response[0], Status: http.StatusOK}, nil
}
-func verifyScope(scope map[string]any) errors.Error {
- if connectionId, ok := scope["connectionId"]; !ok || connectionId.(float64) == 0 {
- return errors.BadInput.New("invalid connectionId")
+// DeleteScope delegates the deletion to scopeHelper.DeleteScope and returns whatever that
+// helper reports (bound to bps here) as the response body with HTTP 200.
+func (pa *pluginAPI) DeleteScope(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+ bps, err := scopeHelper.DeleteScope(input)
+ if err != nil {
+ return nil, err
}
+ return &plugin.ApiResourceOutput{Body: bps, Status: http.StatusOK}, nil
+}
- if scope["id"] == "" {
- return errors.BadInput.New("invalid scope ID")
+// convertScopeResponse adapts "remote" api.ScopeRes values into serializable maps, one map per
+// input scope. Each map holds the entries models.MapTo produces for the transformation rule name
+// and blueprints, plus the entries it produces for the scope itself, all at the same level.
+// It returns a nil slice when called with no scopes, and the first mapping error otherwise.
+func convertScopeResponse(scopes ...*api.ScopeRes[models.RemoteScope]) ([]map[string]any, errors.Error) {
+ var responses []map[string]any
+ for _, scope := range scopes {
+ resMap := map[string]any{}
+ // map everything except the scope first; the scope is mapped on its own below
+ err := models.MapTo(api.ScopeRes[map[string]any]{
+ Scope: nil, //ignore intentionally
+ TransformationRuleName: scope.TransformationRuleName,
+ Blueprints: scope.Blueprints,
+ }, &resMap)
+ if err != nil {
+ return nil, err
+ }
+ scopeMap := map[string]any{}
+ err = models.MapTo(scope.Scope, &scopeMap)
+ if err != nil {
+ return nil, err
+ }
+ // drop the placeholder entry, then merge the scope's own entries into the response;
+ // on a key collision the scope's value overwrites the one already in resMap
+ delete(resMap, "Scope")
+ for k, v := range scopeMap {
+ resMap[k] = v
+ }
+ responses = append(responses, resMap)
}
-
- return nil
+ return responses, nil
}
diff --git a/backend/server/services/remote/plugin/scope_db_helper.go b/backend/server/services/remote/plugin/scope_db_helper.go
new file mode 100644
index 000000000..e9fdac371
--- /dev/null
+++ b/backend/server/services/remote/plugin/scope_db_helper.go
@@ -0,0 +1,147 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package plugin
+
+import (
+ "fmt"
+ "github.com/apache/incubator-devlake/core/context"
+ "github.com/apache/incubator-devlake/core/dal"
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+ "github.com/apache/incubator-devlake/server/services/remote/models"
+ "reflect"
+ "time"
+)
+
+// ScopeDatabaseHelperImpl is the api.ScopeDatabaseHelper implementation for the dynamically
+// typed "remote" connection, scope and transformation models.
+type ScopeDatabaseHelperImpl struct {
+ // Embedded interface. NewScopeDatabaseHelperImpl leaves it nil, so only the methods
+ // defined directly on *ScopeDatabaseHelperImpl are safe to call.
+ api.ScopeDatabaseHelper[models.RemoteConnection, models.RemoteScope, models.RemoteTransformation]
+ // pa supplies the dynamic model factories used here (connType, scopeType, txRuleType).
+ pa *pluginAPI
+ // db is the data-access layer every query in this helper goes through.
+ db dal.Dal
+ // params carries reflection settings; ScopeIdColumnName is read when looking up a scope.
+ params *api.ReflectionParameters
+ // connHelper is used to look up a connection by id in VerifyConnection.
+ connHelper *api.ConnectionApiHelper
+}
+
+// NewScopeDatabaseHelperImpl builds a ScopeDatabaseHelperImpl bound to the given plugin API.
+// The DAL comes from basicRes and the connection helper is the package-level connectionHelper.
+// The embedded api.ScopeDatabaseHelper interface is left unset.
+func NewScopeDatabaseHelperImpl(pa *pluginAPI, basicRes context.BasicRes, params *api.ReflectionParameters) *ScopeDatabaseHelperImpl {
+	helper := new(ScopeDatabaseHelperImpl)
+	helper.pa = pa
+	helper.db = basicRes.GetDal()
+	helper.params = params
+	helper.connHelper = connectionHelper
+	return helper
+}
+
+// VerifyConnection checks that a connection with the given id can be loaded.
+// A not-found lookup is reported as a BadInput error; any other lookup error is returned as-is.
+func (s *ScopeDatabaseHelperImpl) VerifyConnection(connectionId uint64) errors.Error {
+	lookupErr := s.connHelper.FirstById(s.pa.connType.New(), connectionId)
+	switch {
+	case lookupErr == nil:
+		return nil
+	case s.db.IsErrorNotFound(lookupErr):
+		return errors.BadInput.New("Invalid Connection Id")
+	default:
+		return lookupErr
+	}
+}
+
+// SaveScope inserts the given scopes, passing the current time as both the creation
+// and the update timestamp.
+func (s *ScopeDatabaseHelperImpl) SaveScope(scopes []*models.RemoteScope) errors.Error {
+	timestamp := time.Now()
+	createdAt, updatedAt := &timestamp, &timestamp
+	return s.save(scopes, createdAt, updatedAt)
+}
+
+// UpdateScope replaces the stored scope identified by (connectionId, scopeId) with scope.
+// It is implemented as delete-then-insert; a not-found error from the delete is ignored.
+// The re-inserted row is saved with a nil creation time and the current time as update time.
+func (s *ScopeDatabaseHelperImpl) UpdateScope(connectionId uint64, scopeId string, scope *models.RemoteScope) errors.Error {
+	// Per the original author: Gorm's Update API doesn't work with dynamic models, hence delete + create.
+	deleteErr := s.DeleteScope(connectionId, scopeId)
+	if deleteErr != nil && !s.db.IsErrorNotFound(deleteErr) {
+		return deleteErr
+	}
+	updatedAt := time.Now()
+	replacement := []*models.RemoteScope{scope}
+	return s.save(replacement, nil, &updatedAt)
+}
+
+// GetScope loads the first scope matching the connection id and scope id, using the
+// scope-id column named by the reflection parameters, and returns the unwrapped model.
+// Any lookup failure is wrapped as "could not get scope".
+func (s *ScopeDatabaseHelperImpl) GetScope(connectionId uint64, scopeId string) (models.RemoteScope, errors.Error) {
+	condition := fmt.Sprintf("connection_id = ? AND %s = ?", s.params.ScopeIdColumnName)
+	found := s.pa.scopeType.New()
+	if err := api.CallDB(s.db.First, found, dal.Where(condition, connectionId, scopeId)); err != nil {
+		return nil, errors.Default.Wrap(err, "could not get scope")
+	}
+	return found.Unwrap(), nil
+}
+
+// ListScopes returns the scopes stored for a connection. Limit and offset are obtained from
+// api.GetLimitOffset using the "pageSize" and "page" query keys. Each returned pointer refers
+// to its own copy of the unwrapped scope value; the result is nil when nothing matches.
+func (s *ScopeDatabaseHelperImpl) ListScopes(input *plugin.ApiResourceInput, connectionId uint64) ([]*models.RemoteScope, errors.Error) {
+	pageSize, skip := api.GetLimitOffset(input.Query, "pageSize", "page")
+	rows := s.pa.scopeType.NewSlice()
+	if err := api.CallDB(s.db.All, rows, dal.Where("connection_id = ?", connectionId), dal.Limit(pageSize), dal.Offset(skip)); err != nil {
+		return nil, err
+	}
+	unwrapped := rows.UnwrapSlice()
+	var result []*models.RemoteScope
+	for i := range unwrapped {
+		// a fresh variable per iteration so every pointer is distinct
+		remote := unwrapped[i].(models.RemoteScope)
+		result = append(result, &remote)
+	}
+	return result, nil
+}
+
+// DeleteScope deletes the scope identified by scopeId under the given connection.
+//
+// The scope-id column is resolved from the reflection parameters, exactly as GetScope does,
+// rather than being hard-coded to "id"; this keeps lookup and deletion addressing the same row.
+// The column name is plugin configuration, not request input; the ids are bound as parameters.
+// It returns whatever error the underlying delete call reports.
+func (s *ScopeDatabaseHelperImpl) DeleteScope(connectionId uint64, scopeId string) errors.Error {
+	condition := fmt.Sprintf("connection_id = ? AND %s = ?", s.params.ScopeIdColumnName)
+	rawScope := s.pa.scopeType.New()
+	return api.CallDB(s.db.Delete, rawScope, dal.Where(condition, connectionId, scopeId))
+}
+
+// GetTransformationRule loads the transformation rule with the given id and returns the
+// unwrapped model.
+//
+// On failure it returns nil together with the lookup error. Previously the still-wrapped,
+// unpopulated dynamic model was returned on the error path, which has a different dynamic
+// type than the success value and is not meaningful to callers.
+func (s *ScopeDatabaseHelperImpl) GetTransformationRule(ruleId uint64) (models.RemoteTransformation, errors.Error) {
+	rule := s.pa.txRuleType.New()
+	if err := api.CallDB(s.db.First, rule, dal.Where("id = ?", ruleId)); err != nil {
+		return nil, err
+	}
+	return rule.Unwrap(), nil
+}
+
+// ListTransformationRules loads the transformation rules whose ids are in ruleIds.
+//
+// An empty ruleIds yields a nil result without touching the database: an "IN" over an empty
+// list cannot match anything, so the query is skipped. Each returned pointer refers to its
+// own copy of the unwrapped rule value. Query errors are returned unchanged.
+func (s *ScopeDatabaseHelperImpl) ListTransformationRules(ruleIds []uint64) ([]*models.RemoteTransformation, errors.Error) {
+	if len(ruleIds) == 0 {
+		return nil, nil
+	}
+	rules := s.pa.txRuleType.NewSlice()
+	if err := api.CallDB(s.db.All, rules, dal.Where("id IN (?)", ruleIds)); err != nil {
+		return nil, err
+	}
+	var result []*models.RemoteTransformation
+	for _, unwrapped := range rules.UnwrapSlice() {
+		// a fresh variable per iteration so every pointer is distinct
+		rule := unwrapped.(models.RemoteTransformation)
+		result = append(result, &rule)
+	}
+	return result, nil
+}
+
+// save converts every scope into a database row map via models.ToDatabaseMap and inserts
+// all rows into the scope table with a single Create call. createdAt and updatedAt are
+// forwarded to the conversion untouched. A conversion error is returned as-is; an insert
+// error is wrapped as "could not save scope".
+func (s *ScopeDatabaseHelperImpl) save(scopes []*models.RemoteScope, createdAt *time.Time, updatedAt *time.Time) errors.Error {
+	table := s.pa.scopeType.TableName()
+	var rows []map[string]any
+	for i := range scopes {
+		// Elem() requires the scope value to hold a pointer (or interface); it yields the pointee
+		underlying := reflect.ValueOf(*scopes[i]).Elem().Interface()
+		row, err := models.ToDatabaseMap(table, underlying, createdAt, updatedAt)
+		if err != nil {
+			return err
+		}
+		rows = append(rows, row)
+	}
+	if err := api.CallDB(s.db.Create, &rows, dal.From(table)); err != nil {
+		return errors.Default.Wrap(err, "could not save scope")
+	}
+	return nil
+}
+
+// Compile-time check that *ScopeDatabaseHelperImpl satisfies api.ScopeDatabaseHelper for the remote models.
+var _ api.ScopeDatabaseHelper[models.RemoteConnection, models.RemoteScope, models.RemoteTransformation] = (*ScopeDatabaseHelperImpl)(nil)
diff --git a/backend/test/e2e/remote/python_plugin_test.go b/backend/test/e2e/remote/python_plugin_test.go
index e20ca493b..08369edb1 100644
--- a/backend/test/e2e/remote/python_plugin_test.go
+++ b/backend/test/e2e/remote/python_plugin_test.go
@@ -131,6 +131,7 @@ func TestBlueprintV200(t *testing.T) {
})
rule := CreateTestTransformationRule(client, connection.ID)
scope := CreateTestScope(client, rule, connection.ID)
+
blueprint := client.CreateBasicBlueprintV2(
"Test blueprint",
&helper.BlueprintV2Config{
@@ -159,6 +160,13 @@ func TestBlueprintV200(t *testing.T) {
project := client.GetProject(projectName)
require.Equal(t, blueprint.Name, project.Blueprint.Name)
client.TriggerBlueprint(blueprint.ID)
+ scopesResponse := client.ListScopes(PLUGIN_NAME, connection.ID, true)
+ require.Equal(t, 1, len(scopesResponse))
+ require.Equal(t, 1, len(scopesResponse[0].Blueprints))
+ bps := client.DeleteScope(PLUGIN_NAME, connection.ID, scope.Id, false)
+ require.Equal(t, 1, len(bps))
+ scopesResponse = client.ListScopes(PLUGIN_NAME, connection.ID, true)
+ require.Equal(t, 0, len(scopesResponse))
}
func TestCreateTxRule(t *testing.T) {