You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@devlake.apache.org by ab...@apache.org on 2022/10/19 11:45:53 UTC

[incubator-devlake] 01/04: feat: dbt plugin improvement

This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git

commit add88b5747fe81c3cf991317503f174e5ed6221e
Author: long2ice <lo...@gmail.com>
AuthorDate: Wed Oct 19 12:02:57 2022 +0800

    feat: dbt plugin improvement
---
 config-ui/src/data/pipeline-config-samples/dbt.js |   4 +-
 plugins/dbt/api/swagger.go                        |  42 ++++-----
 plugins/dbt/dbt.go                                |  17 ++--
 plugins/dbt/tasks/convertor.go                    | 100 ++++++++++++----------
 plugins/dbt/tasks/git.go                          |  48 +++++++++++
 plugins/dbt/tasks/task_data.go                    |  13 ++-
 6 files changed, 141 insertions(+), 83 deletions(-)

diff --git a/config-ui/src/data/pipeline-config-samples/dbt.js b/config-ui/src/data/pipeline-config-samples/dbt.js
index 292a46d2..fac96b36 100644
--- a/config-ui/src/data/pipeline-config-samples/dbt.js
+++ b/config-ui/src/data/pipeline-config-samples/dbt.js
@@ -21,13 +21,15 @@ const dbtConfig = [
       plugin: 'dbt',
       options: {
         projectPath: '/var/www/html/my-project',
+        projectGitURL: '',
         projectName: 'myproject',
         projectTarget: 'dev',
         selectedModels: ['model_one', 'model_two'],
         projectVars: {
           demokey1: 'demovalue1',
           demokey2: 'demovalue2'
-        }
+        },
+        args: []
       }
     }
   ]
diff --git a/plugins/dbt/api/swagger.go b/plugins/dbt/api/swagger.go
index 841c2870..a14c986c 100644
--- a/plugins/dbt/api/swagger.go
+++ b/plugins/dbt/api/swagger.go
@@ -25,19 +25,23 @@ package api
 // @Router /blueprints/dbt/blueprint-plan [post]
 func _() {}
 
