You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ab...@apache.org on 2023/01/06 08:32:42 UTC

[incubator-devlake] branch release-v0.14 updated: fix: dbt zombie process, set dbt default threads=1 (#4132)

This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch release-v0.14
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/release-v0.14 by this push:
     new f65db27f2 fix: dbt zombie process, set dbt default threads=1 (#4132)
f65db27f2 is described below

commit f65db27f2329695fb52bb738aa882e3644c8ab4c
Author: abeizn <zi...@merico.dev>
AuthorDate: Fri Jan 6 16:30:10 2023 +0800

    fix: dbt zombie process, set dbt default threads=1 (#4132)
---
 plugins/dbt/dbt.go             |  2 --
 plugins/dbt/tasks/convertor.go | 50 ++++++++++++++++++++++++++++++++++--------
 2 files changed, 41 insertions(+), 11 deletions(-)

diff --git a/plugins/dbt/dbt.go b/plugins/dbt/dbt.go
index 1a5e4b7b3..3fc6f5ef9 100644
--- a/plugins/dbt/dbt.go
+++ b/plugins/dbt/dbt.go
@@ -87,7 +87,6 @@ func main() {
 	failFast := dbtCmd.Flags().BoolP("failFast", "", false, "dbt fail fast")
 	profilesPath := dbtCmd.Flags().StringP("profilesPath", "", "/Users/abeizn/.dbt", "dbt profiles path")
 	profile := dbtCmd.Flags().StringP("profile", "", "default", "dbt profile")
-	threads := dbtCmd.Flags().IntP("threads", "", 1, "dbt threads")
 	noVersionCheck := dbtCmd.Flags().BoolP("noVersionCheck", "", false, "dbt no version check")
 	excludeModels := dbtCmd.Flags().StringSliceP("excludeModels", "", []string{}, "dbt exclude models")
 	selector := dbtCmd.Flags().StringP("selector", "", "", "dbt selector")
@@ -117,7 +116,6 @@ func main() {
 			"failFast":       *failFast,
 			"profilesPath":   *profilesPath,
 			"profile":        *profile,
-			"threads":        *threads,
 			"noVersionCheck": *noVersionCheck,
 			"excludeModels":  *excludeModels,
 			"selector":       *selector,
diff --git a/plugins/dbt/tasks/convertor.go b/plugins/dbt/tasks/convertor.go
index 63a92ecd0..da4169563 100644
--- a/plugins/dbt/tasks/convertor.go
+++ b/plugins/dbt/tasks/convertor.go
@@ -18,6 +18,7 @@ limitations under the License.
 package tasks
 
 import (
+	"bufio"
 	"encoding/json"
 	"net"
 	"net/url"
@@ -44,7 +45,6 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
 	projectVars := data.Options.ProjectVars
 	args := data.Options.Args
 	failFast := data.Options.FailFast
-	threads := data.Options.Threads
 	noVersionCheck := data.Options.NoVersionCheck
 	excludeModels := data.Options.ExcludeModels
 	selector := data.Options.Selector
@@ -127,7 +127,8 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
 		}
 
 	}
-	dbtExecParams := []string{"dbt", "run", "--project-dir", projectPath}
+	// Always run dbt with --threads 1: dbt threads that are not released can leave zombie processes behind.
+	dbtExecParams := []string{"dbt", "run", "--project-dir", projectPath, "--threads", "1"}
 	if projectVars != nil {
 		jsonProjectVars, err := json.Marshal(projectVars)
 		if err != nil {
@@ -146,10 +147,6 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
 	if failFast {
 		dbtExecParams = append(dbtExecParams, "--fail-fast")
 	}
-	if threads != 0 {
-		dbtExecParams = append(dbtExecParams, "--threads")
-		dbtExecParams = append(dbtExecParams, strconv.Itoa(threads))
-	}
 	if noVersionCheck {
 		dbtExecParams = append(dbtExecParams, "--no-version-check")
 	}
@@ -189,12 +186,47 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
 	cmd := exec.Command(dbtExecParams[0], dbtExecParams[1:]...)
 	log.Info("dbt run script: ", cmd)
 
-	stdout, stdoutErr := cmd.CombinedOutput()
+	stdout, stdoutErr := cmd.StdoutPipe()
 	if stdoutErr != nil {
 		return errors.Convert(stdoutErr)
 	}
-	log.Info("dbt run log: ", string(stdout))
-	log.Info("End of dbt program execution.")
+
+	if err = errors.Convert(cmd.Start()); err != nil {
+		return err
+	}
+
+	// prevent zombie process
+	defer func() {
+		err := errors.Convert(cmd.Wait())
+		if err != nil {
+			log.Error(err, "The DBT project run failed!")
+		} else {
+			log.Info("The DBT project run ended.")
+		}
+	}()
+
+	scanner := bufio.NewScanner(stdout)
+	var errStr string
+	for scanner.Scan() {
+		line := scanner.Text()
+		log.Info(line)
+		if strings.Contains(line, "Encountered an error") || errStr != "" {
+			errStr += line + "\n"
+		}
+		if strings.Contains(line, "of") && strings.Contains(line, "OK") {
+			taskCtx.IncProgress(1)
+		}
+	}
+	if err := errors.Convert(scanner.Err()); err != nil {
+		log.Error(err, "dbt read stdout failed.")
+		return err
+	}
+
+	// close stdout
+	if closeErr := stdout.Close(); closeErr != nil && err == nil {
+		log.Error(closeErr, "dbt close stdout failed.")
+		return errors.Convert(closeErr)
+	}
 
 	return nil
 }