You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by zh...@apache.org on 2023/01/06 08:30:14 UTC
[incubator-devlake] branch main updated: fix: dbt zombie process, set dbt default threads=1 (#4132)
This is an automated email from the ASF dual-hosted git repository.
zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new e630f8d64 fix: dbt zombie process, set dbt default threads=1 (#4132)
e630f8d64 is described below
commit e630f8d645108a2ce76c1d0475d6ede05c35f285
Author: abeizn <zi...@merico.dev>
AuthorDate: Fri Jan 6 16:30:10 2023 +0800
fix: dbt zombie process, set dbt default threads=1 (#4132)
---
plugins/dbt/dbt.go | 2 --
plugins/dbt/tasks/convertor.go | 50 ++++++++++++++++++++++++++++++++++--------
2 files changed, 41 insertions(+), 11 deletions(-)
diff --git a/plugins/dbt/dbt.go b/plugins/dbt/dbt.go
index 910567b28..0dd882180 100644
--- a/plugins/dbt/dbt.go
+++ b/plugins/dbt/dbt.go
@@ -38,7 +38,6 @@ func main() {
failFast := dbtCmd.Flags().BoolP("failFast", "", false, "dbt fail fast")
profilesPath := dbtCmd.Flags().StringP("profilesPath", "", "/Users/abeizn/.dbt", "dbt profiles path")
profile := dbtCmd.Flags().StringP("profile", "", "default", "dbt profile")
- threads := dbtCmd.Flags().IntP("threads", "", 1, "dbt threads")
noVersionCheck := dbtCmd.Flags().BoolP("noVersionCheck", "", false, "dbt no version check")
excludeModels := dbtCmd.Flags().StringSliceP("excludeModels", "", []string{}, "dbt exclude models")
selector := dbtCmd.Flags().StringP("selector", "", "", "dbt selector")
@@ -68,7 +67,6 @@ func main() {
"failFast": *failFast,
"profilesPath": *profilesPath,
"profile": *profile,
- "threads": *threads,
"noVersionCheck": *noVersionCheck,
"excludeModels": *excludeModels,
"selector": *selector,
diff --git a/plugins/dbt/tasks/convertor.go b/plugins/dbt/tasks/convertor.go
index 63a92ecd0..da4169563 100644
--- a/plugins/dbt/tasks/convertor.go
+++ b/plugins/dbt/tasks/convertor.go
@@ -18,6 +18,7 @@ limitations under the License.
package tasks
import (
+ "bufio"
"encoding/json"
"net"
"net/url"
@@ -44,7 +45,6 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
projectVars := data.Options.ProjectVars
args := data.Options.Args
failFast := data.Options.FailFast
- threads := data.Options.Threads
noVersionCheck := data.Options.NoVersionCheck
excludeModels := data.Options.ExcludeModels
selector := data.Options.Selector
@@ -127,7 +127,8 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
}
}
- dbtExecParams := []string{"dbt", "run", "--project-dir", projectPath}
+ // Set the default threads to 1 to prevent dbt threads that cannot be released from leaving zombie processes.
+ dbtExecParams := []string{"dbt", "run", "--project-dir", projectPath, "--threads", "1"}
if projectVars != nil {
jsonProjectVars, err := json.Marshal(projectVars)
if err != nil {
@@ -146,10 +147,6 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
if failFast {
dbtExecParams = append(dbtExecParams, "--fail-fast")
}
- if threads != 0 {
- dbtExecParams = append(dbtExecParams, "--threads")
- dbtExecParams = append(dbtExecParams, strconv.Itoa(threads))
- }
if noVersionCheck {
dbtExecParams = append(dbtExecParams, "--no-version-check")
}
@@ -189,12 +186,47 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
cmd := exec.Command(dbtExecParams[0], dbtExecParams[1:]...)
log.Info("dbt run script: ", cmd)
- stdout, stdoutErr := cmd.CombinedOutput()
+ stdout, stdoutErr := cmd.StdoutPipe()
if stdoutErr != nil {
return errors.Convert(stdoutErr)
}
- log.Info("dbt run log: ", string(stdout))
- log.Info("End of dbt program execution.")
+
+ if err = errors.Convert(cmd.Start()); err != nil {
+ return err
+ }
+
+ // prevent zombie process
+ defer func() {
+ err := errors.Convert(cmd.Wait())
+ if err != nil {
+ log.Error(err, "The DBT project run failed!")
+ } else {
+ log.Info("The DBT project run ended.")
+ }
+ }()
+
+ scanner := bufio.NewScanner(stdout)
+ var errStr string
+ for scanner.Scan() {
+ line := scanner.Text()
+ log.Info(line)
+ if strings.Contains(line, "Encountered an error") || errStr != "" {
+ errStr += line + "\n"
+ }
+ if strings.Contains(line, "of") && strings.Contains(line, "OK") {
+ taskCtx.IncProgress(1)
+ }
+ }
+ if err := errors.Convert(scanner.Err()); err != nil {
+ log.Error(err, "dbt read stdout failed.")
+ return err
+ }
+
+ // close stdout
+ if closeErr := stdout.Close(); closeErr != nil && err == nil {
+ log.Error(closeErr, "dbt close stdout failed.")
+ return errors.Convert(closeErr)
+ }
return nil
}