You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by li...@apache.org on 2023/01/03 13:04:13 UTC

[incubator-devlake] branch main updated: gitextractor should delete old data (#4089)

This is an automated email from the ASF dual-hosted git repository.

likyh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 5cfe8f0fe gitextractor should delete old data (#4089)
5cfe8f0fe is described below

commit 5cfe8f0febced111e125dc752fa1bbe02b16d548
Author: Klesh Wong <zh...@merico.dev>
AuthorDate: Tue Jan 3 21:04:08 2023 +0800

    gitextractor should delete old data (#4089)
    
    * fix: gitext doesn't delete previous data
    
    * fix: previous running task should be set to FAILED on startup
---
 plugins/gitextractor/impl/impl.go      |  2 +-
 plugins/gitextractor/main.go           |  2 +-
 plugins/gitextractor/store/database.go | 32 ++++++++++++++++++++++++++------
 services/pipeline.go                   |  8 ++++++++
 4 files changed, 36 insertions(+), 8 deletions(-)

diff --git a/plugins/gitextractor/impl/impl.go b/plugins/gitextractor/impl/impl.go
index c9ffee992..65fb4674b 100644
--- a/plugins/gitextractor/impl/impl.go
+++ b/plugins/gitextractor/impl/impl.go
@@ -64,7 +64,7 @@ func (plugin GitExtractor) PrepareTaskData(taskCtx core.TaskContext, options map
 	if err := op.Valid(); err != nil {
 		return nil, err
 	}
-	storage := store.NewDatabase(taskCtx, op.Url)
+	storage := store.NewDatabase(taskCtx, op.RepoId)
 	repo, err := NewGitRepo(taskCtx.GetLogger(), storage, op)
 	if err != nil {
 		return nil, err
diff --git a/plugins/gitextractor/main.go b/plugins/gitextractor/main.go
index d5c94feab..b38b4caef 100644
--- a/plugins/gitextractor/main.go
+++ b/plugins/gitextractor/main.go
@@ -72,7 +72,7 @@ func main() {
 	}
 	// If we didn't specify output or dburl, we will use db by default
 	if storage == nil {
-		storage = store.NewDatabase(basicRes, *url)
+		storage = store.NewDatabase(basicRes, *id)
 	}
 	defer storage.Close()
 	ctx := context.Background()
diff --git a/plugins/gitextractor/store/database.go b/plugins/gitextractor/store/database.go
index f34212b3c..257939bec 100644
--- a/plugins/gitextractor/store/database.go
+++ b/plugins/gitextractor/store/database.go
@@ -18,10 +18,11 @@ limitations under the License.
 package store
 
 import (
-	"fmt"
-	"github.com/apache/incubator-devlake/errors"
 	"reflect"
 
+	"github.com/apache/incubator-devlake/errors"
+
+	"github.com/apache/incubator-devlake/models/common"
 	"github.com/apache/incubator-devlake/models/domainlayer"
 	"github.com/apache/incubator-devlake/models/domainlayer/code"
 	"github.com/apache/incubator-devlake/models/domainlayer/crossdomain"
@@ -33,24 +34,35 @@ const BathSize = 100
 
 type Database struct {
 	driver *helper.BatchSaveDivider
+	table  string
+	params string
 }
 
-func NewDatabase(basicRes core.BasicRes, repoUrl string) *Database {
-	database := new(Database)
+func NewDatabase(basicRes core.BasicRes, repoId string) *Database {
+	database := &Database{
+		table:  "gitextractor",
+		params: repoId,
+	}
 	database.driver = helper.NewBatchSaveDivider(
 		basicRes,
 		BathSize,
-		"gitextractor",
-		fmt.Sprintf(`{"RepoUrl": "%s"}`, repoUrl),
+		database.table,
+		database.params,
 	)
 	return database
 }
 
+func (d *Database) updateRawDataFields(rawData *common.RawDataOrigin) {
+	rawData.RawDataTable = d.table
+	rawData.RawDataParams = d.params
+}
+
 func (d *Database) RepoCommits(repoCommit *code.RepoCommit) errors.Error {
 	batch, err := d.driver.ForType(reflect.TypeOf(repoCommit))
 	if err != nil {
 		return err
 	}
+	d.updateRawDataFields(&repoCommit.RawDataOrigin)
 	return batch.Add(repoCommit)
 }
 
@@ -65,6 +77,7 @@ func (d *Database) Commits(commit *code.Commit) errors.Error {
 	if err != nil {
 		return err
 	}
+	d.updateRawDataFields(&account.RawDataOrigin)
 	err = accountBatch.Add(account)
 	if err != nil {
 		return err
@@ -73,6 +86,7 @@ func (d *Database) Commits(commit *code.Commit) errors.Error {
 	if err != nil {
 		return err
 	}
+	d.updateRawDataFields(&commit.RawDataOrigin) // tag the commit row itself; the account was already tagged before accountBatch.Add
 	return commitBatch.Add(commit)
 }
 
@@ -81,6 +95,7 @@ func (d *Database) Refs(ref *code.Ref) errors.Error {
 	if err != nil {
 		return err
 	}
+	d.updateRawDataFields(&ref.RawDataOrigin)
 	return batch.Add(ref)
 }
 
@@ -89,6 +104,7 @@ func (d *Database) CommitFiles(file *code.CommitFile) errors.Error {
 	if err != nil {
 		return err
 	}
+	d.updateRawDataFields(&file.RawDataOrigin)
 	return batch.Add(file)
 }
 
@@ -97,6 +113,7 @@ func (d *Database) CommitFileComponents(commitFileComponent *code.CommitFileComp
 	if err != nil {
 		return err
 	}
+	d.updateRawDataFields(&commitFileComponent.RawDataOrigin)
 	return batch.Add(commitFileComponent)
 }
 
@@ -105,6 +122,7 @@ func (d *Database) RepoSnapshot(snapshotElement *code.RepoSnapshot) errors.Error
 	if err != nil {
 		return err
 	}
+	d.updateRawDataFields(&snapshotElement.RawDataOrigin)
 	return batch.Add(snapshotElement)
 }
 
@@ -113,6 +131,7 @@ func (d *Database) CommitLineChange(commitLineChange *code.CommitLineChange) err
 	if err != nil {
 		return err
 	}
+	d.updateRawDataFields(&commitLineChange.RawDataOrigin)
 	return batch.Add(commitLineChange)
 }
 
@@ -125,6 +144,7 @@ func (d *Database) CommitParents(pp []*code.CommitParent) errors.Error {
 		return err
 	}
 	for _, cp := range pp {
+		d.updateRawDataFields(&cp.RawDataOrigin)
 		err = batch.Add(cp)
 		if err != nil {
 			return err
diff --git a/services/pipeline.go b/services/pipeline.go
index 341529600..92cc643a0 100644
--- a/services/pipeline.go
+++ b/services/pipeline.go
@@ -82,6 +82,14 @@ func pipelineServiceInit() {
 		if err != nil {
 			panic(err)
 		}
+		err = db.UpdateColumn(
+			&models.Task{},
+			"status", models.TASK_FAILED,
+			dal.Where("status = ?", models.TASK_RUNNING),
+		)
+		if err != nil {
+			panic(err)
+		}
 	}
 
 	err := ReloadBlueprints(cronManager)