You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by wa...@apache.org on 2023/02/20 16:29:17 UTC

[incubator-devlake] branch main updated: feat: add pipeline step for bitbucket (#4459)

This is an automated email from the ASF dual-hosted git repository.

warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new b1d4a2c4d feat: add pipeline step for bitbucket (#4459)
b1d4a2c4d is described below

commit b1d4a2c4d788b0927210b44e25a47cfa5d816637
Author: Likyh <ya...@meri.co>
AuthorDate: Tue Feb 21 00:29:11 2023 +0800

    feat: add pipeline step for bitbucket (#4459)
    
    * feat: add pipeline step for bitbucket
    
    * fix: fix for test
---
 backend/plugins/bitbucket/impl/impl.go             |   6 +-
 backend/plugins/bitbucket/models/deployment.go     |  33 ++++---
 .../migrationscripts/20230215_add_pipeline_step.go |  49 ++++++++++
 .../{register.go => archived/pipeline_step.go}     |  34 ++++---
 .../bitbucket/models/migrationscripts/register.go  |   1 +
 .../models/{deployment.go => pipeline_step.go}     |  34 +++----
 backend/plugins/bitbucket/tasks/api_common.go      |  30 ++++++
 .../bitbucket/tasks/deployment_collector.go        |   5 +-
 .../bitbucket/tasks/deployment_convertor.go        | 107 ---------------------
 .../bitbucket/tasks/deployment_extractor.go        |  51 +++++-----
 backend/plugins/bitbucket/tasks/issue_collector.go |   3 +-
 .../bitbucket/tasks/issue_comment_collector.go     |  14 +--
 .../plugins/bitbucket/tasks/pipeline_collector.go  |   3 +-
 .../plugins/bitbucket/tasks/pipeline_convertor.go  |   2 +-
 ...ne_collector.go => pipeline_steps_collector.go} |  43 +++++----
 ...ne_convertor.go => pipeline_steps_convertor.go} |  90 ++++++++++-------
 .../bitbucket/tasks/pipeline_steps_extractor.go    | 104 ++++++++++++++++++++
 backend/plugins/bitbucket/tasks/pr_collector.go    |   3 +-
 .../bitbucket/tasks/pr_comment_collector.go        |  16 +--
 19 files changed, 381 insertions(+), 247 deletions(-)

diff --git a/backend/plugins/bitbucket/impl/impl.go b/backend/plugins/bitbucket/impl/impl.go
index 12f5c051f..0a11007bd 100644
--- a/backend/plugins/bitbucket/impl/impl.go
+++ b/backend/plugins/bitbucket/impl/impl.go
@@ -105,6 +105,10 @@ func (p Bitbucket) SubTaskMetas() []plugin.SubTaskMeta {
 		tasks.CollectApiDeploymentsMeta,
 		tasks.ExtractApiDeploymentsMeta,
 
+		// must run after deployment to match
+		tasks.CollectPipelineStepsMeta,
+		tasks.ExtractPipelineStepsMeta,
+
 		tasks.ConvertRepoMeta,
 		tasks.ConvertAccountsMeta,
 		tasks.ConvertPullRequestsMeta,
@@ -114,7 +118,7 @@ func (p Bitbucket) SubTaskMetas() []plugin.SubTaskMeta {
 		tasks.ConvertIssuesMeta,
 		tasks.ConvertIssueCommentsMeta,
 		tasks.ConvertPipelineMeta,
-		tasks.ConvertDeploymentMeta,
+		tasks.ConvertPipelineStepMeta,
 	}
 }
 
diff --git a/backend/plugins/bitbucket/models/deployment.go b/backend/plugins/bitbucket/models/deployment.go
index 3bb5bb1f9..6eb0425b2 100644
--- a/backend/plugins/bitbucket/models/deployment.go
+++ b/backend/plugins/bitbucket/models/deployment.go
@@ -23,21 +23,24 @@ import (
 )
 
 type BitbucketDeployment struct {
-	ConnectionId   uint64 `gorm:"primaryKey"`
-	BitbucketId    string `gorm:"primaryKey"`
-	PipelineId     string `gorm:"type:varchar(255)"`
-	Type           string `gorm:"type:varchar(255)"`
-	Name           string `gorm:"type:varchar(255)"`
-	Key            string `gorm:"type:varchar(255)"`
-	WebUrl         string `gorm:"type:varchar(255)"`
-	Status         string `gorm:"type:varchar(100)"`
-	StateUrl       string `gorm:"type:varchar(255)"`
-	CommitSha      string `gorm:"type:varchar(255)"`
-	CommitUrl      string `gorm:"type:varchar(255)"`
-	CreatedOn      *time.Time
-	StartedOn      *time.Time
-	CompletedOn    *time.Time
-	LastUpdateTime *time.Time
+	ConnectionId    uint64 `gorm:"primaryKey"`
+	BitbucketId     string `gorm:"primaryKey"`
+	PipelineId      string `gorm:"type:varchar(255)"`
+	StepId          string `gorm:"type:varchar(255)"`
+	Type            string `gorm:"type:varchar(255)"`
+	Name            string `gorm:"type:varchar(255)"`
+	Environment     string `gorm:"type:varchar(255)"`
+	EnvironmentType string `gorm:"type:varchar(255)"`
+	Key             string `gorm:"type:varchar(255)"`
+	WebUrl          string `gorm:"type:varchar(255)"`
+	Status          string `gorm:"type:varchar(100)"`
+	StateUrl        string `gorm:"type:varchar(255)"`
+	CommitSha       string `gorm:"type:varchar(255)"`
+	CommitUrl       string `gorm:"type:varchar(255)"`
+	CreatedOn       *time.Time
+	StartedOn       *time.Time
+	CompletedOn     *time.Time
+	LastUpdateTime  *time.Time
 	common.NoPKModel
 }
 
diff --git a/backend/plugins/bitbucket/models/migrationscripts/20230215_add_pipeline_step.go b/backend/plugins/bitbucket/models/migrationscripts/20230215_add_pipeline_step.go
new file mode 100644
index 000000000..43bb4af39
--- /dev/null
+++ b/backend/plugins/bitbucket/models/migrationscripts/20230215_add_pipeline_step.go
@@ -0,0 +1,49 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+	"github.com/apache/incubator-devlake/core/context"
+	"github.com/apache/incubator-devlake/core/errors"
+	"github.com/apache/incubator-devlake/helpers/migrationhelper"
+	"github.com/apache/incubator-devlake/plugins/bitbucket/models/migrationscripts/archived"
+)
+
+type BitbucketDeployment20230215 struct {
+	StepId          string `gorm:"type:varchar(255)"`
+	Environment     string `gorm:"type:varchar(255)"`
+	EnvironmentType string `gorm:"type:varchar(255)"`
+}
+
+func (BitbucketDeployment20230215) TableName() string {
+	return "_tool_bitbucket_deployments"
+}
+
+type addPipelineStep20230215 struct{}
+
+func (script *addPipelineStep20230215) Up(basicRes context.BasicRes) errors.Error {
+	return migrationhelper.AutoMigrateTables(basicRes, &archived.BitbucketPipelineStep{}, &BitbucketDeployment20230215{})
+}
+
+func (*addPipelineStep20230215) Version() uint64 {
+	return 20230215000009
+}
+
+func (*addPipelineStep20230215) Name() string {
+	return "add pipeline step"
+}
diff --git a/backend/plugins/bitbucket/models/migrationscripts/register.go b/backend/plugins/bitbucket/models/migrationscripts/archived/pipeline_step.go
similarity index 50%
copy from backend/plugins/bitbucket/models/migrationscripts/register.go
copy to backend/plugins/bitbucket/models/migrationscripts/archived/pipeline_step.go
index 183d1b268..84bf8e325 100644
--- a/backend/plugins/bitbucket/models/migrationscripts/register.go
+++ b/backend/plugins/bitbucket/models/migrationscripts/archived/pipeline_step.go
@@ -15,20 +15,30 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package migrationscripts
+package archived
 
 import (
-	plugin "github.com/apache/incubator-devlake/core/plugin"
+	"github.com/apache/incubator-devlake/core/models/migrationscripts/archived"
+	"time"
 )
 