-type DbtBlueprintPlan [][]struct {
-	Plugin  string `json:"plugin"`
-	Options struct {
-		ProjectPath    string   `json:"projectPath"`
-		ProjectName    string   `json:"projectName"`
-		ProjectTarget  string   `json:"projectTarget"`
-		SelectedModels []string `json:"selectedModels"`
-		ProjectVars    struct {
-			Demokey1 string `json:"demokey1"`
-			Demokey2 string `json:"demokey2"`
-		} `json:"projectVars"`
-	} `json:"options"`
+type Options struct {
+	ProjectPath    string   `json:"projectPath"`
+	ProjectGitURL  string   `json:"projectGitURL"`
+	ProjectName    string   `json:"projectName"`
+	ProjectTarget  string   `json:"projectTarget"`
+	SelectedModels []string `json:"selectedModels"`
+	Args           []string `json:"args"`
+	ProjectVars    struct {
+		Demokey1 string `json:"demokey1"`
+		Demokey2 string `json:"demokey2"`
+	} `json:"projectVars"`
 }
+type Plan struct {
+	Plugin  string  `json:"plugin"`
+	Options Options `json:"options"`
+}
+type DbtBlueprintPlan [][]Plan
 
 // @Summary pipelines plan for dbt
 // @Description pipelines plan for dbt
@@ -47,16 +51,4 @@ type DbtBlueprintPlan [][]struct {
 // @Router /pipelines/dbt/pipeline-plan [post]
 func _() {}
 
-type DbtPipelinePlan [][]struct {
-	Plugin  string `json:"plugin"`
-	Options struct {
-		ProjectPath    string   `json:"projectPath"`
-		ProjectName    string   `json:"projectName"`
-		ProjectTarget  string   `json:"projectTarget"`
-		SelectedModels []string `json:"selectedModels"`
-		ProjectVars    struct {
-			Demokey1 string `json:"demokey1"`
-			Demokey2 string `json:"demokey2"`
-		} `json:"projectVars"`
-	} `json:"options"`
-}
+type DbtPipelinePlan [][]Plan
diff --git a/plugins/dbt/dbt.go b/plugins/dbt/dbt.go
index f336cb97..6f95c2c9 100644
--- a/plugins/dbt/dbt.go
+++ b/plugins/dbt/dbt.go
@@ -27,8 +27,10 @@ import (
 	"github.com/spf13/cobra"
 )
 
-var _ core.PluginMeta = (*Dbt)(nil)
-var _ core.PluginTask = (*Dbt)(nil)
+var (
+	_ core.PluginMeta = (*Dbt)(nil)
+	_ core.PluginTask = (*Dbt)(nil)
+)
 
 type Dbt struct{}
 
@@ -38,6 +40,7 @@ func (plugin Dbt) Description() string {
 
 func (plugin Dbt) SubTaskMetas() []core.SubTaskMeta {
 	return []core.SubTaskMeta{
+		tasks.GitMeta,
 		tasks.DbtConverterMeta,
 	}
 }
@@ -81,16 +84,12 @@ func main() {
 	dbtCmd := &cobra.Command{Use: "dbt"}
 	_ = dbtCmd.MarkFlagRequired("projectPath")
 	projectPath := dbtCmd.Flags().StringP("projectPath", "p", "/Users/abeizn/demoapp", "user dbt project directory.")
-
-	_ = dbtCmd.MarkFlagRequired("projectName")
+	projectGitURL := dbtCmd.Flags().StringP("projectGitURL", "g", "", "user dbt project git url.")
 	projectName := dbtCmd.Flags().StringP("projectName", "n", "demoapp", "user dbt project name.")
-
 	projectTarget := dbtCmd.Flags().StringP("projectTarget", "o", "dev", "this is the default target your dbt project will use.")
-
-	_ = dbtCmd.MarkFlagRequired("selectedModels")
 	modelsSlice := []string{"my_first_dbt_model", "my_second_dbt_model"}
 	selectedModels := dbtCmd.Flags().StringSliceP("models", "m", modelsSlice, "dbt select models")
-
+	dbtArgs := dbtCmd.Flags().StringSliceP("args", "a", []string{}, "dbt run args")
 	projectVars := make(map[string]string)
 	projectVars["event_min_id"] = "7581"
 	projectVars["event_max_id"] = "7582"
@@ -107,6 +106,8 @@ func main() {
 			"projectTarget":  *projectTarget,
 			"selectedModels": *selectedModels,
 			"projectVars":    projectVarsConvert,
+			"projectGitURL":  *projectGitURL,
+			"args":           *dbtArgs,
 		})
 	}
 	runner.RunCmd(dbtCmd)
diff --git a/plugins/dbt/tasks/convertor.go b/plugins/dbt/tasks/convertor.go
index a0ca1004..265fd5c1 100644
--- a/plugins/dbt/tasks/convertor.go
+++ b/plugins/dbt/tasks/convertor.go
@@ -20,7 +20,6 @@ package tasks
 import (
 	"bufio"
 	"encoding/json"
-	"github.com/apache/incubator-devlake/errors"
 	"net"
 	"net/url"
 	"os"
@@ -28,6 +27,8 @@ import (
 	"strconv"
 	"strings"
 
+	"github.com/apache/incubator-devlake/errors"
+
 	"github.com/apache/incubator-devlake/plugins/core"
 	"github.com/spf13/viper"
 )
@@ -41,60 +42,66 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
 	projectName := data.Options.ProjectName
 	projectTarget := data.Options.ProjectTarget
 	projectVars := data.Options.ProjectVars
-	dbUrl := taskCtx.GetConfig("DB_URL")
-	u, err := errors.Convert01(url.Parse(dbUrl))
+	args := data.Options.Args
+	err := errors.Convert(os.Chdir(projectPath))
 	if err != nil {
 		return err
 	}
-	dbType := u.Scheme
-	dbUsername := u.User.Username()
-	dbPassword, _ := u.User.Password()
-	dbServer, dbPort, _ := net.SplitHostPort(u.Host)
-	dbDataBase := u.Path[1:]
-	var dbSchema string
-	flag := strings.Compare(dbType, "mysql")
-	if flag == 0 {
-		// mysql database
-		dbSchema = dbDataBase
-	} else {
-		// other database
-		mapQuery, err := errors.Convert01(url.ParseQuery(u.RawQuery))
+	_, err = errors.Convert01(os.Stat("profiles.yml"))
+	// if profiles.yml cannot be stat'ed in projectPath (typically it does not exist), generate it from DB_URL
+	if err != nil {
+		dbUrl := taskCtx.GetConfig("DB_URL")
+		u, err := errors.Convert01(url.Parse(dbUrl))
 		if err != nil {
 			return err
 		}
-		if value, ok := mapQuery["search_path"]; ok {
-			if len(value) < 1 {
-				return errors.Default.New("DB_URL search_path parses error")
+		dbType := u.Scheme
+		dbUsername := u.User.Username()
+		dbPassword, _ := u.User.Password()
+		dbServer, dbPort, _ := net.SplitHostPort(u.Host)
+		dbDataBase := strings.TrimPrefix(u.Path, "/") // TrimPrefix, not u.Path[1:], so a DB_URL without a path cannot panic
+		var dbSchema string
+		flag := strings.Compare(dbType, "mysql")
+		if flag == 0 {
+			// mysql database
+			dbSchema = dbDataBase
+		} else {
+			// other database
+			mapQuery, err := errors.Convert01(url.ParseQuery(u.RawQuery))
+			if err != nil {
+				return err
 			}
-			dbSchema = value[0]
+			if value, ok := mapQuery["search_path"]; ok {
+				if len(value) < 1 {
+					return errors.Default.New("DB_URL search_path parses error")
+				}
+				dbSchema = value[0]
+			} else {
+				dbSchema = "public"
+			}
+		}
+		config := viper.New()
+		config.Set(projectName+".target", projectTarget)
+		config.Set(projectName+".outputs."+projectTarget+".type", dbType)
+		dbPortInt, _ := strconv.Atoi(dbPort)
+		config.Set(projectName+".outputs."+projectTarget+".port", dbPortInt)
+		config.Set(projectName+".outputs."+projectTarget+".password", dbPassword)
+		config.Set(projectName+".outputs."+projectTarget+".schema", dbSchema)
+		if flag == 0 {
+			config.Set(projectName+".outputs."+projectTarget+".server", dbServer)
+			config.Set(projectName+".outputs."+projectTarget+".username", dbUsername)
+			config.Set(projectName+".outputs."+projectTarget+".database", dbDataBase)
 		} else {
-			dbSchema = "public"
+			config.Set(projectName+".outputs."+projectTarget+".host", dbServer)
+			config.Set(projectName+".outputs."+projectTarget+".user", dbUsername)
+			config.Set(projectName+".outputs."+projectTarget+".dbname", dbDataBase)
+		}
+		err = errors.Convert(config.WriteConfigAs("profiles.yml"))
+		if err != nil {
+			return err
 		}
 	}
-	err = errors.Convert(os.Chdir(projectPath))
-	if err != nil {
-		return err
-	}
-	config := viper.New()
-	config.Set(projectName+".target", projectTarget)
-	config.Set(projectName+".outputs."+projectTarget+".type", dbType)
-	dbPortInt, _ := strconv.Atoi(dbPort)
-	config.Set(projectName+".outputs."+projectTarget+".port", dbPortInt)
-	config.Set(projectName+".outputs."+projectTarget+".password", dbPassword)
-	config.Set(projectName+".outputs."+projectTarget+".schema", dbSchema)
-	if flag == 0 {
-		config.Set(projectName+".outputs."+projectTarget+".server", dbServer)
-		config.Set(projectName+".outputs."+projectTarget+".username", dbUsername)
-		config.Set(projectName+".outputs."+projectTarget+".database", dbDataBase)
-	} else {
-		config.Set(projectName+".outputs."+projectTarget+".host", dbServer)
-		config.Set(projectName+".outputs."+projectTarget+".user", dbUsername)
-		config.Set(projectName+".outputs."+projectTarget+".dbname", dbDataBase)
-	}
-	err = errors.Convert(config.WriteConfigAs("profiles.yml"))
-	if err != nil {
-		return err
-	}
+	// if packages.yml exists, install dbt dependencies with "dbt deps"
 	_, err = errors.Convert01(os.Stat("packages.yml"))
 	if err == nil {
 		cmd := exec.Command("dbt", "deps")
@@ -114,6 +121,9 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
 	}
 	dbtExecParams = append(dbtExecParams, "--select")
 	dbtExecParams = append(dbtExecParams, models...)
+	if args != nil {
+		dbtExecParams = append(dbtExecParams, args...)
+	}
 	cmd := exec.Command(dbtExecParams[0], dbtExecParams[1:]...)
 	log.Info("dbt run script: %v", cmd)
 	stdout, _ := cmd.StdoutPipe()
diff --git a/plugins/dbt/tasks/git.go b/plugins/dbt/tasks/git.go
new file mode 100644
index 00000000..d0e63d86
--- /dev/null
+++ b/plugins/dbt/tasks/git.go
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+	http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package tasks
+
+import (
+	"os"
+	"os/exec"
+	"path/filepath"
+
+	"github.com/apache/incubator-devlake/errors"
+	"github.com/apache/incubator-devlake/plugins/core"
+)
+
+// Git fetches the dbt project from Options.ProjectGitURL into Options.ProjectPath.
+// It is a no-op when ProjectGitURL is empty. If ProjectPath already holds a git
+// checkout (e.g. left by a previous pipeline run) it is fast-forwarded with
+// `git pull --ff-only` instead, because `git clone` refuses a non-empty target.
+func Git(taskCtx core.SubTaskContext) errors.Error {
+	logger := taskCtx.GetLogger()
+	data := taskCtx.GetData().(*DbtTaskData)
+	if data.Options.ProjectGitURL == "" {
+		return nil
+	}
+	projectPath := data.Options.ProjectPath
+	var cmd *exec.Cmd
+	if _, err := os.Stat(filepath.Join(projectPath, ".git")); err == nil {
+		cmd = exec.Command("git", "-C", projectPath, "pull", "--ff-only")
+	} else {
+		cmd = exec.Command("git", "clone", data.Options.ProjectGitURL, projectPath)
+	}
+	// Log the target path only: the git URL may embed credentials.
+	logger.Info("start fetching dbt project into %s", projectPath)
+	out, err := cmd.CombinedOutput()
+	if err != nil {
+		return errors.Default.Wrap(err, "fetch dbt project failed: "+string(out))
+	}
+	logger.Info("fetch dbt project success: %s", string(out))
+	return nil
+}
+
+// GitMeta registers Git as a dbt plugin subtask.
+var GitMeta = core.SubTaskMeta{
+	Name:             "Git",
+	EntryPoint:       Git,
+	EnabledByDefault: true,
+	Description:      "Clone dbt project from git",
+}
diff --git a/plugins/dbt/tasks/task_data.go b/plugins/dbt/tasks/task_data.go
index 7733d836..7bb20a70 100644
--- a/plugins/dbt/tasks/task_data.go
+++ b/plugins/dbt/tasks/task_data.go
@@ -18,12 +18,17 @@ limitations under the License.
 package tasks
 
 type DbtOptions struct {
-	ProjectPath    string                 `json:"projectPath"`
-	ProjectName    string                 `json:"projectName"`
-	ProjectTarget  string                 `json:"projectTarget"`
+	ProjectPath   string `json:"projectPath"`
+	ProjectName   string `json:"projectName"`
+	ProjectTarget string `json:"projectTarget"`
+	// clone from git to projectPath if projectGitURL is not empty
+	ProjectGitURL string `json:"projectGitURL"`
+	// Deprecated: use Args instead.
 	ProjectVars    map[string]interface{} `json:"projectVars"`
 	SelectedModels []string               `json:"selectedModels"`
-	Tasks          []string               `json:"tasks,omitempty"`
+	// Args are extra arguments appended verbatim to the dbt command line.
+	Args  []string `json:"args"`
+	Tasks []string `json:"tasks,omitempty"`
 }
 
 type DbtTaskData struct {