Posted to commits@devlake.apache.org by wa...@apache.org on 2022/06/15 07:56:29 UTC

[incubator-devlake] 02/12: add some helpers for e2e tests

This is an automated email from the ASF dual-hosted git repository.

warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git

commit ccb526f6bb3dd8966c3cbe5f63ab0ab38411f42e
Author: linyh <ya...@meri.co>
AuthorDate: Tue Jun 14 22:27:37 2022 +0800

    add some helpers for e2e tests
---
 .github/workflows/test-e2e.yml                     |   1 +
 e2e/database.go                                    |  22 +++-
 helpers/e2ehelper/data_flow_tester.go              | 124 +++++++++++++++++++--
 ...{csv_file_iterator_test.go => csv_file_test.go} |  22 +++-
 helpers/pluginhelper/csv_file_writer.go            |  82 ++++++++++++++
 plugins/helper/default_task_context.go             |   5 +-
 6 files changed, 237 insertions(+), 19 deletions(-)

diff --git a/.github/workflows/test-e2e.yml b/.github/workflows/test-e2e.yml
index 1a083811..1e42cff0 100644
--- a/.github/workflows/test-e2e.yml
+++ b/.github/workflows/test-e2e.yml
@@ -43,3 +43,4 @@ jobs:
         run: |
           cp .env.example .env
           make e2e-test
+          make e2e-plugins
diff --git a/e2e/database.go b/e2e/database.go
index 42aecb29..c69db039 100644
--- a/e2e/database.go
+++ b/e2e/database.go
@@ -22,8 +22,10 @@ import (
 	"fmt"
 	"log"
 	"net/url"
+	"strings"
 
 	mysqlGorm "gorm.io/driver/mysql"
+	postgresGorm "gorm.io/driver/postgres"
 	"gorm.io/gorm"
 )
 
@@ -34,10 +36,17 @@ func InitializeDb() (*sql.DB, error) {
 	if err != nil {
 		return nil, err
 	}
-	if u.Scheme == "mysql" {
+
+	var db *sql.DB
+	switch strings.ToLower(u.Scheme) {
+	case "mysql":
 		dbUrl = fmt.Sprintf("%s@tcp(%s)%s?%s", u.User.String(), u.Host, u.Path, u.RawQuery)
+		db, err = sql.Open(u.Scheme, dbUrl)
+	case "postgresql", "postgres", "pg":
+		db, err = sql.Open(`pgx`, dbUrl)
+	default:
+		return nil, fmt.Errorf("invalid DB_URL: %s", dbUrl)
 	}
-	db, err := sql.Open("mysql", dbUrl)
 	if err != nil {
 		return nil, err
 	}
@@ -58,3 +67,12 @@ func InitializeGormDb() (*gorm.DB, error) {
 	}
 	return db, nil
 }
+
+// InitializeGormDb2 initializes a gorm DB using the postgres driver
+func InitializeGormDb2() (*gorm.DB, error) {
+	connectionString := "host=localhost user=merico password=merico dbname=lake port=5432"
+	db, err := gorm.Open(postgresGorm.Open(connectionString))
+	if err != nil {
+		return nil, err
+	}
+	return db, nil
+}
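
For context, the scheme switch above accepts both MySQL- and Postgres-style DB_URL values: the
mysql scheme is rewritten into the Go driver's DSN form, while postgres/postgresql/pg URLs are
handed to the pgx driver unchanged. A minimal sketch of calling the helper (credentials are
hypothetical, and the pgx stdlib import path is an assumption about which driver version the
repo pins):

    package main

    import (
    	"log"

    	"github.com/apache/incubator-devlake/e2e"
    	_ "github.com/jackc/pgx/v4/stdlib" // registers the "pgx" driver (assumed version path)
    )

    func main() {
    	// hypothetical DB_URL values the switch accepts:
    	//   mysql://merico:merico@localhost:3306/lake?parseTime=true
    	//     -> rewritten to merico:merico@tcp(localhost:3306)/lake?parseTime=true
    	//   postgres://merico:merico@localhost:5432/lake (passed to sql.Open("pgx", ...) as-is)
    	db, err := e2e.InitializeDb()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer db.Close()
    }
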
diff --git a/helpers/e2ehelper/data_flow_tester.go b/helpers/e2ehelper/data_flow_tester.go
index df9950b4..20625013 100644
--- a/helpers/e2ehelper/data_flow_tester.go
+++ b/helpers/e2ehelper/data_flow_tester.go
@@ -19,19 +19,24 @@ package e2ehelper
 
 import (
 	"context"
+	"database/sql"
 	"fmt"
-	"testing"
-	"time"
-
 	"github.com/apache/incubator-devlake/config"
 	"github.com/apache/incubator-devlake/helpers/pluginhelper"
+	"github.com/apache/incubator-devlake/impl/dalgorm"
 	"github.com/apache/incubator-devlake/logger"
 	"github.com/apache/incubator-devlake/plugins/core"
+	"github.com/apache/incubator-devlake/plugins/core/dal"
 	"github.com/apache/incubator-devlake/plugins/helper"
 	"github.com/apache/incubator-devlake/runner"
 	"github.com/spf13/viper"
 	"github.com/stretchr/testify/assert"
 	"gorm.io/gorm"
+	"gorm.io/gorm/schema"
+	"os"
+	"strings"
+	"testing"
+	"time"
 )
 
 // DataFlowTester provides a universal data integrity validation facility to help `Plugin` verify records between
@@ -58,6 +63,7 @@ import (
 type DataFlowTester struct {
 	Cfg    *viper.Viper
 	Db     *gorm.DB
+	Dal    dal.Dal
 	T      *testing.T
 	Name   string
 	Plugin core.PluginMeta
@@ -78,6 +84,7 @@ func NewDataFlowTester(t *testing.T, pluginName string, pluginMeta core.PluginMe
 	return &DataFlowTester{
 		Cfg:    cfg,
 		Db:     db,
+		Dal:    dalgorm.NewDalgorm(db),
 		T:      t,
 		Name:   pluginName,
 		Plugin: pluginMeta,
@@ -94,7 +101,7 @@ func (t *DataFlowTester) ImportCsv(csvRelPath string, tableName string) {
 	if err != nil {
 		panic(err)
 	}
-	t.FlushTable(tableName)
+	t.MigrateRawTableAndFlush(tableName)
 	// load rows and insert into target table
 	for csvIter.HasNext() {
 		// make sure
@@ -106,6 +113,32 @@ func (t *DataFlowTester) ImportCsv(csvRelPath string, tableName string) {
 	}
 }
 
+// MigrateRawTableAndFlush migrates the raw table and deletes all of its records
+func (t *DataFlowTester) MigrateRawTableAndFlush(rawTableName string) {
+	// migrate then flush the target raw table
+	err := t.Db.Table(rawTableName).AutoMigrate(&helper.RawData{})
+	if err != nil {
+		panic(err)
+	}
+	err = t.Db.Exec(fmt.Sprintf("DELETE FROM %s", rawTableName)).Error
+	if err != nil {
+		panic(err)
+	}
+}
+
+// MigrateTableAndFlush migrates the given table and deletes all of its records
+func (t *DataFlowTester) MigrateTableAndFlush(dst schema.Tabler) {
+	// flush target table
+	err := t.Db.AutoMigrate(dst)
+	if err != nil {
+		panic(err)
+	}
+	err = t.Db.Delete(dst, `true`).Error
+	if err != nil {
+		panic(err)
+	}
+}
+
 // FlushTable deletes all records from specified table
 func (t *DataFlowTester) FlushTable(tableName string) {
 	// flush target table
@@ -127,8 +160,71 @@ func (t *DataFlowTester) Subtask(subtaskMeta core.SubTaskMeta, taskData interfac
 // CreateSnapshotOrVerify writes the current records of `dst` to the csv file at `csvRelPath` when the file does
 // not exist yet (snapshot mode); otherwise it calls VerifyTable to compare the csv file with the database.
 // `pkfields` identifies each record and `targetfields` lists the fields to compare.
-func (t *DataFlowTester) VerifyTable(tableName string, csvRelPath string, pkfields []string, targetfields []string) {
+func (t *DataFlowTester) CreateSnapshotOrVerify(dst schema.Tabler, csvRelPath string, pkfields []string, targetfields []string) {
+	_, err := os.Stat(csvRelPath)
+	if err == nil {
+		t.VerifyTable(dst, csvRelPath, pkfields, targetfields)
+		return
+	}
+
+	location := time.UTC
+	allFields := append([]string{}, pkfields...)
+	allFields = append(allFields, targetfields...)
+	dbCursor, err := t.Dal.Cursor(
+		dal.Select(strings.Join(allFields, `,`)),
+		dal.From(dst.TableName()),
+	)
+	if err != nil {
+		panic(err)
+	}
+
+	columns, err := dbCursor.Columns()
+	if err != nil {
+		panic(err)
+	}
+	csvWriter := pluginhelper.NewCsvFileWriter(csvRelPath, columns)
+	defer csvWriter.Close()
+
+	// define how to scan value
+	columnTypes, _ := dbCursor.ColumnTypes()
+	forScanValues := make([]interface{}, len(allFields))
+	for i, columnType := range columnTypes {
+		if columnType.ScanType().Name() == `Time` || columnType.ScanType().Name() == `NullTime` {
+			forScanValues[i] = new(sql.NullTime)
+		} else {
+			forScanValues[i] = new(string)
+		}
+	}
+
+	for dbCursor.Next() {
+		err = dbCursor.Scan(forScanValues...)
+		if err != nil {
+			panic(err)
+		}
+		values := make([]string, len(allFields))
+		for i := range forScanValues {
+			switch forScanValues[i].(type) {
+			case *sql.NullTime:
+				value := forScanValues[i].(*sql.NullTime)
+				if value.Valid {
+					values[i] = value.Time.In(location).Format("2006-01-02T15:04:05.000-07:00")
+				} else {
+					values[i] = ``
+				}
+			default:
+				values[i] = fmt.Sprint(*forScanValues[i].(*string))
+			}
+		}
+		csvWriter.Write(values)
+	}
+}
+
+// VerifyTable reads rows from the csv file and compares them with records from the database one by one. You must
+// specify the Primary Key Fields with `pkfields` so DataFlowTester can select the exact record from the database,
+// and list the fields to compare via the `targetfields` parameter.
+func (t *DataFlowTester) VerifyTable(dst schema.Tabler, csvRelPath string, pkfields []string, targetfields []string) {
 	csvIter := pluginhelper.NewCsvFileIterator(csvRelPath)
+	location := time.UTC
 	defer csvIter.Close()
 
 	var expectedTotal int64
@@ -139,11 +235,11 @@ func (t *DataFlowTester) VerifyTable(tableName string, csvRelPath string, pkfiel
 			pkvalues = append(pkvalues, expected[pkf])
 		}
 		actual := make(map[string]interface{})
-		where := ""
+		where := []string{}
 		for _, field := range pkfields {
-			where += fmt.Sprintf(" %s = ?", field)
+			where = append(where, fmt.Sprintf(" %s = ?", field))
 		}
-		err := t.Db.Table(tableName).Where(where, pkvalues...).Find(actual).Error
+		err := t.Db.Table(dst.TableName()).Where(strings.Join(where, ` AND `), pkvalues...).Find(actual).Error
 		if err != nil {
 			panic(err)
 		}
@@ -152,17 +248,21 @@ func (t *DataFlowTester) VerifyTable(tableName string, csvRelPath string, pkfiel
 			switch actual[field].(type) {
 			// TODO: ensure testing database is in UTC timezone
 			case time.Time:
-				actualValue = actual[field].(time.Time).Format("2006-01-02 15:04:05.000000000")
+				if actual[field] != nil {
+					actualValue = actual[field].(time.Time).In(location).Format("2006-01-02T15:04:05.000-07:00")
+				}
 			default:
-				actualValue = fmt.Sprint(actual[field])
+				if actual[field] != nil {
+					actualValue = fmt.Sprint(actual[field])
+				}
 			}
-			assert.Equal(t.T, expected[field], actualValue)
+			assert.Equal(t.T, expected[field], actualValue, fmt.Sprintf(`%s.%s does not match`, dst.TableName(), field))
 		}
 		expectedTotal++
 	}
 
 	var actualTotal int64
-	err := t.Db.Table(tableName).Count(&actualTotal).Error
+	err := t.Db.Table(dst.TableName()).Count(&actualTotal).Error
 	if err != nil {
 		panic(err)
 	}
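
Taken together, these helpers give plugin e2e tests a snapshot-or-verify workflow: import raw
fixtures, run the subtask under test, then either generate the expected csv on the first run or
compare the table against it on later runs. A hedged sketch of such a test; the plugin, task
data, model, subtask meta and csv paths below are all hypothetical:

    func TestMyPluginIssues(t *testing.T) {
    	dataflowTester := e2ehelper.NewDataFlowTester(t, "myplugin", new(MyPlugin)) // hypothetical plugin
    	taskData := &MyPluginTaskData{ConnectionId: 1}                              // hypothetical task data

    	// load the raw-layer fixture; MigrateRawTableAndFlush runs internally
    	dataflowTester.ImportCsv("./raw_tables/_raw_myplugin_issues.csv", "_raw_myplugin_issues")

    	// prepare the target table, then run the extractor subtask under test
    	dataflowTester.MigrateTableAndFlush(&MyPluginIssue{}) // hypothetical model implementing schema.Tabler
    	dataflowTester.Subtask(ExtractIssuesMeta, taskData)   // hypothetical core.SubTaskMeta

    	// first run writes the snapshot csv; later runs verify the table against it
    	dataflowTester.CreateSnapshotOrVerify(
    		&MyPluginIssue{},
    		"./snapshot_tables/issues.csv",
    		[]string{"id"},              // pkfields
    		[]string{"title", "status"}, // targetfields
    	)
    }
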
diff --git a/helpers/pluginhelper/csv_file_iterator_test.go b/helpers/pluginhelper/csv_file_test.go
similarity index 58%
rename from helpers/pluginhelper/csv_file_iterator_test.go
rename to helpers/pluginhelper/csv_file_test.go
index 9c6c6394..644c88b3 100644
--- a/helpers/pluginhelper/csv_file_iterator_test.go
+++ b/helpers/pluginhelper/csv_file_test.go
@@ -17,12 +17,26 @@ limitations under the License.
 
 package pluginhelper
 
-func ExampleCsvFileIterator() {
-	iter := NewCsvFileIterator("/path/to/foobar.csv")
+import (
+	"fmt"
+	"github.com/magiconair/properties/assert"
+	"testing"
+)
+
+func TestExampleCsvFile(t *testing.T) {
+	tmpPath := t.TempDir()
+	filename := fmt.Sprintf(`%s/foobar.csv`, tmpPath)
+
+	writer := NewCsvFileWriter(filename, []string{"id", "name", "json", "created_at"})
+	writer.Write([]string{"123", "foobar", `{"url": "https://example.com"}`, "2022-05-05 09:56:43.438000000"})
+	writer.Close()
+
+	iter := NewCsvFileIterator(filename)
 	defer iter.Close()
 	for iter.HasNext() {
 		row := iter.Fetch()
-		println(row["name"]) // foobar
-		println(row["json"]) // {"url": "https://example.com"}
+		assert.Equal(t, row["name"], "foobar", "name not euqal")
+		assert.Equal(t, row["json"], `{"url": "https://example.com"}`, "json not euqal")
 	}
 }
diff --git a/helpers/pluginhelper/csv_file_writer.go b/helpers/pluginhelper/csv_file_writer.go
new file mode 100644
index 00000000..c413c5ad
--- /dev/null
+++ b/helpers/pluginhelper/csv_file_writer.go
@@ -0,0 +1,82 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package pluginhelper
+
+import (
+	"encoding/csv"
+	"os"
+)
+
+// CsvFileWriter makes saving csv files easier; it writes one tuple (row) at a time to a csv file
+//
+// Example CSV format (exported by dbeaver):
+//
+//   "id","name","json","created_at"
+//   123,"foobar","{""url"": ""https://example.com""}","2022-05-05 09:56:43.438000000"
+//
+type CsvFileWriter struct {
+	file   *os.File
+	writer *csv.Writer
+	fields []string
+}
+
+// NewCsvFileWriter creates a `*CsvFileWriter` that writes to the csv file at `csvPath`
+func NewCsvFileWriter(csvPath string, fields []string) *CsvFileWriter {
+	// open csv file
+	csvFile, err := os.Create(csvPath)
+	if err != nil {
+		panic(err)
+	}
+	csvWriter := csv.NewWriter(csvFile)
+	// write field names
+	err = csvWriter.Write(fields)
+	if err != nil {
+		panic(err)
+	}
+	csvWriter.Flush()
+	if err = csvWriter.Error(); err != nil {
+		panic(err)
+	}
+	return &CsvFileWriter{
+		file:   csvFile,
+		writer: csvWriter,
+		fields: fields,
+	}
+}
+
+// Close flushes any buffered data and closes the underlying file
+func (ci *CsvFileWriter) Close() {
+	ci.writer.Flush()
+	err := ci.file.Close()
+	if err != nil {
+		panic(err)
+	}
+}
+
+// Write writes a single row of values to the csv file
+func (ci *CsvFileWriter) Write(values []string) {
+	err := ci.writer.Write(values)
+	if err != nil {
+		panic(err)
+	}
+}
+
+// Flush writes any buffered data to the underlying file
+func (ci *CsvFileWriter) Flush() {
+	ci.writer.Flush()
+}
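
One note on the format: encoding/csv quotes only fields containing commas, quotes or newlines, so
the writer's output quotes the json column (doubling its embedded quotes) but leaves plain values
bare, slightly unlike the fully quoted dbeaver sample in the doc comment; NewCsvFileIterator reads
both forms. A small sketch (the path is hypothetical):

    w := pluginhelper.NewCsvFileWriter("/tmp/foobar.csv", []string{"id", "name", "json", "created_at"})
    defer w.Close()
    // resulting row: 123,foobar,"{""url"": ""https://example.com""}",2022-05-05 09:56:43.438000000
    w.Write([]string{"123", "foobar", `{"url": "https://example.com"}`, "2022-05-05 09:56:43.438000000"})
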
diff --git a/plugins/helper/default_task_context.go b/plugins/helper/default_task_context.go
index 9f4b6747..dcccb49d 100644
--- a/plugins/helper/default_task_context.go
+++ b/plugins/helper/default_task_context.go
@@ -239,7 +239,7 @@ func (c *DefaultTaskContext) SubTaskContext(subtask string) (core.SubTaskContext
 	return nil, fmt.Errorf("subtask %s doesn't exist", subtask)
 }
 
-// This returns a stand-alone core.SubTaskContext,
+// NewStandaloneSubTaskContext returns a stand-alone core.SubTaskContext,
 // not attached to any core.TaskContext.
 // Use this if you need to run/debug a subtask without
 // going through the usual workflow.
@@ -265,6 +265,9 @@ func (c *DefaultTaskContext) SetData(data interface{}) {
 var _ core.TaskContext = (*DefaultTaskContext)(nil)
 
 func (c *DefaultSubTaskContext) TaskContext() core.TaskContext {
+	if c.taskCtx == nil {
+		return nil
+	}
 	return c.taskCtx
 }
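
The nil guard added to TaskContext() avoids Go's typed-nil gotcha: c.taskCtx is a *DefaultTaskContext,
and returning a nil pointer through the core.TaskContext interface would yield a non-nil interface
value, so callers comparing TaskContext() == nil would never see true. With the guard, code running a
subtask stand-alone (e.g. one built by NewStandaloneSubTaskContext) can detect the missing parent; a
hedged sketch, where subtaskCtx is any core.SubTaskContext:

    if subtaskCtx.TaskContext() == nil {
    	// stand-alone subtask: no parent core.TaskContext to consult,
    	// so skip anything that assumes a full pipeline run
    }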