-// All return all the migration scripts
-func All() []plugin.MigrationScript {
-	return []plugin.MigrationScript{
-		new(addInitTables20220803),
-		new(addPipeline20220914),
-		new(addPrCommits20221008),
-		new(addDeployment20221013),
-		new(addRepoIdAndCommitShaField20221014),
-		new(addScope20230206),
-	}
+type BitbucketPipelineStep struct {
+	ConnectionId      uint64 `gorm:"primaryKey"`
+	BitbucketId       string `gorm:"primaryKey"`
+	PipelineId        string `gorm:"type:varchar(255)"`
+	Name              string `gorm:"type:varchar(255)"`
+	Trigger           string `gorm:"type:varchar(255)"`
+	State             string `gorm:"type:varchar(255)"`
+	Result            string `gorm:"type:varchar(255)"`
+	MaxTime           int
+	StartedOn         *time.Time
+	CompletedOn       *time.Time
+	DurationInSeconds int
+	BuildSecondsUsed  int
+	RunNumber         int
+	archived.NoPKModel
+}
+
+func (BitbucketPipelineStep) TableName() string {
+	return "_tool_bitbucket_pipeline_steps"
 }
diff --git a/backend/plugins/bitbucket/models/migrationscripts/register.go b/backend/plugins/bitbucket/models/migrationscripts/register.go
index 183d1b268..bff36d850 100644
--- a/backend/plugins/bitbucket/models/migrationscripts/register.go
+++ b/backend/plugins/bitbucket/models/migrationscripts/register.go
@@ -30,5 +30,6 @@ func All() []plugin.MigrationScript {
 		new(addDeployment20221013),
 		new(addRepoIdAndCommitShaField20221014),
 		new(addScope20230206),
+		new(addPipelineStep20230215),
 	}
 }
diff --git a/backend/plugins/bitbucket/models/deployment.go b/backend/plugins/bitbucket/models/pipeline_step.go
similarity index 53%
copy from backend/plugins/bitbucket/models/deployment.go
copy to backend/plugins/bitbucket/models/pipeline_step.go
index 3bb5bb1f9..7f08a4b1c 100644
--- a/backend/plugins/bitbucket/models/deployment.go
+++ b/backend/plugins/bitbucket/models/pipeline_step.go
@@ -22,25 +22,23 @@ import (
 	"time"
 )
 
