You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ab...@apache.org on 2022/11/02 12:00:55 UTC

[incubator-devlake] branch release-v0.14 updated: feat: export all dbt args explicit (#3650)

This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch release-v0.14
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/release-v0.14 by this push:
     new cc4d1ea4 feat: export all dbt args explicit (#3650)
cc4d1ea4 is described below

commit cc4d1ea4e40d5d72119b993d5848d8608c3d4f7f
Author: long2ice <lo...@gmail.com>
AuthorDate: Wed Nov 2 20:00:41 2022 +0800

    feat: export all dbt args explicit (#3650)
    
    * feat: export all dbt args explicit
    
    * fix: dbt error detect
---
 config-ui/src/data/pipeline-config-samples/dbt.js | 11 +++++
 plugins/dbt/api/swagger.go                        | 11 +++++
 plugins/dbt/dbt.go                                | 22 ++++++++++
 plugins/dbt/tasks/convertor.go                    | 53 ++++++++++++++++++++++-
 plugins/dbt/tasks/task_data.go                    | 16 +++++--
 5 files changed, 109 insertions(+), 4 deletions(-)

diff --git a/config-ui/src/data/pipeline-config-samples/dbt.js b/config-ui/src/data/pipeline-config-samples/dbt.js
index fac96b36..ec51ecf8 100644
--- a/config-ui/src/data/pipeline-config-samples/dbt.js
+++ b/config-ui/src/data/pipeline-config-samples/dbt.js
@@ -29,6 +29,17 @@ const dbtConfig = [
           demokey1: 'demovalue1',
           demokey2: 'demovalue2'
         },
+        failFast: false,
+        profilesPath: '',
+        profile: '',
+        threads: 0,
+        noVersionCheck: false,
+        excludeModels: [],
+        selector: '',
+        state: '',
+        defer: false,
+        noDefer: false,
+        fullRefresh: false,
         args: []
       }
     }
diff --git a/plugins/dbt/api/swagger.go b/plugins/dbt/api/swagger.go
index a14c986c..a0e800af 100644
--- a/plugins/dbt/api/swagger.go
+++ b/plugins/dbt/api/swagger.go
@@ -32,6 +32,17 @@ type Options struct {
 	ProjectTarget  string   `json:"projectTarget"`
 	SelectedModels []string `json:"selectedModels"`
 	Args           []string `json:"args"`
+	FailFast       bool     `json:"failFast"`
+	ProfilesPath   string   `json:"profilesPath"`
+	Profile        string   `json:"profile"`
+	Threads        int      `json:"threads"`
+	NoVersionCheck bool     `json:"noVersionCheck"`
+	ExcludeModels  []string `json:"excludeModels"`
+	Selector       string   `json:"selector"`
+	State          string   `json:"state"`
+	Defer          bool     `json:"defer"`
+	NoDefer        bool     `json:"noDefer"`
+	FullRefresh    bool     `json:"fullRefresh"`
 	ProjectVars    struct {
 		Demokey1 string `json:"demokey1"`
 		Demokey2 string `json:"demokey2"`
diff --git a/plugins/dbt/dbt.go b/plugins/dbt/dbt.go
index 1a3c19cc..1a5e4b7b 100644
--- a/plugins/dbt/dbt.go
+++ b/plugins/dbt/dbt.go
@@ -84,6 +84,17 @@ func main() {
 	projectTarget := dbtCmd.Flags().StringP("projectTarget", "o", "dev", "this is the default target your dbt project will use.")
 	modelsSlice := []string{"my_first_dbt_model", "my_second_dbt_model"}
 	selectedModels := dbtCmd.Flags().StringSliceP("models", "m", modelsSlice, "dbt select models")
+	failFast := dbtCmd.Flags().BoolP("failFast", "", false, "dbt fail fast")
+	profilesPath := dbtCmd.Flags().StringP("profilesPath", "", "/Users/abeizn/.dbt", "dbt profiles path")
+	profile := dbtCmd.Flags().StringP("profile", "", "default", "dbt profile")
+	threads := dbtCmd.Flags().IntP("threads", "", 1, "dbt threads")
+	noVersionCheck := dbtCmd.Flags().BoolP("noVersionCheck", "", false, "dbt no version check")
+	excludeModels := dbtCmd.Flags().StringSliceP("excludeModels", "", []string{}, "dbt exclude models")
+	selector := dbtCmd.Flags().StringP("selector", "", "", "dbt selector")
+	state := dbtCmd.Flags().StringP("state", "", "", "dbt state")
+	deferFlag := dbtCmd.Flags().BoolP("defer", "", false, "dbt defer")
+	noDefer := dbtCmd.Flags().BoolP("noDefer", "", false, "dbt no defer")
+	fullRefresh := dbtCmd.Flags().BoolP("fullRefresh", "", false, "dbt full refresh")
 	dbtArgs := dbtCmd.Flags().StringSliceP("args", "a", []string{}, "dbt run args")
 	projectVars := make(map[string]string)
 	projectVars["event_min_id"] = "7581"
@@ -103,6 +114,17 @@ func main() {
 			"projectVars":    projectVarsConvert,
 			"projectGitURL":  *projectGitURL,
 			"args":           dbtArgs,
+			"failFast":       *failFast,
+			"profilesPath":   *profilesPath,
+			"profile":        *profile,
+			"threads":        *threads,
+			"noVersionCheck": *noVersionCheck,
+			"excludeModels":  *excludeModels,
+			"selector":       *selector,
+			"state":          *state,
+			"defer":          *deferFlag,
+			"noDefer":        *noDefer,
+			"fullRefresh":    *fullRefresh,
 		})
 	}
 	runner.RunCmd(dbtCmd)
diff --git a/plugins/dbt/tasks/convertor.go b/plugins/dbt/tasks/convertor.go
index 3fd9ca36..caf86e5d 100644
--- a/plugins/dbt/tasks/convertor.go
+++ b/plugins/dbt/tasks/convertor.go
@@ -43,6 +43,18 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
 	projectTarget := data.Options.ProjectTarget
 	projectVars := data.Options.ProjectVars
 	args := data.Options.Args
+	failFast := data.Options.FailFast
+	threads := data.Options.Threads
+	noVersionCheck := data.Options.NoVersionCheck
+	excludeModels := data.Options.ExcludeModels
+	selector := data.Options.Selector
+	state := data.Options.State
+	deferFlag := data.Options.Defer
+	noDefer := data.Options.NoDefer
+	fullRefresh := data.Options.FullRefresh
+	profilesPath := data.Options.ProfilesPath
+	profile := data.Options.Profile
+
 	err := errors.Convert(os.Chdir(projectPath))
 	if err != nil {
 		return err
@@ -126,6 +138,45 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
 	if args != nil {
 		dbtExecParams = append(dbtExecParams, args...)
 	}
+	if failFast {
+		dbtExecParams = append(dbtExecParams, "--fail-fast")
+	}
+	if threads != 0 {
+		dbtExecParams = append(dbtExecParams, "--threads")
+		dbtExecParams = append(dbtExecParams, strconv.Itoa(threads))
+	}
+	if noVersionCheck {
+		dbtExecParams = append(dbtExecParams, "--no-version-check")
+	}
+	if excludeModels != nil {
+		dbtExecParams = append(dbtExecParams, "--exclude")
+		dbtExecParams = append(dbtExecParams, excludeModels...)
+	}
+	if selector != "" {
+		dbtExecParams = append(dbtExecParams, "--selector")
+		dbtExecParams = append(dbtExecParams, selector)
+	}
+	if state != "" {
+		dbtExecParams = append(dbtExecParams, "--state")
+		dbtExecParams = append(dbtExecParams, state)
+	}
+	if deferFlag {
+		dbtExecParams = append(dbtExecParams, "--defer")
+	}
+	if noDefer {
+		dbtExecParams = append(dbtExecParams, "--no-defer")
+	}
+	if fullRefresh {
+		dbtExecParams = append(dbtExecParams, "--full-refresh")
+	}
+	if profilesPath != "" {
+		dbtExecParams = append(dbtExecParams, "--profiles-dir")
+		dbtExecParams = append(dbtExecParams, profilesPath)
+	}
+	if profile != "" {
+		dbtExecParams = append(dbtExecParams, "--profile")
+		dbtExecParams = append(dbtExecParams, profile)
+	}
 	cmd := exec.Command(dbtExecParams[0], dbtExecParams[1:]...)
 	log.Info("dbt run script: ", cmd)
 	stdout, _ := cmd.StdoutPipe()
@@ -138,7 +189,7 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
 	for scanner.Scan() {
 		line := scanner.Text()
 		log.Info(line)
-		if strings.Contains(line, "ERROR") || errStr != "" {
+		if strings.Contains(line, "Encountered an error") || errStr != "" {
 			errStr += line + "\n"
 		}
 		if strings.Contains(line, "of") && strings.Contains(line, "OK") {
diff --git a/plugins/dbt/tasks/task_data.go b/plugins/dbt/tasks/task_data.go
index 7bb20a70..d741954a 100644
--- a/plugins/dbt/tasks/task_data.go
+++ b/plugins/dbt/tasks/task_data.go
@@ -22,11 +22,21 @@ type DbtOptions struct {
 	ProjectName   string `json:"projectName"`
 	ProjectTarget string `json:"projectTarget"`
 	// clone from git to projectPath if projectGitURL is not empty
-	ProjectGitURL string `json:"projectGitURL"`
-	// deprecated, use args instead
+	ProjectGitURL  string                 `json:"projectGitURL"`
 	ProjectVars    map[string]interface{} `json:"projectVars"`
 	SelectedModels []string               `json:"selectedModels"`
-	// dbt run args
+	FailFast       bool                   `json:"failFast"`
+	ProfilesPath   string                 `json:"profilesPath"`
+	Profile        string                 `json:"profile"`
+	Threads        int                    `json:"threads"`
+	NoVersionCheck bool                   `json:"noVersionCheck"`
+	ExcludeModels  []string               `json:"excludeModels"`
+	Selector       string                 `json:"selector"`
+	State          string                 `json:"state"`
+	Defer          bool                   `json:"defer"`
+	NoDefer        bool                   `json:"noDefer"`
+	FullRefresh    bool                   `json:"fullRefresh"`
+	// deprecated, dbt run args
 	Args  []string `json:"args"`
 	Tasks []string `json:"tasks,omitempty"`
 }