You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ab...@apache.org on 2022/11/02 04:42:13 UTC

[incubator-devlake] branch main updated: feat: redirect dbt error log to pipeline error (#3644)

This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new e69f7933 feat: redirect dbt error log to pipeline error (#3644)
e69f7933 is described below

commit e69f793300985530bf38832d74809dda450cd953
Author: long2ice <lo...@gmail.com>
AuthorDate: Wed Nov 2 12:42:08 2022 +0800

    feat: redirect dbt error log to pipeline error (#3644)
---
 plugins/dbt/tasks/convertor.go | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/plugins/dbt/tasks/convertor.go b/plugins/dbt/tasks/convertor.go
index 8ddda37d..7941caf7 100644
--- a/plugins/dbt/tasks/convertor.go
+++ b/plugins/dbt/tasks/convertor.go
@@ -119,8 +119,10 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
 		dbtExecParams = append(dbtExecParams, "--vars")
 		dbtExecParams = append(dbtExecParams, string(jsonProjectVars))
 	}
-	dbtExecParams = append(dbtExecParams, "--select")
-	dbtExecParams = append(dbtExecParams, models...)
+	if models != nil {
+		dbtExecParams = append(dbtExecParams, "--select")
+		dbtExecParams = append(dbtExecParams, models...)
+	}
 	if args != nil {
 		dbtExecParams = append(dbtExecParams, args...)
 	}
@@ -132,9 +134,13 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
 		return err
 	}
 	scanner := bufio.NewScanner(stdout)
+	var errStr string
 	for scanner.Scan() {
 		line := scanner.Text()
 		log.Info(line)
+		if strings.Contains(line, "ERROR") || errStr != "" {
+			errStr += line + "\n"
+		}
 		if strings.Contains(line, "of") && strings.Contains(line, "OK") {
 			taskCtx.IncProgress(1)
 		}
@@ -144,7 +150,7 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
 	}
 	err = errors.Convert(cmd.Wait())
 	if err != nil {
-		return err
+		return errors.Internal.New(errStr)
 	}
 	if !cmd.ProcessState.Success() {
 		log.Error(nil, "dbt run task error, please check!!!")