-type BitbucketDeployment struct {
-	ConnectionId   uint64 `gorm:"primaryKey"`
-	BitbucketId    string `gorm:"primaryKey"`
-	PipelineId     string `gorm:"type:varchar(255)"`
-	Type           string `gorm:"type:varchar(255)"`
-	Name           string `gorm:"type:varchar(255)"`
-	Key            string `gorm:"type:varchar(255)"`
-	WebUrl         string `gorm:"type:varchar(255)"`
-	Status         string `gorm:"type:varchar(100)"`
-	StateUrl       string `gorm:"type:varchar(255)"`
-	CommitSha      string `gorm:"type:varchar(255)"`
-	CommitUrl      string `gorm:"type:varchar(255)"`
-	CreatedOn      *time.Time
-	StartedOn      *time.Time
-	CompletedOn    *time.Time
-	LastUpdateTime *time.Time
+type BitbucketPipelineStep struct {
+	ConnectionId      uint64 `gorm:"primaryKey"`
+	BitbucketId       string `gorm:"primaryKey"`
+	PipelineId        string `gorm:"type:varchar(255)"`
+	Name              string `gorm:"type:varchar(255)"`
+	Trigger           string `gorm:"type:varchar(255)"`
+	State             string `gorm:"type:varchar(255)"`
+	Result            string `gorm:"type:varchar(255)"`
+	MaxTime           int
+	StartedOn         *time.Time
+	CompletedOn       *time.Time
+	DurationInSeconds int
+	BuildSecondsUsed  int
+	RunNumber         int
 	common.NoPKModel
 }
 
-func (BitbucketDeployment) TableName() string {
-	return "_tool_bitbucket_deployments"
+func (BitbucketPipelineStep) TableName() string {
+	return "_tool_bitbucket_pipeline_steps"
 }
diff --git a/backend/plugins/bitbucket/tasks/api_common.go b/backend/plugins/bitbucket/tasks/api_common.go
index 9774bd38d..c3d994ce6 100644
--- a/backend/plugins/bitbucket/tasks/api_common.go
+++ b/backend/plugins/bitbucket/tasks/api_common.go
@@ -39,6 +39,10 @@ type BitbucketInput struct {
 	BitbucketId int
 }
 
+type BitbucketUuidInput struct {
+	BitbucketId string
+}
+
 type BitbucketPagination struct {
 	Values  []interface{} `json:"values"`
 	PageLen int           `json:"pagelen"`
@@ -218,6 +222,32 @@ func GetIssuesIterator(taskCtx plugin.SubTaskContext, collectorWithState *api.Ap
 	return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(BitbucketInput{}))
 }
 
+func GetPipelinesIterator(taskCtx plugin.SubTaskContext, collectorWithState *api.ApiCollectorStateManager) (*api.DalCursorIterator, errors.Error) {
+	db := taskCtx.GetDal()
+	data := taskCtx.GetData().(*BitbucketTaskData)
+	clauses := []dal.Clause{
+		dal.Select("bpr.bitbucket_id"),
+		dal.From("_tool_bitbucket_pipelines bpr"),
+		dal.Where(
+			`bpr.repo_id = ? and bpr.connection_id = ?`,
+			data.Options.FullName, data.Options.ConnectionId,
+		),
+	}
+	if collectorWithState.CreatedDateAfter != nil {
+		clauses = append(clauses, dal.Where("bitbucket_created_on > ?", *collectorWithState.CreatedDateAfter))
+	}
+	if collectorWithState.IsIncremental() {
+		clauses = append(clauses, dal.Where("bitbucket_complete_on > ?", *collectorWithState.LatestState.LatestSuccessStart))
+	}
+	// construct the input iterator
+	cursor, err := db.Cursor(clauses...)
+	if err != nil {
+		return nil, err
+	}
+
+	return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(BitbucketUuidInput{}))
+}
+
 func ignoreHTTPStatus404(res *http.Response) errors.Error {
 	if res.StatusCode == http.StatusUnauthorized {
 		return errors.Unauthorized.New("authentication failed, please check your AccessToken")
diff --git a/backend/plugins/bitbucket/tasks/deployment_collector.go b/backend/plugins/bitbucket/tasks/deployment_collector.go
index 4e7ed014f..bf7f4f4ca 100644
--- a/backend/plugins/bitbucket/tasks/deployment_collector.go
+++ b/backend/plugins/bitbucket/tasks/deployment_collector.go
@@ -42,10 +42,11 @@ func CollectApiDeployments(taskCtx plugin.SubTaskContext) errors.Error {
 		PageSize:           50,
 		Incremental:        false,
 		UrlTemplate:        "repositories/{{ .Params.FullName }}/deployments/",
-		Query: GetQueryFields(`values.type,values.uuid,` +
+		Query: GetQueryFields(`values.type,values.uuid,values.environment.name,values.environment.environment_type.name,values.step.uuid,` +
 			`values.release.pipeline,values.release.key,values.release.name,values.release.url,values.release.created_on,` +
 			`values.release.commit.hash,values.release.commit.links.html,` +
-			`values.state.name,values.state.url,values.state.started_on,values.state.completed_on,values.last_update_time`),
+			`values.state.name,values.state.url,values.state.started_on,values.state.completed_on,values.last_update_time,` +
+			`page,pagelen,size`),
 		ResponseParser: GetRawMessageFromResponse,
 		GetTotalPages:  GetTotalPagesFromResponse,
 	})
diff --git a/backend/plugins/bitbucket/tasks/deployment_convertor.go b/backend/plugins/bitbucket/tasks/deployment_convertor.go
deleted file mode 100644
index 2b3c410b0..000000000
--- a/backend/plugins/bitbucket/tasks/deployment_convertor.go
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package tasks
-
-import (
-	"github.com/apache/incubator-devlake/core/dal"
-	"github.com/apache/incubator-devlake/core/errors"
-	"github.com/apache/incubator-devlake/core/models/domainlayer"
-	"github.com/apache/incubator-devlake/core/models/domainlayer/devops"
-	"github.com/apache/incubator-devlake/core/models/domainlayer/didgen"
-	plugin "github.com/apache/incubator-devlake/core/plugin"
-	"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
-	"github.com/apache/incubator-devlake/plugins/bitbucket/models"
-	"reflect"
-	"time"
-)
-
-var ConvertDeploymentMeta = plugin.SubTaskMeta{
-	Name:             "convertDeployments",
-	EntryPoint:       ConvertDeployments,
-	EnabledByDefault: true,
-	Description:      "Convert tool layer table bitbucket_pipeline into domain layer table pipeline",
-	DomainTypes:      []string{plugin.DOMAIN_TYPE_CROSS},
-}
-
-func ConvertDeployments(taskCtx plugin.SubTaskContext) errors.Error {
-	rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_DEPLOYMENT_TABLE)
-	db := taskCtx.GetDal()
-
-	cursor, err := db.Cursor(dal.From(models.BitbucketDeployment{}))
-	if err != nil {
-		return err
-	}
-	defer cursor.Close()
-
-	pipelineIdGen := didgen.NewDomainIdGenerator(&models.BitbucketDeployment{})
-
-	converter, err := api.NewDataConverter(api.DataConverterArgs{
-		InputRowType:       reflect.TypeOf(models.BitbucketDeployment{}),
-		Input:              cursor,
-		RawDataSubTaskArgs: *rawDataSubTaskArgs,
-		Convert: func(inputRow interface{}) ([]interface{}, errors.Error) {
-			bitbucketDeployment := inputRow.(*models.BitbucketDeployment)
-
-			startedAt := bitbucketDeployment.CreatedOn
-			if bitbucketDeployment.StartedOn != nil {
-				startedAt = bitbucketDeployment.StartedOn
-			}
-			domainDeployment := &devops.CICDTask{
-				DomainEntity: domainlayer.DomainEntity{
-					Id: pipelineIdGen.Generate(data.Options.ConnectionId, bitbucketDeployment.BitbucketId),
-				},
-				Name: didgen.NewDomainIdGenerator(&models.BitbucketPipeline{}).
-					Generate(data.Options.ConnectionId, bitbucketDeployment.Name),
-				PipelineId: bitbucketDeployment.PipelineId,
-				Result: devops.GetResult(&devops.ResultRule{
-					Failed:  []string{models.FAILED, models.ERROR, models.UNDEPLOYED},
-					Abort:   []string{models.STOPPED, models.SKIPPED},
-					Success: []string{models.SUCCESSFUL, models.COMPLETED},
-					Manual:  []string{models.PAUSED, models.HALTED},
-					Default: devops.SUCCESS,
-				}, bitbucketDeployment.Status),
-				Status: devops.GetStatus(&devops.StatusRule{
-					InProgress: []string{models.IN_PROGRESS, models.PENDING, models.BUILDING},
-					Default:    devops.DONE,
-				}, bitbucketDeployment.Status),
-				Type:         bitbucketDeployment.Type,
-				StartedDate:  *startedAt,
-				FinishedDate: bitbucketDeployment.CompletedOn,
-			}
-			// rebuild the FinishedDate and DurationSec by Status
-			finishedAt := time.Now()
-			if domainDeployment.Status != devops.DONE {
-				domainDeployment.FinishedDate = nil
-			} else if bitbucketDeployment.CompletedOn != nil {
-				finishedAt = *bitbucketDeployment.CompletedOn
-			}
-			durationTime := finishedAt.Sub(*startedAt)
-			domainDeployment.DurationSec = uint64(durationTime.Seconds())
-
-			return []interface{}{
-				domainDeployment,
-			}, nil
-		},
-	})
-
-	if err != nil {
-		return err
-	}
-
-	return converter.Execute()
-}
diff --git a/backend/plugins/bitbucket/tasks/deployment_extractor.go b/backend/plugins/bitbucket/tasks/deployment_extractor.go
index 1b5fa12ea..a93877378 100644
--- a/backend/plugins/bitbucket/tasks/deployment_extractor.go
+++ b/backend/plugins/bitbucket/tasks/deployment_extractor.go
@@ -30,12 +30,15 @@ type bitbucketApiDeploymentsResponse struct {
 	Type string `json:"type"`
 	UUID string `json:"uuid"`
 	//Key  string `json:"key"`
-	//Step struct {
-	//	UUID string `json:"uuid"`
-	//} `json:"step"`
-	//Environment struct {
-	//	UUID string `json:"uuid"`
-	//} `json:"environment"`
+	Step struct {
+		UUID string `json:"uuid"`
+	} `json:"step"`
+	Environment struct {
+		Name            string `json:"name"`
+		EnvironmentType struct {
+			Name string `json:"name"`
+		} `json:"environment_type"`
+	} `json:"environment"`
 	Release struct {
 		//Type     string `json:"type"`
 		//UUID     string `json:"uuid"`
@@ -93,24 +96,24 @@ func ExtractApiDeployments(taskCtx plugin.SubTaskContext) errors.Error {
 			}
 
 			bitbucketDeployment := &models.BitbucketDeployment{
-				ConnectionId:   data.Options.ConnectionId,
-				BitbucketId:    bitbucketApiDeployments.UUID,
-				PipelineId:     bitbucketApiDeployments.Release.Pipeline.UUID,
-				Type:           bitbucketApiDeployments.Type,
-				Name:           bitbucketApiDeployments.Release.Name,
-				Key:            bitbucketApiDeployments.Release.Key,
-				WebUrl:         bitbucketApiDeployments.Release.URL,
-				CommitSha:      bitbucketApiDeployments.Release.Commit.Hash,
-				CommitUrl:      bitbucketApiDeployments.Release.Commit.Links.HTML.Href,
-				Status:         bitbucketApiDeployments.State.Name,
-				StateUrl:       bitbucketApiDeployments.State.URL,
-				CreatedOn:      bitbucketApiDeployments.Release.CreatedOn,
-				StartedOn:      bitbucketApiDeployments.State.StartedOn,
-				CompletedOn:    bitbucketApiDeployments.State.CompletedOn,
-				LastUpdateTime: bitbucketApiDeployments.LastUpdateTime,
-			}
-			if err != nil {
-				return nil, err
+				ConnectionId:    data.Options.ConnectionId,
+				BitbucketId:     bitbucketApiDeployments.UUID,
+				PipelineId:      bitbucketApiDeployments.Release.Pipeline.UUID,
+				StepId:          bitbucketApiDeployments.Step.UUID,
+				Type:            bitbucketApiDeployments.Type,
+				Name:            bitbucketApiDeployments.Release.Name,
+				Environment:     bitbucketApiDeployments.Environment.Name,
+				EnvironmentType: bitbucketApiDeployments.Environment.EnvironmentType.Name,
+				Key:             bitbucketApiDeployments.Release.Key,
+				WebUrl:          bitbucketApiDeployments.Release.URL,
+				CommitSha:       bitbucketApiDeployments.Release.Commit.Hash,
+				CommitUrl:       bitbucketApiDeployments.Release.Commit.Links.HTML.Href,
+				Status:          bitbucketApiDeployments.State.Name,
+				StateUrl:        bitbucketApiDeployments.State.URL,
+				CreatedOn:       bitbucketApiDeployments.Release.CreatedOn,
+				StartedOn:       bitbucketApiDeployments.State.StartedOn,
+				CompletedOn:     bitbucketApiDeployments.State.CompletedOn,
+				LastUpdateTime:  bitbucketApiDeployments.LastUpdateTime,
 			}
 
 			results := make([]interface{}, 0, 2)
diff --git a/backend/plugins/bitbucket/tasks/issue_collector.go b/backend/plugins/bitbucket/tasks/issue_collector.go
index 6294f5c04..9725ddc90 100644
--- a/backend/plugins/bitbucket/tasks/issue_collector.go
+++ b/backend/plugins/bitbucket/tasks/issue_collector.go
@@ -48,7 +48,8 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext) errors.Error {
 		Query: GetQueryCreatedAndUpdated(
 			`values.type,values.id,values.links.self,`+
 				`values.title,values.content.raw,values.reporter,values.assignee,`+
-				`values.state,values.milestone.id,values.component,values.priority,values.created_on,values.updated_on`,
+				`values.state,values.milestone.id,values.component,values.priority,values.created_on,values.updated_on,`+
+				`page,pagelen,size`,
 			collectorWithState),
 		GetTotalPages:  GetTotalPagesFromResponse,
 		ResponseParser: GetRawMessageFromResponse,
diff --git a/backend/plugins/bitbucket/tasks/issue_comment_collector.go b/backend/plugins/bitbucket/tasks/issue_comment_collector.go
index dc9645348..8493f0844 100644
--- a/backend/plugins/bitbucket/tasks/issue_comment_collector.go
+++ b/backend/plugins/bitbucket/tasks/issue_comment_collector.go
@@ -48,12 +48,14 @@ func CollectApiIssueComments(taskCtx plugin.SubTaskContext) errors.Error {
 	defer iterator.Close()
 
 	err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-		ApiClient:      data.ApiClient,
-		PageSize:       100,
-		Incremental:    collectorWithState.IsIncremental(),
-		Input:          iterator,
-		UrlTemplate:    "repositories/{{ .Params.FullName }}/issues/{{ .Input.BitbucketId }}/comments",
-		Query:          GetQueryFields(`values.type,values.id,values.created_on,values.updated_on,values.content,values.issue.id,values.user`),
+		ApiClient:   data.ApiClient,
+		PageSize:    100,
+		Incremental: collectorWithState.IsIncremental(),
+		Input:       iterator,
+		UrlTemplate: "repositories/{{ .Params.FullName }}/issues/{{ .Input.BitbucketId }}/comments",
+		Query: GetQueryFields(
+			`values.type,values.id,values.created_on,values.updated_on,values.content,values.issue.id,values.user,` +
+				`page,pagelen,size`),
 		GetTotalPages:  GetTotalPagesFromResponse,
 		ResponseParser: GetRawMessageFromResponse,
 		AfterResponse:  ignoreHTTPStatus404,
diff --git a/backend/plugins/bitbucket/tasks/pipeline_collector.go b/backend/plugins/bitbucket/tasks/pipeline_collector.go
index d49559c87..86ab06d51 100644
--- a/backend/plugins/bitbucket/tasks/pipeline_collector.go
+++ b/backend/plugins/bitbucket/tasks/pipeline_collector.go
@@ -48,7 +48,8 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext) errors.Error {
 		Query: GetQueryCreatedAndUpdated(
 			`values.uuid,values.type,values.state.name,values.state.result.name,values.state.result.type,values.state.stage.name,values.state.stage.type,`+
 				`values.target.ref_name,values.target.commit.hash,`+
-				`values.created_on,values.completed_on,values.duration_in_seconds,values.links.self`,
+				`values.created_on,values.completed_on,values.duration_in_seconds,values.links.self,`+
+				`page,pagelen,size`,
 			collectorWithState),
 		ResponseParser: GetRawMessageFromResponse,
 		GetTotalPages:  GetTotalPagesFromResponse,
diff --git a/backend/plugins/bitbucket/tasks/pipeline_convertor.go b/backend/plugins/bitbucket/tasks/pipeline_convertor.go
index 83a293983..d0f9edd0d 100644
--- a/backend/plugins/bitbucket/tasks/pipeline_convertor.go
+++ b/backend/plugins/bitbucket/tasks/pipeline_convertor.go
@@ -35,7 +35,7 @@ var ConvertPipelineMeta = plugin.SubTaskMeta{
 	EntryPoint:       ConvertPipelines,
 	EnabledByDefault: true,
 	Description:      "Convert tool layer table bitbucket_pipeline into domain layer table pipeline",
-	DomainTypes:      []string{plugin.DOMAIN_TYPE_CROSS},
+	DomainTypes:      []string{plugin.DOMAIN_TYPE_CICD},
 }
 
 func ConvertPipelines(taskCtx plugin.SubTaskContext) errors.Error {
diff --git a/backend/plugins/bitbucket/tasks/pipeline_collector.go b/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
similarity index 58%
copy from backend/plugins/bitbucket/tasks/pipeline_collector.go
copy to backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
index d49559c87..77299e013 100644
--- a/backend/plugins/bitbucket/tasks/pipeline_collector.go
+++ b/backend/plugins/bitbucket/tasks/pipeline_steps_collector.go
@@ -19,43 +19,52 @@ package tasks
 
 import (
 	"github.com/apache/incubator-devlake/core/errors"
-	plugin "github.com/apache/incubator-devlake/core/plugin"
+	"github.com/apache/incubator-devlake/core/plugin"
 	helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
 )
 
-const RAW_PIPELINE_TABLE = "bitbucket_api_pipelines"
+const RAW_PIPELINE_STEPS_TABLE = "bitbucket_pipeline_steps"
 
-var CollectApiPipelinesMeta = plugin.SubTaskMeta{
-	Name:             "collectApiPipelines",
-	EntryPoint:       CollectApiPipelines,
+var _ plugin.SubTaskEntryPoint = CollectPipelineSteps
+
+var CollectPipelineStepsMeta = plugin.SubTaskMeta{
+	Name:             "CollectPipelineSteps",
+	EntryPoint:       CollectPipelineSteps,
 	EnabledByDefault: true,
-	Description:      "Collect pipeline data from bitbucket api",
+	Description:      "Collect PipelineSteps data from Bitbucket api",
 	DomainTypes:      []string{plugin.DOMAIN_TYPE_CICD},
 }
 
-func CollectApiPipelines(taskCtx plugin.SubTaskContext) errors.Error {
-	rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_PIPELINE_TABLE)
+func CollectPipelineSteps(taskCtx plugin.SubTaskContext) errors.Error {
+	rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_PIPELINE_STEPS_TABLE)
+
 	collectorWithState, err := helper.NewApiCollectorWithState(*rawDataSubTaskArgs, data.CreatedDateAfter)
 	if err != nil {
 		return err
 	}
 
+	iterator, err := GetPipelinesIterator(taskCtx, collectorWithState)
+	if err != nil {
+		return err
+	}
+	defer iterator.Close()
+
 	err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
 		ApiClient:   data.ApiClient,
-		PageSize:    50,
+		PageSize:    100,
 		Incremental: collectorWithState.IsIncremental(),
-		UrlTemplate: "repositories/{{ .Params.FullName }}/pipelines/",
-		Query: GetQueryCreatedAndUpdated(
-			`values.uuid,values.type,values.state.name,values.state.result.name,values.state.result.type,values.state.stage.name,values.state.stage.type,`+
-				`values.target.ref_name,values.target.commit.hash,`+
-				`values.created_on,values.completed_on,values.duration_in_seconds,values.links.self`,
-			collectorWithState),
-		ResponseParser: GetRawMessageFromResponse,
+		Input:       iterator,
+		UrlTemplate: "repositories/{{ .Params.FullName }}/pipelines/{{ .Input.BitbucketId }}/steps/",
+		Query: GetQueryFields(
+			`values.type,values.name,values.uuid,values.pipeline.uuid,values.trigger.type,` +
+				`values.state.name,values.state.result.name,values.maxTime,values.started_on,` +
+				`values.completed_on,values.duration_in_seconds,values.build_seconds_used,values.run_number,` +
+				`page,pagelen,size`),
 		GetTotalPages:  GetTotalPagesFromResponse,
+		ResponseParser: GetRawMessageFromResponse,
 	})
 	if err != nil {
 		return err
 	}
-
 	return collectorWithState.Execute()
 }
diff --git a/backend/plugins/bitbucket/tasks/pipeline_convertor.go b/backend/plugins/bitbucket/tasks/pipeline_steps_convertor.go
similarity index 51%
copy from backend/plugins/bitbucket/tasks/pipeline_convertor.go
copy to backend/plugins/bitbucket/tasks/pipeline_steps_convertor.go
index 83a293983..3ea42a2ef 100644
--- a/backend/plugins/bitbucket/tasks/pipeline_convertor.go
+++ b/backend/plugins/bitbucket/tasks/pipeline_steps_convertor.go
@@ -27,72 +27,94 @@ import (
 	"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
 	"github.com/apache/incubator-devlake/plugins/bitbucket/models"
 	"reflect"
-	"time"
 )
 
-var ConvertPipelineMeta = plugin.SubTaskMeta{
-	Name:             "convertPipelines",
-	EntryPoint:       ConvertPipelines,
+var ConvertPipelineStepMeta = plugin.SubTaskMeta{
+	Name:             "convertPipelineSteps",
+	EntryPoint:       ConvertPipelineSteps,
 	EnabledByDefault: true,
 	Description:      "Convert tool layer table bitbucket_pipeline into domain layer table pipeline",
-	DomainTypes:      []string{plugin.DOMAIN_TYPE_CROSS},
+	DomainTypes:      []string{plugin.DOMAIN_TYPE_CICD},
 }
 
-func ConvertPipelines(taskCtx plugin.SubTaskContext) errors.Error {
-	rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_PIPELINE_TABLE)
+func ConvertPipelineSteps(taskCtx plugin.SubTaskContext) errors.Error {
+	rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_PIPELINE_STEPS_TABLE)
 	db := taskCtx.GetDal()
 
-	cursor, err := db.Cursor(dal.From(models.BitbucketPipeline{}))
+	deploymentPattern := data.Options.DeploymentPattern
+	productionPattern := data.Options.ProductionPattern
+	regexEnricher := api.NewRegexEnricher()
+	err := regexEnricher.AddRegexp(deploymentPattern, productionPattern)
+	if err != nil {
+		return err
+	}
+
+	cursor, err := db.Cursor(dal.From(models.BitbucketPipelineStep{}))
 	if err != nil {
 		return err
 	}
 	defer cursor.Close()
 
+	pipelineStepIdGen := didgen.NewDomainIdGenerator(&models.BitbucketPipelineStep{})
 	pipelineIdGen := didgen.NewDomainIdGenerator(&models.BitbucketPipeline{})
 
 	converter, err := api.NewDataConverter(api.DataConverterArgs{
-		InputRowType:       reflect.TypeOf(models.BitbucketPipeline{}),
+		InputRowType:       reflect.TypeOf(models.BitbucketPipelineStep{}),
 		Input:              cursor,
 		RawDataSubTaskArgs: *rawDataSubTaskArgs,
 		Convert: func(inputRow interface{}) ([]interface{}, errors.Error) {
-			bitbucketPipeline := inputRow.(*models.BitbucketPipeline)
+			bitbucketPipelineStep := inputRow.(*models.BitbucketPipelineStep)
 
-			createdAt := time.Now()
-			if bitbucketPipeline.BitbucketCreatedOn != nil {
-				createdAt = *bitbucketPipeline.BitbucketCreatedOn
-			}
-			results := make([]interface{}, 0, 2)
-			domainPipelineCommit := &devops.CiCDPipelineCommit{
-				PipelineId: pipelineIdGen.Generate(data.Options.ConnectionId, bitbucketPipeline.BitbucketId),
-				RepoId: didgen.NewDomainIdGenerator(&models.BitbucketRepo{}).
-					Generate(bitbucketPipeline.ConnectionId, bitbucketPipeline.RepoId),
-				CommitSha: bitbucketPipeline.CommitSha,
-				Branch:    bitbucketPipeline.RefName,
-			}
-			domainPipeline := &devops.CICDPipeline{
+			domainTask := &devops.CICDTask{
 				DomainEntity: domainlayer.DomainEntity{
-					Id: pipelineIdGen.Generate(data.Options.ConnectionId, bitbucketPipeline.BitbucketId),
+					Id: pipelineStepIdGen.Generate(data.Options.ConnectionId, bitbucketPipelineStep.BitbucketId),
 				},
-				Name: didgen.NewDomainIdGenerator(&models.BitbucketPipeline{}).
-					Generate(data.Options.ConnectionId, bitbucketPipeline.RefName),
+				Name:       bitbucketPipelineStep.Name,
+				PipelineId: pipelineIdGen.Generate(data.Options.ConnectionId, bitbucketPipelineStep.PipelineId),
 				Result: devops.GetResult(&devops.ResultRule{
 					Failed:  []string{models.FAILED, models.ERROR, models.UNDEPLOYED},
 					Abort:   []string{models.STOPPED, models.SKIPPED},
 					Success: []string{models.SUCCESSFUL, models.COMPLETED},
 					Manual:  []string{models.PAUSED, models.HALTED},
 					Default: devops.SUCCESS,
-				}, bitbucketPipeline.Result),
+				}, bitbucketPipelineStep.Result),
 				Status: devops.GetStatus(&devops.StatusRule{
 					InProgress: []string{models.IN_PROGRESS, models.PENDING, models.BUILDING},
 					Default:    devops.DONE,
-				}, bitbucketPipeline.Status),
-				Type:         "CI/CD",
-				CreatedDate:  createdAt,
-				DurationSec:  bitbucketPipeline.DurationInSeconds,
-				FinishedDate: bitbucketPipeline.BitbucketCompleteOn,
+				}, bitbucketPipelineStep.State),
 			}
-			results = append(results, domainPipelineCommit, domainPipeline)
-			return results, nil
+			if bitbucketPipelineStep.StartedOn != nil {
+				domainTask.StartedDate = *bitbucketPipelineStep.StartedOn
+			}
+			// rebuild the FinishedDate
+			if domainTask.Status == devops.DONE {
+				domainTask.FinishedDate = bitbucketPipelineStep.CompletedOn
+				domainTask.DurationSec = uint64(bitbucketPipelineStep.DurationInSeconds)
+			}
+
+			bitbucketDeployment := &models.BitbucketDeployment{}
+			deploymentErr := db.First(bitbucketDeployment, dal.Where(`step_id=?`, bitbucketPipelineStep.BitbucketId))
+			if deploymentErr == nil {
+				domainTask.Type = devops.DEPLOYMENT
+				if bitbucketDeployment.EnvironmentType == `Production` {
+					domainTask.Environment = devops.PRODUCTION
+				} else if bitbucketDeployment.EnvironmentType == `Staging` {
+					domainTask.Environment = devops.STAGING
+				} else if bitbucketDeployment.EnvironmentType == `Test` {
+					domainTask.Environment = devops.TESTING
+				}
+			}
+			if domainTask.Type == `` {
+				domainTask.Type = regexEnricher.GetEnrichResult(deploymentPattern, bitbucketPipelineStep.Name, devops.DEPLOYMENT)
+				if domainTask.Type != `` {
+					// only check env after type recognized
+					domainTask.Environment = regexEnricher.GetEnrichResult(productionPattern, bitbucketPipelineStep.Name, devops.PRODUCTION)
+				}
+			}
+
+			return []interface{}{
+				domainTask,
+			}, nil
 		},
 	})
 
diff --git a/backend/plugins/bitbucket/tasks/pipeline_steps_extractor.go b/backend/plugins/bitbucket/tasks/pipeline_steps_extractor.go
new file mode 100644
index 000000000..e89c60587
--- /dev/null
+++ b/backend/plugins/bitbucket/tasks/pipeline_steps_extractor.go
@@ -0,0 +1,104 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+	"encoding/json"
+	"github.com/apache/incubator-devlake/core/errors"
+	"github.com/apache/incubator-devlake/core/plugin"
+	helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+	"github.com/apache/incubator-devlake/plugins/bitbucket/models"
+	"time"
+)
+
+var _ plugin.SubTaskEntryPoint = ExtractPipelineSteps
+
+var ExtractPipelineStepsMeta = plugin.SubTaskMeta{
+	Name:             "ExtractPipelineSteps",
+	EntryPoint:       ExtractPipelineSteps,
+	EnabledByDefault: true,
+	Description:      "Extract raw data into tool layer table bitbucket_pipeline_steps",
+	DomainTypes:      []string{plugin.DOMAIN_TYPE_CICD},
+}
+
+type BitbucketPipelineStepsResponse struct {
+	Type     string `json:"type"`
+	Name     string `json:"name"`
+	Uuid     string `json:"uuid"`
+	Pipeline struct {
+		//Type string `json:"type"`
+		Uuid string `json:"uuid"`
+	} `json:"pipeline"`
+	Trigger struct {
+		Type string `json:"type"`
+	} `json:"trigger"`
+	State struct {
+		Name string `json:"name"`
+		//Type   string `json:"type"`
+		Result struct {
+			Name string `json:"name"`
+			//Type string `json:"type"`
+		} `json:"result"`
+	} `json:"state"`
+	MaxTime           int        `json:"maxTime"`
+	StartedOn         *time.Time `json:"started_on"`
+	CompletedOn       *time.Time `json:"completed_on"`
+	DurationInSeconds int        `json:"duration_in_seconds"`
+	BuildSecondsUsed  int        `json:"build_seconds_used"`
+	RunNumber         int        `json:"run_number"`
+}
+
+func ExtractPipelineSteps(taskCtx plugin.SubTaskContext) errors.Error {
+	rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx, RAW_PIPELINE_STEPS_TABLE)
+
+	extractor, err := helper.NewApiExtractor(helper.ApiExtractorArgs{
+		RawDataSubTaskArgs: *rawDataSubTaskArgs,
+
+		Extract: func(resData *helper.RawData) ([]interface{}, errors.Error) {
+			apiPipelineStep := &BitbucketPipelineStepsResponse{}
+			err := errors.Convert(json.Unmarshal(resData.Data, apiPipelineStep))
+			if err != nil {
+				return nil, err
+			}
+
+			bitbucketStep := &models.BitbucketPipelineStep{
+				ConnectionId:      data.Options.ConnectionId,
+				BitbucketId:       apiPipelineStep.Uuid,
+				PipelineId:        apiPipelineStep.Pipeline.Uuid,
+				Name:              apiPipelineStep.Name,
+				Trigger:           apiPipelineStep.Trigger.Type,
+				State:             apiPipelineStep.State.Name,
+				Result:            apiPipelineStep.State.Result.Name,
+				MaxTime:           apiPipelineStep.MaxTime,
+				StartedOn:         apiPipelineStep.StartedOn,
+				CompletedOn:       apiPipelineStep.CompletedOn,
+				DurationInSeconds: apiPipelineStep.DurationInSeconds,
+				BuildSecondsUsed:  apiPipelineStep.BuildSecondsUsed,
+				RunNumber:         apiPipelineStep.RunNumber,
+			}
+			return []interface{}{
+				bitbucketStep,
+			}, nil
+		},
+	})
+	if err != nil {
+		return err
+	}
+
+	return extractor.Execute()
+}
diff --git a/backend/plugins/bitbucket/tasks/pr_collector.go b/backend/plugins/bitbucket/tasks/pr_collector.go
index 671f320db..5b84c0309 100644
--- a/backend/plugins/bitbucket/tasks/pr_collector.go
+++ b/backend/plugins/bitbucket/tasks/pr_collector.go
@@ -52,7 +52,8 @@ func CollectApiPullRequests(taskCtx plugin.SubTaskContext) errors.Error {
 			`values.id,values.comment_count,values.type,values.state,values.title,values.description,`+
 				`values.merge_commit.hash,values.merge_commit.date,values.links.html,values.author,values.created_on,values.updated_on,`+
 				`values.destination.branch.name,values.destination.commit.hash,values.destination.repository.full_name,`+
-				`values.source.branch.name,values.source.commit.hash,values.source.repository.full_name`,
+				`values.source.branch.name,values.source.commit.hash,values.source.repository.full_name,`+
+				`page,pagelen,size`,
 			collectorWithState),
 		GetTotalPages:  GetTotalPagesFromResponse,
 		ResponseParser: GetRawMessageFromResponse,
