You are viewing a plain-text version of this content; the canonical link was not preserved in this copy.
Posted to commits@devlake.apache.org by ab...@apache.org on 2023/02/09 06:27:58 UTC

[incubator-devlake] branch release-v0.15 updated: fix: dbt error not return (#4237) (#4366)

This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch release-v0.15
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/release-v0.15 by this push:
     new 1bec691fc fix: dbt error not return (#4237) (#4366)
1bec691fc is described below

commit 1bec691fcf0927c8b3c1a2fbf9dbb167d48de6d4
Author: long2ice <ji...@merico.dev>
AuthorDate: Thu Feb 9 14:27:54 2023 +0800

    fix: dbt error not return (#4237) (#4366)
---
 plugins/dbt/tasks/convertor.go | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/plugins/dbt/tasks/convertor.go b/plugins/dbt/tasks/convertor.go
index da4169563..8c3d7ff50 100644
--- a/plugins/dbt/tasks/convertor.go
+++ b/plugins/dbt/tasks/convertor.go
@@ -45,6 +45,7 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
 	projectVars := data.Options.ProjectVars
 	args := data.Options.Args
 	failFast := data.Options.FailFast
+	threads := data.Options.Threads
 	noVersionCheck := data.Options.NoVersionCheck
 	excludeModels := data.Options.ExcludeModels
 	selector := data.Options.Selector
@@ -118,7 +119,7 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
 		log.Info("dbt deps run script: ", cmdDeps)
 		// prevent zombie process
 		defer func() {
-			if err := errors.Convert(cmdDeps.Wait()); err != nil {
+			if err = errors.Convert(cmdDeps.Wait()); err != nil {
 				log.Error(nil, "dbt deps run cmd.cmdDeps() error")
 			}
 		}()
@@ -147,6 +148,10 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
 	if failFast {
 		dbtExecParams = append(dbtExecParams, "--fail-fast")
 	}
+	if threads != 0 {
+		dbtExecParams = append(dbtExecParams, "--threads")
+		dbtExecParams = append(dbtExecParams, strconv.Itoa(threads))
+	}
 	if noVersionCheck {
 		dbtExecParams = append(dbtExecParams, "--no-version-check")
 	}
@@ -196,17 +201,18 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
 	}
 
 	// prevent zombie process
+	var errStr string
 	defer func() {
-		err := errors.Convert(cmd.Wait())
+		err = errors.Convert(cmd.Wait())
 		if err != nil {
 			log.Error(err, "The DBT project run failed!")
+			err = errors.SubtaskErr.New(errStr)
 		} else {
 			log.Info("The DBT project run ended.")
 		}
 	}()
 
 	scanner := bufio.NewScanner(stdout)
-	var errStr string
 	for scanner.Scan() {
 		line := scanner.Text()
 		log.Info(line)
@@ -217,7 +223,7 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
 			taskCtx.IncProgress(1)
 		}
 	}
-	if err := errors.Convert(scanner.Err()); err != nil {
+	if err = errors.Convert(scanner.Err()); err != nil {
 		log.Error(err, "dbt read stdout failed.")
 		return err
 	}
@@ -228,7 +234,7 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
 		return errors.Convert(closeErr)
 	}
 
-	return nil
+	return err
 }
 
 var DbtConverterMeta = core.SubTaskMeta{