You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by wa...@apache.org on 2022/06/15 07:56:29 UTC
[incubator-devlake] 02/12: add some helper for e2e test
This is an automated email from the ASF dual-hosted git repository.
warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit ccb526f6bb3dd8966c3cbe5f63ab0ab38411f42e
Author: linyh <ya...@meri.co>
AuthorDate: Tue Jun 14 22:27:37 2022 +0800
add some helper for e2e test
---
.github/workflows/test-e2e.yml | 1 +
e2e/database.go | 22 +++-
helpers/e2ehelper/data_flow_tester.go | 124 +++++++++++++++++++--
...{csv_file_iterator_test.go => csv_file_test.go} | 22 +++-
helpers/pluginhelper/csv_file_writer.go | 82 ++++++++++++++
plugins/helper/default_task_context.go | 5 +-
6 files changed, 237 insertions(+), 19 deletions(-)
diff --git a/.github/workflows/test-e2e.yml b/.github/workflows/test-e2e.yml
index 1a083811..1e42cff0 100644
--- a/.github/workflows/test-e2e.yml
+++ b/.github/workflows/test-e2e.yml
@@ -43,3 +43,4 @@ jobs:
run: |
cp .env.example .env
make e2e-test
+ make e2e-plugins
diff --git a/e2e/database.go b/e2e/database.go
index 42aecb29..c69db039 100644
--- a/e2e/database.go
+++ b/e2e/database.go
@@ -22,8 +22,10 @@ import (
"fmt"
"log"
"net/url"
+ "strings"
mysqlGorm "gorm.io/driver/mysql"
+ postgresGorm "gorm.io/driver/postgres"
"gorm.io/gorm"
)
@@ -34,10 +36,17 @@ func InitializeDb() (*sql.DB, error) {
if err != nil {
return nil, err
}
- if u.Scheme == "mysql" {
+
+ var db *sql.DB
+ switch strings.ToLower(u.Scheme) {
+ case "mysql":
dbUrl = fmt.Sprintf(("%s@tcp(%s)%s?%s"), u.User.String(), u.Host, u.Path, u.RawQuery)
+ db, err = sql.Open(u.Scheme, dbUrl)
+ case "postgresql", "postgres", "pg":
+ db, err = sql.Open(`pgx`, dbUrl)
+ default:
+ return nil, fmt.Errorf("invalid DB_URL:%s", dbUrl)
}
- db, err := sql.Open("mysql", dbUrl)
if err != nil {
return nil, err
}
@@ -58,3 +67,12 @@ func InitializeGormDb() (*gorm.DB, error) {
}
return db, nil
}
+
+// InitializeGormDb2 opens a gorm connection to the local PostgreSQL database "lake" using hard-coded
+// development credentials (merico/merico on localhost:5432).
+func InitializeGormDb2() (*gorm.DB, error) {
+	// gorm.io/driver/postgres needs a postgres (pgx) DSN; the MySQL-style "user:pass@tcp(host)/db" form is not valid for it
+	connectionString := "postgres://merico:merico@localhost:5432/lake?sslmode=disable"
+	db, err := gorm.Open(postgresGorm.Open(connectionString))
+	if err != nil {
+		return nil, err
+	}
+	return db, nil
+}
diff --git a/helpers/e2ehelper/data_flow_tester.go b/helpers/e2ehelper/data_flow_tester.go
index df9950b4..20625013 100644
--- a/helpers/e2ehelper/data_flow_tester.go
+++ b/helpers/e2ehelper/data_flow_tester.go
@@ -19,19 +19,24 @@ package e2ehelper
import (
"context"
+ "database/sql"
"fmt"
- "testing"
- "time"
-
"github.com/apache/incubator-devlake/config"
"github.com/apache/incubator-devlake/helpers/pluginhelper"
+ "github.com/apache/incubator-devlake/impl/dalgorm"
"github.com/apache/incubator-devlake/logger"
"github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/apache/incubator-devlake/runner"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"gorm.io/gorm"
+ "gorm.io/gorm/schema"
+ "os"
+ "strings"
+ "testing"
+ "time"
)
// DataFlowTester provides a universal data integrity validation facility to help `Plugin` verifying records between
@@ -58,6 +63,7 @@ import (
type DataFlowTester struct {
Cfg *viper.Viper
Db *gorm.DB
+ Dal dal.Dal
T *testing.T
Name string
Plugin core.PluginMeta
@@ -78,6 +84,7 @@ func NewDataFlowTester(t *testing.T, pluginName string, pluginMeta core.PluginMe
return &DataFlowTester{
Cfg: cfg,
Db: db,
+ Dal: dalgorm.NewDalgorm(db),
T: t,
Name: pluginName,
Plugin: pluginMeta,
@@ -94,7 +101,7 @@ func (t *DataFlowTester) ImportCsv(csvRelPath string, tableName string) {
if err != nil {
panic(err)
}
- t.FlushTable(tableName)
+ t.MigrateRawTableAndFlush(tableName)
// load rows and insert into target table
for csvIter.HasNext() {
// make sure
@@ -106,6 +113,32 @@ func (t *DataFlowTester) ImportCsv(csvRelPath string, tableName string) {
}
}
+// MigrateRawTableAndFlush creates or updates the table rawTableName with the helper.RawData schema and then
+// deletes every row in it, leaving an empty raw table for ImportCsv to fill. It panics on any database error.
+func (t *DataFlowTester) MigrateRawTableAndFlush(rawTableName string) {
+	// migrate first so the DELETE below also works on a fresh database where the table does not exist yet
+	err := t.Db.Table(rawTableName).AutoMigrate(&helper.RawData{})
+	if err != nil {
+		panic(err)
+	}
+	// flush existing rows; table names here are supplied by test code rather than untrusted input
+	err = t.Db.Exec(fmt.Sprintf("DELETE FROM %s", rawTableName)).Error
+	if err != nil {
+		panic(err)
+	}
+}
+
+// MigrateTableAndFlush auto-migrates the table described by dst and then removes all of its rows.
+// Any database error causes a panic.
+func (t *DataFlowTester) MigrateTableAndFlush(dst schema.Tabler) {
+	// create or update the schema before touching the rows
+	if err := t.Db.AutoMigrate(dst); err != nil {
+		panic(err)
+	}
+	// `true` is passed as the WHERE condition so that every row matches
+	if err := t.Db.Delete(dst, `true`).Error; err != nil {
+		panic(err)
+	}
+}
+
// FlushTable deletes all records from specified table
func (t *DataFlowTester) FlushTable(tableName string) {
// flush target table
@@ -127,8 +160,71 @@ func (t *DataFlowTester) Subtask(subtaskMeta core.SubTaskMeta, taskData interfac
-// VerifyTable reads rows from csv file and compare with records from database one by one. You must specified the
-// Primary Key Fields with `pkfields` so DataFlowTester could select the exact record from database, as well as which
-// fields to compare with by specifying `targetfields` parameter.
-func (t *DataFlowTester) VerifyTable(tableName string, csvRelPath string, pkfields []string, targetfields []string) {
+// CreateSnapshotOrVerify verifies the table behind dst against the csv snapshot at csvRelPath (see VerifyTable) when
+// that file exists; otherwise it creates the snapshot from the current table content. The snapshot holds the
+// `pkfields` columns followed by the `targetfields` columns; time columns are written in UTC as
+// "2006-01-02T15:04:05.000-07:00" and NULL values as empty cells. It panics on any database or file error.
+func (t *DataFlowTester) CreateSnapshotOrVerify(dst schema.Tabler, csvRelPath string, pkfields []string, targetfields []string) {
+	_, err := os.Stat(csvRelPath)
+	if err == nil {
+		t.VerifyTable(dst, csvRelPath, pkfields, targetfields)
+		return
+	}
+	if !os.IsNotExist(err) {
+		// the snapshot may exist but be inaccessible: fail loudly instead of trying to overwrite it
+		panic(err)
+	}
+
+	// build a fresh slice: append(pkfields, ...) could write into the caller's backing array
+	allFields := make([]string, 0, len(pkfields)+len(targetfields))
+	allFields = append(allFields, pkfields...)
+	allFields = append(allFields, targetfields...)
+	dbCursor, err := t.Dal.Cursor(
+		dal.Select(strings.Join(allFields, `,`)),
+		dal.From(dst.TableName()),
+	)
+	if err != nil {
+		panic(err)
+	}
+	// NOTE(review): assumes Cursor returns *sql.Rows (Columns/ColumnTypes/Next/Scan are already used below)
+	defer dbCursor.Close()
+
+	columns, err := dbCursor.Columns()
+	if err != nil {
+		panic(err)
+	}
+	columnTypes, err := dbCursor.ColumnTypes()
+	if err != nil {
+		panic(err)
+	}
+	csvWriter := pluginhelper.NewCsvFileWriter(csvRelPath, columns)
+	defer csvWriter.Close()
+
+	// time columns are scanned as sql.NullTime so they can be normalized to UTC; everything else is scanned as
+	// sql.NullString because a plain *string destination cannot receive NULL
+	forScanValues := make([]interface{}, len(columnTypes))
+	for i, columnType := range columnTypes {
+		if columnType.ScanType().Name() == `Time` || columnType.ScanType().Name() == `NullTime` {
+			forScanValues[i] = new(sql.NullTime)
+		} else {
+			forScanValues[i] = new(sql.NullString)
+		}
+	}
+
+	for dbCursor.Next() {
+		err = dbCursor.Scan(forScanValues...)
+		if err != nil {
+			panic(err)
+		}
+		// NULL values keep the zero value: an empty csv cell
+		values := make([]string, len(forScanValues))
+		for i, scanned := range forScanValues {
+			switch value := scanned.(type) {
+			case *sql.NullTime:
+				if value.Valid {
+					values[i] = value.Time.UTC().Format("2006-01-02T15:04:05.000-07:00")
+				}
+			case *sql.NullString:
+				if value.Valid {
+					values[i] = value.String
+				}
+			}
+		}
+		csvWriter.Write(values)
+	}
+	// Next returns false both when rows are exhausted and when iteration failed; tell them apart
+	if err = dbCursor.Err(); err != nil {
+		panic(err)
+	}
+}
+
+// VerifyTable reads rows from csv file and compare with records from database one by one. You must specified the
+// Primary Key Fields with `pkfields` so DataFlowTester could select the exact record from database, as well as which
+// fields to compare with by specifying `targetfields` parameter.
+func (t *DataFlowTester) VerifyTable(dst schema.Tabler, csvRelPath string, pkfields []string, targetfields []string) {
csvIter := pluginhelper.NewCsvFileIterator(csvRelPath)
+ location, _ := time.LoadLocation(`UTC`)
defer csvIter.Close()
var expectedTotal int64
@@ -139,11 +235,11 @@ func (t *DataFlowTester) VerifyTable(tableName string, csvRelPath string, pkfiel
pkvalues = append(pkvalues, expected[pkf])
}
actual := make(map[string]interface{})
- where := ""
+ where := []string{}
for _, field := range pkfields {
- where += fmt.Sprintf(" %s = ?", field)
+ where = append(where, fmt.Sprintf(" %s = ?", field))
}
- err := t.Db.Table(tableName).Where(where, pkvalues...).Find(actual).Error
+ err := t.Db.Table(dst.TableName()).Where(strings.Join(where, ` AND `), pkvalues...).Find(actual).Error
if err != nil {
panic(err)
}
@@ -152,17 +248,21 @@ func (t *DataFlowTester) VerifyTable(tableName string, csvRelPath string, pkfiel
switch actual[field].(type) {
// TODO: ensure testing database is in UTC timezone
case time.Time:
- actualValue = actual[field].(time.Time).Format("2006-01-02 15:04:05.000000000")
+ if actual[field] != nil {
+ actualValue = actual[field].(time.Time).In(location).Format("2006-01-02T15:04:05.000-07:00")
+ }
default:
- actualValue = fmt.Sprint(actual[field])
+ if actual[field] != nil {
+ actualValue = fmt.Sprint(actual[field])
+ }
}
- assert.Equal(t.T, expected[field], actualValue)
+ assert.Equal(t.T, expected[field], actualValue, fmt.Sprintf(`%s.%s not match`, dst.TableName(), field))
}
expectedTotal++
}
var actualTotal int64
- err := t.Db.Table(tableName).Count(&actualTotal).Error
+ err := t.Db.Table(dst.TableName()).Count(&actualTotal).Error
if err != nil {
panic(err)
}
diff --git a/helpers/pluginhelper/csv_file_iterator_test.go b/helpers/pluginhelper/csv_file_test.go
similarity index 58%
rename from helpers/pluginhelper/csv_file_iterator_test.go
rename to helpers/pluginhelper/csv_file_test.go
index 9c6c6394..644c88b3 100644
--- a/helpers/pluginhelper/csv_file_iterator_test.go
+++ b/helpers/pluginhelper/csv_file_test.go
@@ -17,12 +17,26 @@ limitations under the License.
package pluginhelper
-func ExampleCsvFileIterator() {
- iter := NewCsvFileIterator("/path/to/foobar.csv")
+import (
+ "fmt"
+ "github.com/magiconair/properties/assert"
+ "testing"
+)
+
+// TestExampleCsvFile writes a one-row csv file with CsvFileWriter and checks that CsvFileIterator reads the
+// same row back.
+func TestExampleCsvFile(t *testing.T) {
+	tmpPath := t.TempDir()
+	filename := fmt.Sprintf(`%s/foobar.csv`, tmpPath)
+
+	writer := NewCsvFileWriter(filename, []string{"id", "name", "json", "created_at"})
+	writer.Write([]string{"123", "foobar", `{"url": "https://example.com"}`, "2022-05-05 09:56:43.438000000"})
+	writer.Close()
+
+	iter := NewCsvFileIterator(filename)
defer iter.Close()
+	// count rows so the test cannot pass vacuously when the iterator yields nothing
+	rowCount := 0
for iter.HasNext() {
row := iter.Fetch()
- println(row["name"]) // foobar
- println(row["json"]) // {"url": "https://example.com"}
+		rowCount++
+		assert.Equal(t, row["id"], "123", "id not equal")
+		assert.Equal(t, row["name"], "foobar", "name not equal")
+		assert.Equal(t, row["json"], `{"url": "https://example.com"}`, "json not equal")
+		assert.Equal(t, row["created_at"], "2022-05-05 09:56:43.438000000", "created_at not equal")
}
+	assert.Equal(t, rowCount, 1, "unexpected number of rows")
}
diff --git a/helpers/pluginhelper/csv_file_writer.go b/helpers/pluginhelper/csv_file_writer.go
new file mode 100644
index 00000000..c413c5ad
--- /dev/null
+++ b/helpers/pluginhelper/csv_file_writer.go
@@ -0,0 +1,82 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package pluginhelper
+
+import (
+ "encoding/csv"
+ "os"
+)
+
+// CsvFileWriter makes it easy to save rows of strings as a csv file: NewCsvFileWriter writes the header row,
+// each Write call appends one row, and Close flushes and closes the file.
+//
+// Quoting is done by encoding/csv, which quotes a field only when it needs to, e.g.:
+//
+// id,name,json,created_at
+// 123,foobar,"{""url"": ""https://example.com""}",2022-05-05 09:56:43.438000000
+//
+type CsvFileWriter struct {
+ file *os.File // destination file; closed by Close
+ writer *csv.Writer // csv encoder writing into file
+ fields []string // column names written as the header row
+}
+
+// NewCsvFileWriter creates the csv file at csvPath (truncating it if it already exists), writes fields as the
+// header row and flushes it right away. It panics if the file cannot be created or the header cannot be written.
+func NewCsvFileWriter(csvPath string, fields []string) *CsvFileWriter {
+	csvFile, err := os.Create(csvPath)
+	if err != nil {
+		panic(err)
+	}
+	csvWriter := csv.NewWriter(csvFile)
+	err = csvWriter.Write(fields)
+	if err == nil {
+		// csv.Writer.Flush has no return value; write failures are only reported through Error
+		csvWriter.Flush()
+		err = csvWriter.Error()
+	}
+	if err != nil {
+		// do not leak the file handle when panicking
+		_ = csvFile.Close()
+		panic(err)
+	}
+	return &CsvFileWriter{
+		file:   csvFile,
+		writer: csvWriter,
+		fields: fields,
+	}
+}
+
+// Close flushes any buffered rows and closes the underlying file. It panics if either step fails; the file
+// is closed even when the flush failed.
+func (ci *CsvFileWriter) Close() {
+	ci.writer.Flush()
+	err := ci.writer.Error()
+	// always close the file; report the flush error first since it happened first
+	if closeErr := ci.file.Close(); err == nil {
+		err = closeErr
+	}
+	if err != nil {
+		panic(err)
+	}
+}
+
+// Write appends values as a single csv row. The row may stay buffered until Flush or Close is called;
+// an error reported by encoding/csv causes a panic.
+func (ci *CsvFileWriter) Write(values []string) {
+	if err := ci.writer.Write(values); err != nil {
+		panic(err)
+	}
+}
+
+// Flush writes any buffered rows to the underlying file and panics if encoding/csv reports a write error,
+// matching how Write and Close handle errors.
+func (ci *CsvFileWriter) Flush() {
+	ci.writer.Flush()
+	if err := ci.writer.Error(); err != nil {
+		panic(err)
+	}
+}
diff --git a/plugins/helper/default_task_context.go b/plugins/helper/default_task_context.go
index 9f4b6747..dcccb49d 100644
--- a/plugins/helper/default_task_context.go
+++ b/plugins/helper/default_task_context.go
@@ -239,7 +239,7 @@ func (c *DefaultTaskContext) SubTaskContext(subtask string) (core.SubTaskContext
return nil, fmt.Errorf("subtask %s doesn't exist", subtask)
}
-// This returns a stand-alone core.SubTaskContext,
+// NewStandaloneSubTaskContext returns a stand-alone core.SubTaskContext,
// not attached to any core.TaskContext.
// Use this if you need to run/debug a subtask without
// going through the usual workflow.
@@ -265,6 +265,9 @@ func (c *DefaultTaskContext) SetData(data interface{}) {
var _ core.TaskContext = (*DefaultTaskContext)(nil)
+// TaskContext returns the core.TaskContext this subtask context belongs to, or nil when it is not attached
+// to one (see NewStandaloneSubTaskContext for such stand-alone contexts).
func (c *DefaultSubTaskContext) TaskContext() core.TaskContext {
+ // NOTE(review): this explicit nil presumably exists because taskCtx is a concrete pointer type; returning a
+ // nil pointer as core.TaskContext would give callers a non-nil interface. Confirm the field's type.
+ if c.taskCtx == nil {
+ return nil
+ }
return c.taskCtx
}