You are viewing a plain-text version of this content. The canonical link to it was a hyperlink, which is not preserved in this plain-text copy.
Posted to commits@devlake.apache.org by li...@apache.org on 2023/01/03 13:04:13 UTC
[incubator-devlake] branch main updated: gitextractor should delete old data (#4089)
This is an automated email from the ASF dual-hosted git repository.
likyh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 5cfe8f0fe gitextractor should delete old data (#4089)
5cfe8f0fe is described below
commit 5cfe8f0febced111e125dc752fa1bbe02b16d548
Author: Klesh Wong <zh...@merico.dev>
AuthorDate: Tue Jan 3 21:04:08 2023 +0800
gitextractor should delete old data (#4089)
* fix: gitextractor doesn't delete previous data
* fix: previously running tasks should be set to FAILED on startup
---
plugins/gitextractor/impl/impl.go | 2 +-
plugins/gitextractor/main.go | 2 +-
plugins/gitextractor/store/database.go | 32 ++++++++++++++++++++++++++------
services/pipeline.go | 8 ++++++++
4 files changed, 36 insertions(+), 8 deletions(-)
diff --git a/plugins/gitextractor/impl/impl.go b/plugins/gitextractor/impl/impl.go
index c9ffee992..65fb4674b 100644
--- a/plugins/gitextractor/impl/impl.go
+++ b/plugins/gitextractor/impl/impl.go
@@ -64,7 +64,7 @@ func (plugin GitExtractor) PrepareTaskData(taskCtx core.TaskContext, options map
if err := op.Valid(); err != nil {
return nil, err
}
- storage := store.NewDatabase(taskCtx, op.Url)
+ storage := store.NewDatabase(taskCtx, op.RepoId)
repo, err := NewGitRepo(taskCtx.GetLogger(), storage, op)
if err != nil {
return nil, err
diff --git a/plugins/gitextractor/main.go b/plugins/gitextractor/main.go
index d5c94feab..b38b4caef 100644
--- a/plugins/gitextractor/main.go
+++ b/plugins/gitextractor/main.go
@@ -72,7 +72,7 @@ func main() {
}
// If we didn't specify output or dburl, we will use db by default
if storage == nil {
- storage = store.NewDatabase(basicRes, *url)
+ storage = store.NewDatabase(basicRes, *id)
}
defer storage.Close()
ctx := context.Background()
diff --git a/plugins/gitextractor/store/database.go b/plugins/gitextractor/store/database.go
index f34212b3c..257939bec 100644
--- a/plugins/gitextractor/store/database.go
+++ b/plugins/gitextractor/store/database.go
@@ -18,10 +18,11 @@ limitations under the License.
package store
import (
- "fmt"
- "github.com/apache/incubator-devlake/errors"
"reflect"
+ "github.com/apache/incubator-devlake/errors"
+
+ "github.com/apache/incubator-devlake/models/common"
"github.com/apache/incubator-devlake/models/domainlayer"
"github.com/apache/incubator-devlake/models/domainlayer/code"
"github.com/apache/incubator-devlake/models/domainlayer/crossdomain"
@@ -33,24 +34,35 @@ const BathSize = 100
type Database struct {
driver *helper.BatchSaveDivider
+ table string
+ params string
}
-func NewDatabase(basicRes core.BasicRes, repoUrl string) *Database {
- database := new(Database)
+func NewDatabase(basicRes core.BasicRes, repoId string) *Database {
+ database := &Database{
+ table: "gitextractor",
+ params: repoId,
+ }
database.driver = helper.NewBatchSaveDivider(
basicRes,
BathSize,
- "gitextractor",
- fmt.Sprintf(`{"RepoUrl": "%s"}`, repoUrl),
+ database.table,
+ database.params,
)
return database
}
+func (d *Database) updateRawDataFields(rawData *common.RawDataOrigin) {
+ rawData.RawDataTable = d.table
+ rawData.RawDataParams = d.params
+}
+
func (d *Database) RepoCommits(repoCommit *code.RepoCommit) errors.Error {
batch, err := d.driver.ForType(reflect.TypeOf(repoCommit))
if err != nil {
return err
}
+ d.updateRawDataFields(&repoCommit.RawDataOrigin)
return batch.Add(repoCommit)
}
@@ -65,6 +77,7 @@ func (d *Database) Commits(commit *code.Commit) errors.Error {
if err != nil {
return err
}
+ d.updateRawDataFields(&account.RawDataOrigin)
err = accountBatch.Add(account)
if err != nil {
return err
@@ -73,6 +86,7 @@ func (d *Database) Commits(commit *code.Commit) errors.Error {
if err != nil {
return err
}
+ d.updateRawDataFields(&account.RawDataOrigin)
return commitBatch.Add(commit)
}
@@ -81,6 +95,7 @@ func (d *Database) Refs(ref *code.Ref) errors.Error {
if err != nil {
return err
}
+ d.updateRawDataFields(&ref.RawDataOrigin)
return batch.Add(ref)
}
@@ -89,6 +104,7 @@ func (d *Database) CommitFiles(file *code.CommitFile) errors.Error {
if err != nil {
return err
}
+ d.updateRawDataFields(&file.RawDataOrigin)
return batch.Add(file)
}
@@ -97,6 +113,7 @@ func (d *Database) CommitFileComponents(commitFileComponent *code.CommitFileComp
if err != nil {
return err
}
+ d.updateRawDataFields(&commitFileComponent.RawDataOrigin)
return batch.Add(commitFileComponent)
}
@@ -105,6 +122,7 @@ func (d *Database) RepoSnapshot(snapshotElement *code.RepoSnapshot) errors.Error
if err != nil {
return err
}
+ d.updateRawDataFields(&snapshotElement.RawDataOrigin)
return batch.Add(snapshotElement)
}
@@ -113,6 +131,7 @@ func (d *Database) CommitLineChange(commitLineChange *code.CommitLineChange) err
if err != nil {
return err
}
+ d.updateRawDataFields(&commitLineChange.RawDataOrigin)
return batch.Add(commitLineChange)
}
@@ -125,6 +144,7 @@ func (d *Database) CommitParents(pp []*code.CommitParent) errors.Error {
return err
}
for _, cp := range pp {
+ d.updateRawDataFields(&cp.RawDataOrigin)
err = batch.Add(cp)
if err != nil {
return err
diff --git a/services/pipeline.go b/services/pipeline.go
index 341529600..92cc643a0 100644
--- a/services/pipeline.go
+++ b/services/pipeline.go
@@ -82,6 +82,14 @@ func pipelineServiceInit() {
if err != nil {
panic(err)
}
+ err = db.UpdateColumn(
+ &models.Task{},
+ "status", models.TASK_FAILED,
+ dal.Where("status = ?", models.TASK_RUNNING),
+ )
+ if err != nil {
+ panic(err)
+ }
}
err := ReloadBlueprints(cronManager)