You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ab...@apache.org on 2023/02/09 06:27:58 UTC
[incubator-devlake] branch release-v0.15 updated: fix: dbt error not return (#4237) (#4366)
This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch release-v0.15
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/release-v0.15 by this push:
new 1bec691fc fix: dbt error not return (#4237) (#4366)
1bec691fc is described below
commit 1bec691fcf0927c8b3c1a2fbf9dbb167d48de6d4
Author: long2ice <ji...@merico.dev>
AuthorDate: Thu Feb 9 14:27:54 2023 +0800
fix: dbt error not return (#4237) (#4366)
---
plugins/dbt/tasks/convertor.go | 16 +++++++++++-----
1 file changed, 11 insertions(+), 5 deletions(-)
diff --git a/plugins/dbt/tasks/convertor.go b/plugins/dbt/tasks/convertor.go
index da4169563..8c3d7ff50 100644
--- a/plugins/dbt/tasks/convertor.go
+++ b/plugins/dbt/tasks/convertor.go
@@ -45,6 +45,7 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
projectVars := data.Options.ProjectVars
args := data.Options.Args
failFast := data.Options.FailFast
+ threads := data.Options.Threads
noVersionCheck := data.Options.NoVersionCheck
excludeModels := data.Options.ExcludeModels
selector := data.Options.Selector
@@ -118,7 +119,7 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
log.Info("dbt deps run script: ", cmdDeps)
// prevent zombie process
defer func() {
- if err := errors.Convert(cmdDeps.Wait()); err != nil {
+ if err = errors.Convert(cmdDeps.Wait()); err != nil {
log.Error(nil, "dbt deps run cmd.cmdDeps() error")
}
}()
@@ -147,6 +148,10 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
if failFast {
dbtExecParams = append(dbtExecParams, "--fail-fast")
}
+ if threads != 0 {
+ dbtExecParams = append(dbtExecParams, "--threads")
+ dbtExecParams = append(dbtExecParams, strconv.Itoa(threads))
+ }
if noVersionCheck {
dbtExecParams = append(dbtExecParams, "--no-version-check")
}
@@ -196,17 +201,18 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
}
// prevent zombie process
+ var errStr string
defer func() {
- err := errors.Convert(cmd.Wait())
+ err = errors.Convert(cmd.Wait())
if err != nil {
log.Error(err, "The DBT project run failed!")
+ err = errors.SubtaskErr.New(errStr)
} else {
log.Info("The DBT project run ended.")
}
}()
scanner := bufio.NewScanner(stdout)
- var errStr string
for scanner.Scan() {
line := scanner.Text()
log.Info(line)
@@ -217,7 +223,7 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
taskCtx.IncProgress(1)
}
}
- if err := errors.Convert(scanner.Err()); err != nil {
+ if err = errors.Convert(scanner.Err()); err != nil {
log.Error(err, "dbt read stdout failed.")
return err
}
@@ -228,7 +234,7 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
return errors.Convert(closeErr)
}
- return nil
+ return err
}
var DbtConverterMeta = core.SubTaskMeta{