You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ab...@apache.org on 2022/11/02 12:00:55 UTC
[incubator-devlake] branch release-v0.14 updated: feat: export all dbt args explicit (#3650)
This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch release-v0.14
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/release-v0.14 by this push:
new cc4d1ea4 feat: export all dbt args explicit (#3650)
cc4d1ea4 is described below
commit cc4d1ea4e40d5d72119b993d5848d8608c3d4f7f
Author: long2ice <lo...@gmail.com>
AuthorDate: Wed Nov 2 20:00:41 2022 +0800
feat: export all dbt args explicit (#3650)
* feat: export all dbt args explicit
* fix: dbt error detect
---
config-ui/src/data/pipeline-config-samples/dbt.js | 11 +++++
plugins/dbt/api/swagger.go | 11 +++++
plugins/dbt/dbt.go | 22 ++++++++++
plugins/dbt/tasks/convertor.go | 53 ++++++++++++++++++++++-
plugins/dbt/tasks/task_data.go | 16 +++++--
5 files changed, 109 insertions(+), 4 deletions(-)
diff --git a/config-ui/src/data/pipeline-config-samples/dbt.js b/config-ui/src/data/pipeline-config-samples/dbt.js
index fac96b36..ec51ecf8 100644
--- a/config-ui/src/data/pipeline-config-samples/dbt.js
+++ b/config-ui/src/data/pipeline-config-samples/dbt.js
@@ -29,6 +29,17 @@ const dbtConfig = [
demokey1: 'demovalue1',
demokey2: 'demovalue2'
},
+ failFast: false,
+ profilesPath: '',
+ profile: '',
+ threads: 0,
+ noVersionCheck: false,
+ excludeModels: [],
+ selector: '',
+ state: '',
+ defer: false,
+ noDefer: false,
+ fullRefresh: false,
args: []
}
}
diff --git a/plugins/dbt/api/swagger.go b/plugins/dbt/api/swagger.go
index a14c986c..a0e800af 100644
--- a/plugins/dbt/api/swagger.go
+++ b/plugins/dbt/api/swagger.go
@@ -32,6 +32,17 @@ type Options struct {
ProjectTarget string `json:"projectTarget"`
SelectedModels []string `json:"selectedModels"`
Args []string `json:"args"`
+ FailFast bool `json:"failFast"`
+ ProfilesPath string `json:"profilesPath"`
+ Profile string `json:"profile"`
+ Threads int `json:"threads"`
+ NoVersionCheck bool `json:"noVersionCheck"`
+ ExcludeModels []string `json:"excludeModels"`
+ Selector string `json:"selector"`
+ State string `json:"state"`
+ Defer bool `json:"defer"`
+ NoDefer bool `json:"noDefer"`
+ FullRefresh bool `json:"fullRefresh"`
ProjectVars struct {
Demokey1 string `json:"demokey1"`
Demokey2 string `json:"demokey2"`
diff --git a/plugins/dbt/dbt.go b/plugins/dbt/dbt.go
index 1a3c19cc..1a5e4b7b 100644
--- a/plugins/dbt/dbt.go
+++ b/plugins/dbt/dbt.go
@@ -84,6 +84,17 @@ func main() {
projectTarget := dbtCmd.Flags().StringP("projectTarget", "o", "dev", "this is the default target your dbt project will use.")
modelsSlice := []string{"my_first_dbt_model", "my_second_dbt_model"}
selectedModels := dbtCmd.Flags().StringSliceP("models", "m", modelsSlice, "dbt select models")
+ failFast := dbtCmd.Flags().BoolP("failFast", "", false, "dbt fail fast")
+ profilesPath := dbtCmd.Flags().StringP("profilesPath", "", "/Users/abeizn/.dbt", "dbt profiles path")
+ profile := dbtCmd.Flags().StringP("profile", "", "default", "dbt profile")
+ threads := dbtCmd.Flags().IntP("threads", "", 1, "dbt threads")
+ noVersionCheck := dbtCmd.Flags().BoolP("noVersionCheck", "", false, "dbt no version check")
+ excludeModels := dbtCmd.Flags().StringSliceP("excludeModels", "", []string{}, "dbt exclude models")
+ selector := dbtCmd.Flags().StringP("selector", "", "", "dbt selector")
+ state := dbtCmd.Flags().StringP("state", "", "", "dbt state")
+ deferFlag := dbtCmd.Flags().BoolP("defer", "", false, "dbt defer")
+ noDefer := dbtCmd.Flags().BoolP("noDefer", "", false, "dbt no defer")
+ fullRefresh := dbtCmd.Flags().BoolP("fullRefresh", "", false, "dbt full refresh")
dbtArgs := dbtCmd.Flags().StringSliceP("args", "a", []string{}, "dbt run args")
projectVars := make(map[string]string)
projectVars["event_min_id"] = "7581"
@@ -103,6 +114,17 @@ func main() {
"projectVars": projectVarsConvert,
"projectGitURL": *projectGitURL,
"args": dbtArgs,
+ "failFast": *failFast,
+ "profilesPath": *profilesPath,
+ "profile": *profile,
+ "threads": *threads,
+ "noVersionCheck": *noVersionCheck,
+ "excludeModels": *excludeModels,
+ "selector": *selector,
+ "state": *state,
+ "defer": *deferFlag,
+ "noDefer": *noDefer,
+ "fullRefresh": *fullRefresh,
})
}
runner.RunCmd(dbtCmd)
diff --git a/plugins/dbt/tasks/convertor.go b/plugins/dbt/tasks/convertor.go
index 3fd9ca36..caf86e5d 100644
--- a/plugins/dbt/tasks/convertor.go
+++ b/plugins/dbt/tasks/convertor.go
@@ -43,6 +43,18 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
projectTarget := data.Options.ProjectTarget
projectVars := data.Options.ProjectVars
args := data.Options.Args
+ failFast := data.Options.FailFast
+ threads := data.Options.Threads
+ noVersionCheck := data.Options.NoVersionCheck
+ excludeModels := data.Options.ExcludeModels
+ selector := data.Options.Selector
+ state := data.Options.State
+ deferFlag := data.Options.Defer
+ noDefer := data.Options.NoDefer
+ fullRefresh := data.Options.FullRefresh
+ profilesPath := data.Options.ProfilesPath
+ profile := data.Options.Profile
+
err := errors.Convert(os.Chdir(projectPath))
if err != nil {
return err
@@ -126,6 +138,45 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
if args != nil {
dbtExecParams = append(dbtExecParams, args...)
}
+ if failFast {
+ dbtExecParams = append(dbtExecParams, "--fail-fast")
+ }
+ if threads != 0 {
+ dbtExecParams = append(dbtExecParams, "--threads")
+ dbtExecParams = append(dbtExecParams, strconv.Itoa(threads))
+ }
+ if noVersionCheck {
+ dbtExecParams = append(dbtExecParams, "--no-version-check")
+ }
+ if excludeModels != nil {
+ dbtExecParams = append(dbtExecParams, "--exclude")
+ dbtExecParams = append(dbtExecParams, excludeModels...)
+ }
+ if selector != "" {
+ dbtExecParams = append(dbtExecParams, "--selector")
+ dbtExecParams = append(dbtExecParams, selector)
+ }
+ if state != "" {
+ dbtExecParams = append(dbtExecParams, "--state")
+ dbtExecParams = append(dbtExecParams, state)
+ }
+ if deferFlag {
+ dbtExecParams = append(dbtExecParams, "--defer")
+ }
+ if noDefer {
+ dbtExecParams = append(dbtExecParams, "--no-defer")
+ }
+ if fullRefresh {
+ dbtExecParams = append(dbtExecParams, "--full-refresh")
+ }
+ if profilesPath != "" {
+ dbtExecParams = append(dbtExecParams, "--profiles-dir")
+ dbtExecParams = append(dbtExecParams, profilesPath)
+ }
+ if profile != "" {
+ dbtExecParams = append(dbtExecParams, "--profile")
+ dbtExecParams = append(dbtExecParams, profile)
+ }
cmd := exec.Command(dbtExecParams[0], dbtExecParams[1:]...)
log.Info("dbt run script: ", cmd)
stdout, _ := cmd.StdoutPipe()
@@ -138,7 +189,7 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
for scanner.Scan() {
line := scanner.Text()
log.Info(line)
- if strings.Contains(line, "ERROR") || errStr != "" {
+ if strings.Contains(line, "Encountered an error") || errStr != "" {
errStr += line + "\n"
}
if strings.Contains(line, "of") && strings.Contains(line, "OK") {
diff --git a/plugins/dbt/tasks/task_data.go b/plugins/dbt/tasks/task_data.go
index 7bb20a70..d741954a 100644
--- a/plugins/dbt/tasks/task_data.go
+++ b/plugins/dbt/tasks/task_data.go
@@ -22,11 +22,21 @@ type DbtOptions struct {
ProjectName string `json:"projectName"`
ProjectTarget string `json:"projectTarget"`
// clone from git to projectPath if projectGitURL is not empty
- ProjectGitURL string `json:"projectGitURL"`
- // deprecated, use args instead
+ ProjectGitURL string `json:"projectGitURL"`
ProjectVars map[string]interface{} `json:"projectVars"`
SelectedModels []string `json:"selectedModels"`
- // dbt run args
+ FailFast bool `json:"failFast"`
+ ProfilesPath string `json:"profilesPath"`
+ Profile string `json:"profile"`
+ Threads int `json:"threads"`
+ NoVersionCheck bool `json:"noVersionCheck"`
+ ExcludeModels []string `json:"excludeModels"`
+ Selector string `json:"selector"`
+ State string `json:"state"`
+ Defer bool `json:"defer"`
+ NoDefer bool `json:"noDefer"`
+ FullRefresh bool `json:"fullRefresh"`
+ // deprecated, dbt run args
Args []string `json:"args"`
Tasks []string `json:"tasks,omitempty"`
}