You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this rendering.
Posted to commits@devlake.apache.org by ab...@apache.org on 2022/10/19 11:45:53 UTC
[incubator-devlake] 01/04: feat: dbt plugin improvement
This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit add88b5747fe81c3cf991317503f174e5ed6221e
Author: long2ice <lo...@gmail.com>
AuthorDate: Wed Oct 19 12:02:57 2022 +0800
feat: dbt plugin improvement
---
config-ui/src/data/pipeline-config-samples/dbt.js | 4 +-
plugins/dbt/api/swagger.go | 42 ++++-----
plugins/dbt/dbt.go | 17 ++--
plugins/dbt/tasks/convertor.go | 100 ++++++++++++----------
plugins/dbt/tasks/git.go | 48 +++++++++++
plugins/dbt/tasks/task_data.go | 13 ++-
6 files changed, 141 insertions(+), 83 deletions(-)
diff --git a/config-ui/src/data/pipeline-config-samples/dbt.js b/config-ui/src/data/pipeline-config-samples/dbt.js
index 292a46d2..fac96b36 100644
--- a/config-ui/src/data/pipeline-config-samples/dbt.js
+++ b/config-ui/src/data/pipeline-config-samples/dbt.js
@@ -21,13 +21,15 @@ const dbtConfig = [
plugin: 'dbt',
options: {
projectPath: '/var/www/html/my-project',
+ projectGitURL: '',
projectName: 'myproject',
projectTarget: 'dev',
selectedModels: ['model_one', 'model_two'],
projectVars: {
demokey1: 'demovalue1',
demokey2: 'demovalue2'
- }
+ },
+ args: []
}
}
]
diff --git a/plugins/dbt/api/swagger.go b/plugins/dbt/api/swagger.go
index 841c2870..a14c986c 100644
--- a/plugins/dbt/api/swagger.go
+++ b/plugins/dbt/api/swagger.go
@@ -25,19 +25,23 @@ package api
// @Router /blueprints/dbt/blueprint-plan [post]
func _() {}
-type DbtBlueprintPlan [][]struct {
- Plugin string `json:"plugin"`
- Options struct {
- ProjectPath string `json:"projectPath"`
- ProjectName string `json:"projectName"`
- ProjectTarget string `json:"projectTarget"`
- SelectedModels []string `json:"selectedModels"`
- ProjectVars struct {
- Demokey1 string `json:"demokey1"`
- Demokey2 string `json:"demokey2"`
- } `json:"projectVars"`
- } `json:"options"`
+// Options documents the "options" object of a dbt plan step for swagger.
+type Options struct {
+ ProjectPath string `json:"projectPath"`
+ ProjectGitURL string `json:"projectGitURL"`
+ ProjectName string `json:"projectName"`
+ ProjectTarget string `json:"projectTarget"`
+ SelectedModels []string `json:"selectedModels"`
+ Args []string `json:"args"`
+ // ProjectVars uses illustrative keys only; the task-side field
+ // (DbtOptions.ProjectVars) is a free-form map.
+ ProjectVars struct {
+ Demokey1 string `json:"demokey1"`
+ Demokey2 string `json:"demokey2"`
+ } `json:"projectVars"`
}
+// Plan is one dbt step: the plugin name plus its Options.
+type Plan struct {
+ Plugin string `json:"plugin"`
+ Options Options `json:"options"`
+}
+// DbtBlueprintPlan is the two-level list of Plan steps used in the
+// /blueprints/dbt/blueprint-plan swagger documentation.
+type DbtBlueprintPlan [][]Plan
// @Summary pipelines plan for dbt
// @Description pipelines plan for dbt
@@ -47,16 +51,4 @@ type DbtBlueprintPlan [][]struct {
// @Router /pipelines/dbt/pipeline-plan [post]
func _() {}
-type DbtPipelinePlan [][]struct {
- Plugin string `json:"plugin"`
- Options struct {
- ProjectPath string `json:"projectPath"`
- ProjectName string `json:"projectName"`
- ProjectTarget string `json:"projectTarget"`
- SelectedModels []string `json:"selectedModels"`
- ProjectVars struct {
- Demokey1 string `json:"demokey1"`
- Demokey2 string `json:"demokey2"`
- } `json:"projectVars"`
- } `json:"options"`
-}
+// DbtPipelinePlan is the two-level list of Plan steps used in the
+// /pipelines/dbt/pipeline-plan swagger documentation.
+type DbtPipelinePlan [][]Plan
diff --git a/plugins/dbt/dbt.go b/plugins/dbt/dbt.go
index f336cb97..6f95c2c9 100644
--- a/plugins/dbt/dbt.go
+++ b/plugins/dbt/dbt.go
@@ -27,8 +27,10 @@ import (
"github.com/spf13/cobra"
)
-var _ core.PluginMeta = (*Dbt)(nil)
-var _ core.PluginTask = (*Dbt)(nil)
+// Compile-time checks that *Dbt implements the plugin interfaces.
+var (
+ _ core.PluginMeta = (*Dbt)(nil)
+ _ core.PluginTask = (*Dbt)(nil)
+)
type Dbt struct{}
@@ -38,6 +40,7 @@ func (plugin Dbt) Description() string {
+// SubTaskMetas lists the plugin's subtasks: the Git clone step, then the dbt
+// converter. NOTE(review): this assumes subtasks run in list order, so the
+// clone happens before dbt runs — confirm against the task runner.
func (plugin Dbt) SubTaskMetas() []core.SubTaskMeta {
return []core.SubTaskMeta{
+ tasks.GitMeta,
tasks.DbtConverterMeta,
}
}
@@ -81,16 +84,12 @@ func main() {
dbtCmd := &cobra.Command{Use: "dbt"}
_ = dbtCmd.MarkFlagRequired("projectPath")
projectPath := dbtCmd.Flags().StringP("projectPath", "p", "/Users/abeizn/demoapp", "user dbt project directory.")
-
- _ = dbtCmd.MarkFlagRequired("projectName")
+ projectGitURL := dbtCmd.Flags().StringP("projectGitURL", "g", "", "user dbt project git url.")
projectName := dbtCmd.Flags().StringP("projectName", "n", "demoapp", "user dbt project name.")
-
projectTarget := dbtCmd.Flags().StringP("projectTarget", "o", "dev", "this is the default target your dbt project will use.")
-
- _ = dbtCmd.MarkFlagRequired("selectedModels")
modelsSlice := []string{"my_first_dbt_model", "my_second_dbt_model"}
selectedModels := dbtCmd.Flags().StringSliceP("models", "m", modelsSlice, "dbt select models")
-
+ dbtArgs := dbtCmd.Flags().StringSliceP("args", "a", []string{}, "dbt run args")
projectVars := make(map[string]string)
projectVars["event_min_id"] = "7581"
projectVars["event_max_id"] = "7582"
@@ -107,6 +106,8 @@ func main() {
"projectTarget": *projectTarget,
"selectedModels": *selectedModels,
"projectVars": projectVarsConvert,
+ "projectGitURL": *projectGitURL,
+ "args": dbtArgs,
})
}
runner.RunCmd(dbtCmd)
diff --git a/plugins/dbt/tasks/convertor.go b/plugins/dbt/tasks/convertor.go
index a0ca1004..265fd5c1 100644
--- a/plugins/dbt/tasks/convertor.go
+++ b/plugins/dbt/tasks/convertor.go
@@ -20,7 +20,6 @@ package tasks
import (
"bufio"
"encoding/json"
- "github.com/apache/incubator-devlake/errors"
"net"
"net/url"
"os"
@@ -28,6 +27,8 @@ import (
"strconv"
"strings"
+ "github.com/apache/incubator-devlake/errors"
+
"github.com/apache/incubator-devlake/plugins/core"
"github.com/spf13/viper"
)
@@ -41,60 +42,66 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
projectName := data.Options.ProjectName
projectTarget := data.Options.ProjectTarget
projectVars := data.Options.ProjectVars
- dbUrl := taskCtx.GetConfig("DB_URL")
- u, err := errors.Convert01(url.Parse(dbUrl))
+ args := data.Options.Args
+ err := errors.Convert(os.Chdir(projectPath))
if err != nil {
return err
}
- dbType := u.Scheme
- dbUsername := u.User.Username()
- dbPassword, _ := u.User.Password()
- dbServer, dbPort, _ := net.SplitHostPort(u.Host)
- dbDataBase := u.Path[1:]
- var dbSchema string
- flag := strings.Compare(dbType, "mysql")
- if flag == 0 {
- // mysql database
- dbSchema = dbDataBase
- } else {
- // other database
- mapQuery, err := errors.Convert01(url.ParseQuery(u.RawQuery))
+ _, err = errors.Convert01(os.Stat("profiles.yml"))
+ // if profiles.yml not exist, create it manually
+ if err != nil {
+ dbUrl := taskCtx.GetConfig("DB_URL")
+ u, err := errors.Convert01(url.Parse(dbUrl))
if err != nil {
return err
}
- if value, ok := mapQuery["search_path"]; ok {
- if len(value) < 1 {
- return errors.Default.New("DB_URL search_path parses error")
+ dbType := u.Scheme
+ dbUsername := u.User.Username()
+ dbPassword, _ := u.User.Password()
+ dbServer, dbPort, _ := net.SplitHostPort(u.Host)
+ dbDataBase := u.Path[1:]
+ var dbSchema string
+ flag := strings.Compare(dbType, "mysql")
+ if flag == 0 {
+ // mysql database
+ dbSchema = dbDataBase
+ } else {
+ // other database
+ mapQuery, err := errors.Convert01(url.ParseQuery(u.RawQuery))
+ if err != nil {
+ return err
}
- dbSchema = value[0]
+ if value, ok := mapQuery["search_path"]; ok {
+ if len(value) < 1 {
+ return errors.Default.New("DB_URL search_path parses error")
+ }
+ dbSchema = value[0]
+ } else {
+ dbSchema = "public"
+ }
+ }
+ config := viper.New()
+ config.Set(projectName+".target", projectTarget)
+ config.Set(projectName+".outputs."+projectTarget+".type", dbType)
+ dbPortInt, _ := strconv.Atoi(dbPort)
+ config.Set(projectName+".outputs."+projectTarget+".port", dbPortInt)
+ config.Set(projectName+".outputs."+projectTarget+".password", dbPassword)
+ config.Set(projectName+".outputs."+projectTarget+".schema", dbSchema)
+ if flag == 0 {
+ config.Set(projectName+".outputs."+projectTarget+".server", dbServer)
+ config.Set(projectName+".outputs."+projectTarget+".username", dbUsername)
+ config.Set(projectName+".outputs."+projectTarget+".database", dbDataBase)
} else {
- dbSchema = "public"
+ config.Set(projectName+".outputs."+projectTarget+".host", dbServer)
+ config.Set(projectName+".outputs."+projectTarget+".user", dbUsername)
+ config.Set(projectName+".outputs."+projectTarget+".dbname", dbDataBase)
+ }
+ err = errors.Convert(config.WriteConfigAs("profiles.yml"))
+ if err != nil {
+ return err
}
}
- err = errors.Convert(os.Chdir(projectPath))
- if err != nil {
- return err
- }
- config := viper.New()
- config.Set(projectName+".target", projectTarget)
- config.Set(projectName+".outputs."+projectTarget+".type", dbType)
- dbPortInt, _ := strconv.Atoi(dbPort)
- config.Set(projectName+".outputs."+projectTarget+".port", dbPortInt)
- config.Set(projectName+".outputs."+projectTarget+".password", dbPassword)
- config.Set(projectName+".outputs."+projectTarget+".schema", dbSchema)
- if flag == 0 {
- config.Set(projectName+".outputs."+projectTarget+".server", dbServer)
- config.Set(projectName+".outputs."+projectTarget+".username", dbUsername)
- config.Set(projectName+".outputs."+projectTarget+".database", dbDataBase)
- } else {
- config.Set(projectName+".outputs."+projectTarget+".host", dbServer)
- config.Set(projectName+".outputs."+projectTarget+".user", dbUsername)
- config.Set(projectName+".outputs."+projectTarget+".dbname", dbDataBase)
- }
- err = errors.Convert(config.WriteConfigAs("profiles.yml"))
- if err != nil {
- return err
- }
+ // if package.yml exist, install dbt dependencies
_, err = errors.Convert01(os.Stat("packages.yml"))
if err == nil {
cmd := exec.Command("dbt", "deps")
@@ -114,6 +121,9 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
}
dbtExecParams = append(dbtExecParams, "--select")
dbtExecParams = append(dbtExecParams, models...)
+ if args != nil {
+ dbtExecParams = append(dbtExecParams, args...)
+ }
cmd := exec.Command(dbtExecParams[0], dbtExecParams[1:]...)
log.Info("dbt run script: %v", cmd)
stdout, _ := cmd.StdoutPipe()
diff --git a/plugins/dbt/tasks/git.go b/plugins/dbt/tasks/git.go
new file mode 100644
index 00000000..d0e63d86
--- /dev/null
+++ b/plugins/dbt/tasks/git.go
@@ -0,0 +1,48 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package tasks
+
+import (
+ "os/exec"
+
+ "github.com/apache/incubator-devlake/errors"
+ "github.com/apache/incubator-devlake/plugins/core"
+)
+
+// Git clones the dbt project at Options.ProjectGitURL into Options.ProjectPath,
+// the directory the converter subtask later changes into to run dbt. It does
+// nothing when ProjectGitURL is empty, so projects already on disk keep working.
+//
+// ProjectGitURL is user input. The "--" separator makes git treat it strictly
+// as the repository argument, so a value such as "--upload-pack=<cmd>" cannot
+// be parsed as an option and run a command.
+//
+// NOTE(review): git refuses to clone into an existing non-empty directory, so
+// a re-run against the same ProjectPath fails here; consider a pull instead.
+func Git(taskCtx core.SubTaskContext) errors.Error {
+ logger := taskCtx.GetLogger()
+ data := taskCtx.GetData().(*DbtTaskData)
+ gitURL := data.Options.ProjectGitURL
+ if gitURL == "" {
+ return nil
+ }
+ cmd := exec.Command("git", "clone", "--", gitURL, data.Options.ProjectPath)
+ logger.Info("start clone dbt project: %v", cmd)
+ out, err := cmd.CombinedOutput()
+ if err != nil {
+ // git explains the failure on stderr (captured in out); without it only
+ // the bare exit status would be reported.
+ logger.Error(err, "clone dbt project failed: %s", string(out))
+ return errors.Convert(err)
+ }
+ logger.Info("clone dbt project success: %v", string(out))
+ return nil
+}
+
+// GitMeta describes the Git subtask (clone the dbt project), enabled by default.
+var GitMeta = core.SubTaskMeta{
+ Name: "Git",
+ Description: "Clone dbt project from git",
+ EnabledByDefault: true,
+ EntryPoint: Git,
+}
diff --git a/plugins/dbt/tasks/task_data.go b/plugins/dbt/tasks/task_data.go
index 7733d836..7bb20a70 100644
--- a/plugins/dbt/tasks/task_data.go
+++ b/plugins/dbt/tasks/task_data.go
@@ -18,12 +18,17 @@ limitations under the License.
package tasks
type DbtOptions struct {
- ProjectPath string `json:"projectPath"`
- ProjectName string `json:"projectName"`
- ProjectTarget string `json:"projectTarget"`
+ ProjectPath string `json:"projectPath"`
+ ProjectName string `json:"projectName"`
+ ProjectTarget string `json:"projectTarget"`
+ // ProjectGitURL, when non-empty, is cloned into ProjectPath by the Git subtask.
+ ProjectGitURL string `json:"projectGitURL"`
+ // Deprecated: use Args instead.
ProjectVars map[string]interface{} `json:"projectVars"`
SelectedModels []string `json:"selectedModels"`
- Tasks []string `json:"tasks,omitempty"`
+ // Args are extra arguments appended to the dbt command line after --select.
+ Args []string `json:"args"`
+ Tasks []string `json:"tasks,omitempty"`
}
type DbtTaskData struct {