You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by zh...@apache.org on 2023/01/05 11:44:03 UTC

[incubator-devlake] branch main updated: fix: maybe can fix-3 dbt zombie processes (#4118)

This is an automated email from the ASF dual-hosted git repository.

zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new ddafd61b0 fix: maybe can fix-3 dbt zombie processes (#4118)
ddafd61b0 is described below

commit ddafd61b013bcef82878961ea74aee6cba2d8885
Author: abeizn <zi...@merico.dev>
AuthorDate: Thu Jan 5 19:43:58 2023 +0800

    fix: maybe can fix-3 dbt zombie processes (#4118)
    
    * fix: maybe can fix-2 dbt zombie processes
    
    * fix: maybe can fix dbt zombie processes
---
 plugins/dbt/tasks/convertor.go | 42 +++---------------------------------------
 1 file changed, 3 insertions(+), 39 deletions(-)

diff --git a/plugins/dbt/tasks/convertor.go b/plugins/dbt/tasks/convertor.go
index 542d21c0f..63a92ecd0 100644
--- a/plugins/dbt/tasks/convertor.go
+++ b/plugins/dbt/tasks/convertor.go
@@ -18,7 +18,6 @@ limitations under the License.
 package tasks
 
 import (
-	"bufio"
 	"encoding/json"
 	"net"
 	"net/url"
@@ -190,47 +189,12 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
 	cmd := exec.Command(dbtExecParams[0], dbtExecParams[1:]...)
 	log.Info("dbt run script: ", cmd)
 
-	stdout, stdoutErr := cmd.StdoutPipe()
+	stdout, stdoutErr := cmd.CombinedOutput()
 	if stdoutErr != nil {
 		return errors.Convert(stdoutErr)
 	}
-
-	if err = errors.Convert(cmd.Start()); err != nil {
-		return err
-	}
-	defer stdout.Close()
-
-	// prevent zombie process
-	defer func() {
-		ProcessState, err := cmd.Process.Wait()
-		if err != nil || !ProcessState.Success() {
-			log.Error(err, "dbt run cmd.Wait() error")
-		}
-		log.Info("End of dbt program execution.")
-	}()
-
-	scanner := bufio.NewScanner(stdout)
-	var errStr string
-	for scanner.Scan() {
-		line := scanner.Text()
-		log.Info(line)
-		if strings.Contains(line, "Encountered an error") || errStr != "" {
-			errStr += line + "\n"
-		}
-		if strings.Contains(line, "of") && strings.Contains(line, "OK") {
-			taskCtx.IncProgress(1)
-		}
-	}
-	if err := errors.Convert(scanner.Err()); err != nil {
-		log.Error(err, "dbt read stdout failed.")
-		return err
-	}
-
-	// close stdout
-	if closeErr := stdout.Close(); closeErr != nil && err == nil {
-		log.Error(closeErr, "dbt close stdout failed.")
-		return errors.Convert(closeErr)
-	}
+	log.Info("dbt run log: ", string(stdout))
+	log.Info("End of dbt program execution.")
 
 	return nil
 }