You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by wa...@apache.org on 2023/03/21 12:18:20 UTC
[incubator-devlake] branch main updated: feat: set project mapping by plugin org (#4706)
This is an automated email from the ASF dual-hosted git repository.
warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 8e915cd59 feat: set project mapping by plugin org (#4706)
8e915cd59 is described below
commit 8e915cd59f94ff35f1753784e4178f057b2c920c
Author: mindlesscloud <li...@merico.dev>
AuthorDate: Tue Mar 21 20:18:14 2023 +0800
feat: set project mapping by plugin org (#4706)
---
backend/core/dal/dal.go | 4 +-
backend/core/plugin/plugin_blueprint.go | 13 +++-
backend/plugins/org/api/types.go | 10 ++-
backend/plugins/org/e2e/project_mapping_test.go | 75 ++++++++++++++++++++++
.../org/e2e/snapshot_tables/project_mapping.csv | 3 +
backend/plugins/org/impl/impl.go | 23 +++++++
backend/plugins/org/tasks/project_mapping.go | 69 ++++++++++++++++++++
backend/plugins/org/tasks/task_data.go | 32 ++++++++-
backend/server/services/blueprint_makeplan_v200.go | 34 ++++------
.../services/blueprint_makeplan_v200_test.go | 14 +++-
10 files changed, 252 insertions(+), 25 deletions(-)
diff --git a/backend/core/dal/dal.go b/backend/core/dal/dal.go
index 5abbea8f3..a41e679a9 100644
--- a/backend/core/dal/dal.go
+++ b/backend/core/dal/dal.go
@@ -19,6 +19,7 @@ package dal
import (
"database/sql"
+ "fmt"
"reflect"
"github.com/apache/incubator-devlake/core/errors"
@@ -239,7 +240,8 @@ func GetPrimarykeyColumnNames(d Dal, dst Tabler) (names []string, err errors.Err
return
}
for _, pkColumn := range pkColumns {
- names = append(names, pkColumn.Name())
+ // in case the column name is a reserved identifier
+ names = append(names, fmt.Sprintf("%s.%s", dst.TableName(), pkColumn.Name()))
}
return
}
diff --git a/backend/core/plugin/plugin_blueprint.go b/backend/core/plugin/plugin_blueprint.go
index 3a9fe34e6..9487bccb4 100644
--- a/backend/core/plugin/plugin_blueprint.go
+++ b/backend/core/plugin/plugin_blueprint.go
@@ -141,6 +141,11 @@ type MetricPluginBlueprintV200 interface {
MakeMetricPluginPipelinePlanV200(projectName string, options json.RawMessage) (PipelinePlan, errors.Error)
}
+// ProjectMapper is implemented by the org plugin, which binds a project to its scopes
+type ProjectMapper interface {
+ MapProject(projectName string, scopes []Scope) (PipelinePlan, errors.Error)
+}
+
// CompositeDataSourcePluginBlueprintV200 is for unit test
type CompositeDataSourcePluginBlueprintV200 interface {
PluginMeta
@@ -153,13 +158,19 @@ type CompositeMetricPluginBlueprintV200 interface {
MetricPluginBlueprintV200
}
-// CompositeMetricPluginBlueprintV200 is for unit test
+// CompositePluginBlueprintV200 is for unit test
type CompositePluginBlueprintV200 interface {
PluginMeta
DataSourcePluginBlueprintV200
MetricPluginBlueprintV200
}
+// CompositeProjectMapper is for unit test
+type CompositeProjectMapper interface {
+ PluginMeta
+ ProjectMapper
+}
+
type BlueprintSyncPolicy struct {
Version string `json:"version" validate:"required,semver,oneof=1.0.0"`
SkipOnFail bool `json:"skipOnFail"`
diff --git a/backend/plugins/org/api/types.go b/backend/plugins/org/api/types.go
index 8fa56946e..6dc296011 100644
--- a/backend/plugins/org/api/types.go
+++ b/backend/plugins/org/api/types.go
@@ -18,9 +18,11 @@ limitations under the License.
package api
import (
+ "strings"
+
+ "github.com/apache/incubator-devlake/core/models/common"
"github.com/apache/incubator-devlake/core/models/domainlayer"
"github.com/apache/incubator-devlake/core/models/domainlayer/crossdomain"
- "strings"
)
const TimeFormat = "2006-01-02"
@@ -267,6 +269,12 @@ func (*projectMapping) toDomainLayer(tt []projectMapping) []*crossdomain.Project
ProjectName: t.ProjectName,
Table: t.Table,
RowId: t.RowId,
+ NoPKModel: common.NoPKModel{
+ RawDataOrigin: common.RawDataOrigin{
+ // set RawDataParams to the project name. When importing from a CSV file, existing records are deleted based on this field
+ RawDataParams: t.ProjectName,
+ },
+ },
})
}
return result
diff --git a/backend/plugins/org/e2e/project_mapping_test.go b/backend/plugins/org/e2e/project_mapping_test.go
new file mode 100644
index 000000000..b66abae4b
--- /dev/null
+++ b/backend/plugins/org/e2e/project_mapping_test.go
@@ -0,0 +1,75 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e
+
+import (
+ "testing"
+
+ "github.com/apache/incubator-devlake/core/models/domainlayer/crossdomain"
+ "github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/helpers/e2ehelper"
+ "github.com/apache/incubator-devlake/plugins/org/impl"
+ "github.com/apache/incubator-devlake/plugins/org/tasks"
+)
+
+type scope struct {
+ id string
+ tableName string
+}
+
+func (s scope) ScopeId() string {
+ return s.id
+}
+
+func (s scope) ScopeName() string {
+ panic("implement me")
+}
+
+func (s scope) TableName() string {
+ return s.tableName
+}
+
+func TestProjectMappingDataFlow(t *testing.T) {
+ dataflowTester := e2ehelper.NewDataFlowTester(t, "org", impl.Org{})
+ scopes := []plugin.Scope{scope{
+ id: "bitbucket:BitbucketRepo:4:thenicetgp/lake",
+ tableName: "boards",
+ }, scope{
+ id: "github:GithubRepo:1:1",
+ tableName: "repos",
+ }}
+ taskData := &tasks.TaskData{
+ Options: &tasks.Options{
+ ProjectMappings: []tasks.ProjectMapping{tasks.NewProjectMapping("my_project", scopes)},
+ },
+ }
+
+ // flush the ProjectMapping table before running the subtask
+ dataflowTester.FlushTabler(&crossdomain.ProjectMapping{})
+
+ dataflowTester.Subtask(tasks.SetProjectMappingMeta, taskData)
+ dataflowTester.VerifyTable(
+ crossdomain.ProjectMapping{},
+ "./snapshot_tables/project_mapping.csv",
+ e2ehelper.ColumnWithRawData(
+ "project_name",
+ "table",
+ "row_id",
+ ),
+ )
+}
diff --git a/backend/plugins/org/e2e/snapshot_tables/project_mapping.csv b/backend/plugins/org/e2e/snapshot_tables/project_mapping.csv
new file mode 100644
index 000000000..6f4620393
--- /dev/null
+++ b/backend/plugins/org/e2e/snapshot_tables/project_mapping.csv
@@ -0,0 +1,3 @@
+project_name,table,row_id,_raw_data_params,_raw_data_table,_raw_data_id,_raw_data_remark
+my_project,boards,bitbucket:BitbucketRepo:4:thenicetgp/lake,my_project,,0,
+my_project,repos,github:GithubRepo:1:1,my_project,,0,
diff --git a/backend/plugins/org/impl/impl.go b/backend/plugins/org/impl/impl.go
index 47808d9dc..97f1b23b1 100644
--- a/backend/plugins/org/impl/impl.go
+++ b/backend/plugins/org/impl/impl.go
@@ -31,6 +31,7 @@ var _ plugin.PluginMeta = (*Org)(nil)
var _ plugin.PluginInit = (*Org)(nil)
var _ plugin.PluginTask = (*Org)(nil)
var _ plugin.PluginModel = (*Org)(nil)
+var _ plugin.ProjectMapper = (*Org)(nil)
type Org struct {
handlers *api.Handlers
@@ -52,9 +53,31 @@ func (p Org) Description() string {
func (p Org) SubTaskMetas() []plugin.SubTaskMeta {
return []plugin.SubTaskMeta{
tasks.ConnectUserAccountsExactMeta,
+ tasks.SetProjectMappingMeta,
}
}
+func (p Org) MapProject(projectName string, scopes []plugin.Scope) (plugin.PipelinePlan, errors.Error) {
+ var plan plugin.PipelinePlan
+ var stage plugin.PipelineStage
+
+ // construct task options for Org
+ options := make(map[string]interface{})
+ options["projectMappings"] = []tasks.ProjectMapping{tasks.NewProjectMapping(projectName, scopes)}
+
+ subtasks, err := helper.MakePipelinePlanSubtasks([]plugin.SubTaskMeta{tasks.SetProjectMappingMeta}, []string{plugin.DOMAIN_TYPE_CROSS})
+ if err != nil {
+ return nil, err
+ }
+ stage = append(stage, &plugin.PipelineTask{
+ Plugin: "org",
+ Subtasks: subtasks,
+ Options: options,
+ })
+ plan = append(plan, stage)
+ return plan, nil
+}
+
func (p Org) PrepareTaskData(taskCtx plugin.TaskContext, options map[string]interface{}) (interface{}, errors.Error) {
var op tasks.Options
err := helper.Decode(options, &op, nil)
diff --git a/backend/plugins/org/tasks/project_mapping.go b/backend/plugins/org/tasks/project_mapping.go
new file mode 100644
index 000000000..4af980710
--- /dev/null
+++ b/backend/plugins/org/tasks/project_mapping.go
@@ -0,0 +1,69 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+ "github.com/apache/incubator-devlake/core/dal"
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/models/common"
+ "github.com/apache/incubator-devlake/core/models/domainlayer/crossdomain"
+ "github.com/apache/incubator-devlake/core/plugin"
+)
+
+var SetProjectMappingMeta = plugin.SubTaskMeta{
+ Name: "setProjectMapping",
+ EntryPoint: SetProjectMapping,
+ EnabledByDefault: true,
+ Description: "set project mapping",
+ DomainTypes: []string{plugin.DOMAIN_TYPE_CROSS},
+}
+
+// SetProjectMapping binds projects and scopes
+func SetProjectMapping(taskCtx plugin.SubTaskContext) errors.Error {
+ db := taskCtx.GetDal()
+ data := taskCtx.GetData().(*TaskData)
+ var err errors.Error
+
+ for _, mapping := range data.Options.ProjectMappings {
+ err = db.Delete(&crossdomain.ProjectMapping{}, dal.Where("project_name = ?", mapping.ProjectName))
+ if err != nil {
+ return err
+ }
+ var projectMappings []crossdomain.ProjectMapping
+ for _, scope := range mapping.Scopes {
+ projectMappings = append(projectMappings, crossdomain.ProjectMapping{
+ ProjectName: mapping.ProjectName,
+ Table: scope.Table,
+ RowId: scope.RowID,
+ NoPKModel: common.NoPKModel{
+ RawDataOrigin: common.RawDataOrigin{
+ // set RawDataParams to the project name. When importing from a CSV file, existing records are deleted based on this field
+ RawDataParams: mapping.ProjectName,
+ },
+ },
+ })
+ }
+ if len(projectMappings) > 0 {
+ err = db.CreateOrUpdate(projectMappings)
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
diff --git a/backend/plugins/org/tasks/task_data.go b/backend/plugins/org/tasks/task_data.go
index f78913d2b..ca2a9b300 100644
--- a/backend/plugins/org/tasks/task_data.go
+++ b/backend/plugins/org/tasks/task_data.go
@@ -17,8 +17,38 @@ limitations under the License.
package tasks
+import "github.com/apache/incubator-devlake/core/plugin"
+
type Options struct {
- ConnectionId uint64 `json:"connectionId"`
+ ConnectionId uint64 `json:"connectionId"`
+ ProjectMappings []ProjectMapping `json:"projectMappings"`
+}
+
+// ProjectMapping represents the relation between a project and its scopes
+type ProjectMapping struct {
+ ProjectName string `json:"projectName"`
+ Scopes []Scope `json:"scopes"`
+}
+
+// Scope identifies a scope by its table and row id
+type Scope struct {
+ Table string `json:"table"`
+ RowID string `json:"rowId"`
+}
+
+// NewProjectMapping constructs a ProjectMapping from a project name and a list of plugin scopes
+func NewProjectMapping(projectName string, pluginScopes []plugin.Scope) ProjectMapping {
+ var scopes []Scope
+ for _, ps := range pluginScopes {
+ scopes = append(scopes, Scope{
+ Table: ps.TableName(),
+ RowID: ps.ScopeId(),
+ })
+ }
+ return ProjectMapping{
+ ProjectName: projectName,
+ Scopes: scopes,
+ }
}
type TaskData struct {
diff --git a/backend/server/services/blueprint_makeplan_v200.go b/backend/server/services/blueprint_makeplan_v200.go
index 81047807d..3296521aa 100644
--- a/backend/server/services/blueprint_makeplan_v200.go
+++ b/backend/server/services/blueprint_makeplan_v200.go
@@ -20,11 +20,9 @@ package services
import (
"encoding/json"
"fmt"
- "github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
- "github.com/apache/incubator-devlake/core/models/domainlayer/crossdomain"
"github.com/apache/incubator-devlake/core/plugin"
)
@@ -50,24 +48,6 @@ func GeneratePlanJsonV200(
}
}
}
- // refresh project_mapping table to reflect project/scopes relationship
- if len(projectName) != 0 {
- err = db.Delete(&crossdomain.ProjectMapping{}, dal.Where("project_name = ?", projectName))
- if err != nil {
- return nil, err
- }
- for _, scope := range scopes {
- projectMapping := &crossdomain.ProjectMapping{
- ProjectName: projectName,
- Table: scope.TableName(),
- RowId: scope.ScopeId(),
- }
- err = db.CreateOrUpdate(projectMapping)
- if err != nil {
- return nil, err
- }
- }
- }
return plan, err
}
@@ -141,7 +121,21 @@ func genPlanJsonV200(
)
}
}
+ var planForProjectMapping plugin.PipelinePlan
+ if projectName != "" {
+ p, err := plugin.GetPlugin("org")
+ if err != nil {
+ return nil, nil, err
+ }
+ if pluginBp, ok := p.(plugin.ProjectMapper); ok {
+ planForProjectMapping, err = pluginBp.MapProject(projectName, scopes)
+ if err != nil {
+ return nil, nil, err
+ }
+ }
+ }
plan := SequencializePipelinePlans(
+ planForProjectMapping,
ParallelizePipelinePlans(sourcePlans...),
ParallelizePipelinePlans(metricPlans...),
)
diff --git a/backend/server/services/blueprint_makeplan_v200_test.go b/backend/server/services/blueprint_makeplan_v200_test.go
index fef7b4cc5..7563f98c2 100644
--- a/backend/server/services/blueprint_makeplan_v200_test.go
+++ b/backend/server/services/blueprint_makeplan_v200_test.go
@@ -19,7 +19,6 @@ package services
import (
"encoding/json"
- mockplugin "github.com/apache/incubator-devlake/mocks/core/plugin"
"testing"
"github.com/apache/incubator-devlake/core/models"
@@ -27,6 +26,8 @@ import (
"github.com/apache/incubator-devlake/core/models/domainlayer/code"
"github.com/apache/incubator-devlake/core/models/domainlayer/ticket"
"github.com/apache/incubator-devlake/core/plugin"
+ mockplugin "github.com/apache/incubator-devlake/mocks/core/plugin"
+ "github.com/apache/incubator-devlake/plugins/org/tasks"
"github.com/stretchr/testify/assert"
)
@@ -68,8 +69,18 @@ func TestMakePlanV200(t *testing.T) {
dora := new(mockplugin.CompositeMetricPluginBlueprintV200)
dora.On("MakeMetricPluginPipelinePlanV200", projectName, json.RawMessage("{}")).Return(doraOutputPlan, nil)
+ // mock org plugin
+ org := new(mockplugin.CompositeProjectMapper)
+ orgPlan := plugin.PipelinePlan{
+ {
+ {Plugin: "org", Subtasks: []string{"setProjectMapping"}, Options: map[string]interface{}{"projectMappings": []interface{}{tasks.NewProjectMapping(projectName, githubOutputScopes)}}},
+ },
+ }
+ org.On("MapProject", projectName, githubOutputScopes).Return(orgPlan, nil)
+
// expectation, establish expectation before any code being launch to avoid unwanted modification
expectedPlan := make(plugin.PipelinePlan, 0)
+ expectedPlan = append(expectedPlan, orgPlan...)
expectedPlan = append(expectedPlan, githubOutputPlan...)
expectedPlan = append(expectedPlan, doraOutputPlan...)
expectedScopes := append(make([]plugin.Scope, 0), githubOutputScopes...)
@@ -77,6 +88,7 @@ func TestMakePlanV200(t *testing.T) {
// plugin registration
plugin.RegisterPlugin(githubName, github)
plugin.RegisterPlugin(doraName, dora)
+ plugin.RegisterPlugin("org", org)
// put them together and call GeneratePlanJsonV200
connections, _ := json.Marshal([]*plugin.BlueprintConnectionV200{