diff --git a/backend/plugins/bitbucket/tasks/pr_comment_collector.go b/backend/plugins/bitbucket/tasks/pr_comment_collector.go
index 53b6f4ab6..1e5756778 100644
--- a/backend/plugins/bitbucket/tasks/pr_comment_collector.go
+++ b/backend/plugins/bitbucket/tasks/pr_comment_collector.go
@@ -19,7 +19,7 @@ package tasks
 
 import (
 	"github.com/apache/incubator-devlake/core/errors"
-	plugin "github.com/apache/incubator-devlake/core/plugin"
+	"github.com/apache/incubator-devlake/core/plugin"
 	helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
 )
 
@@ -48,12 +48,14 @@ func CollectApiPullRequestsComments(taskCtx plugin.SubTaskContext) errors.Error
 	defer iterator.Close()
 
 	err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-		ApiClient:      data.ApiClient,
-		PageSize:       100,
-		Incremental:    collectorWithState.IsIncremental(),
-		Input:          iterator,
-		UrlTemplate:    "repositories/{{ .Params.FullName }}/pullrequests/{{ .Input.BitbucketId }}/comments",
-		Query:          GetQueryFields(`values.id,values.type,values.created_on,values.updated_on,values.content.raw,values.pullrequest.id,values.user`),
+		ApiClient:   data.ApiClient,
+		PageSize:    100,
+		Incremental: collectorWithState.IsIncremental(),
+		Input:       iterator,
+		UrlTemplate: "repositories/{{ .Params.FullName }}/pullrequests/{{ .Input.BitbucketId }}/comments",
+		Query: GetQueryFields(
+			`values.id,values.type,values.created_on,values.updated_on,values.content.raw,values.pullrequest.id,values.user,` +
+				`page,pagelen,size`),
 		GetTotalPages:  GetTotalPagesFromResponse,
 		ResponseParser: GetRawMessageFromResponse,
 	})