You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ma...@apache.org on 2023/01/05 10:07:23 UTC

[incubator-devlake] branch main updated: fix: maybe can fix-2 dbt zombie processes (#4116)

This is an automated email from the ASF dual-hosted git repository.

mappjzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 3aa9dd0b4 fix: maybe can fix-2 dbt zombie processes (#4116)
3aa9dd0b4 is described below

commit 3aa9dd0b4c57f5532f2c1d62875fbdffdbfbe4c8
Author: abeizn <zi...@merico.dev>
AuthorDate: Thu Jan 5 18:07:18 2023 +0800

    fix: maybe can fix-2 dbt zombie processes (#4116)
---
 plugins/dbt/tasks/convertor.go | 36 +++++++++++++++++++++++-------------
 1 file changed, 23 insertions(+), 13 deletions(-)

diff --git a/plugins/dbt/tasks/convertor.go b/plugins/dbt/tasks/convertor.go
index 7115e660f..542d21c0f 100644
--- a/plugins/dbt/tasks/convertor.go
+++ b/plugins/dbt/tasks/convertor.go
@@ -115,11 +115,18 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
 	defaultPackagesPath := filepath.Join(projectPath, "packages.yml")
 	_, err = errors.Convert01(os.Stat(defaultPackagesPath))
 	if err == nil {
-		cmd := exec.Command("dbt", "deps")
-		err = errors.Convert(cmd.Start())
-		if err != nil {
+		cmdDeps := exec.Command("dbt", "deps")
+		log.Info("dbt deps run script: ", cmdDeps)
+		// prevent zombie process
+		defer func() {
+			if err := errors.Convert(cmdDeps.Wait()); err != nil {
+				log.Error(nil, "dbt deps run cmd.cmdDeps() error")
+			}
+		}()
+		if err = errors.Convert(cmdDeps.Start()); err != nil {
 			return err
 		}
+
 	}
 	dbtExecParams := []string{"dbt", "run", "--project-dir", projectPath}
 	if projectVars != nil {
@@ -182,23 +189,24 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
 	}
 	cmd := exec.Command(dbtExecParams[0], dbtExecParams[1:]...)
 	log.Info("dbt run script: ", cmd)
-	stdout, _ := cmd.StdoutPipe()
+
+	stdout, stdoutErr := cmd.StdoutPipe()
+	if stdoutErr != nil {
+		return errors.Convert(stdoutErr)
+	}
+
 	if err = errors.Convert(cmd.Start()); err != nil {
 		return err
 	}
-	// ProcessState contains information about an exited process, available after a call to Wait.
-	defer func() {
-		if !cmd.ProcessState.Success() {
-			log.Error(nil, "dbt run task error, please check!!!")
-		}
-	}()
+	defer stdout.Close()
 
 	// prevent zombie process
 	defer func() {
-		err := errors.Convert(cmd.Wait())
-		if err != nil {
-			log.Error(nil, "dbt run cmd.Wait() error")
+		ProcessState, err := cmd.Process.Wait()
+		if err != nil || !ProcessState.Success() {
+			log.Error(err, "dbt run cmd.Wait() error")
 		}
+		log.Info("End of dbt program execution.")
 	}()
 
 	scanner := bufio.NewScanner(stdout)
@@ -214,11 +222,13 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
 		}
 	}
 	if err := errors.Convert(scanner.Err()); err != nil {
+		log.Error(err, "dbt read stdout failed.")
 		return err
 	}
 
 	// close stdout
 	if closeErr := stdout.Close(); closeErr != nil && err == nil {
+		log.Error(closeErr, "dbt close stdout failed.")
 		return errors.Convert(closeErr)
 	}