You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by wa...@apache.org on 2023/02/24 09:34:20 UTC

[incubator-devlake] branch main updated: feat: enhance the plugin `customize` (#4301)

This is an automated email from the ASF dual-hosted git repository.

warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 3d26fb7e2 feat: enhance the plugin `customize` (#4301)
3d26fb7e2 is described below

commit 3d26fb7e27154c6057c09bcc60004f1545fd0d13
Author: mindlesscloud <li...@merico.dev>
AuthorDate: Fri Feb 24 17:34:15 2023 +0800

    feat: enhance the plugin `customize` (#4301)
    
    * feat: enhance the plugin
    
    * feat: import csv
    
    * refactor: separate the CSV file uploading endpoints
    
    * feat: add field.IsCustomizedField
    
    * refactor: remove raw data columns from _tool_customized_fields
    
    * fix: check customized column name
    
    * fix: fix e2e of bitbucket
---
 backend/core/dal/dal.go                            |  32 ++-
 backend/helpers/e2ehelper/data_flow_tester.go      |   6 +-
 backend/helpers/pluginhelper/csv_file_iterator.go  |  32 ++-
 backend/helpers/pluginhelper/csv_file_test.go      |   2 +-
 backend/impls/dalgorm/dalgorm.go                   |  23 +-
 backend/plugins/bitbucket/e2e/repo_test.go         |   2 +-
 backend/plugins/customize/api/csv.go               | 111 +++++++++
 backend/plugins/customize/api/{api.go => field.go} | 148 +++++-------
 .../customize/e2e/customized_fields_test.go        |  92 +++++++
 .../plugins/customize/e2e/extract_fields_test.go   |  15 +-
 ...fields_test.go => import_issue_commits_test.go} |  46 ++--
 .../plugins/customize/e2e/import_issues_test.go    | 147 ++++++++++++
 .../customize/e2e/raw_tables/issues_commits.csv    |  13 +
 .../customize/e2e/raw_tables/issues_input.csv      |   5 +
 .../snapshot_tables/_tool_customized_fields.csv    |   5 +
 .../customize/e2e/snapshot_tables/board_issues.csv |   5 +
 .../e2e/snapshot_tables/issue_commits.csv          |  13 +
 .../customize/e2e/snapshot_tables/issue_labels.csv |   5 +
 .../customize/e2e/snapshot_tables/issues_csv.csv   |   4 +
 .../e2e/snapshot_tables/issues_output.csv          |   5 +
 backend/plugins/customize/impl/impl.go             |  12 +
 .../customize/models/customized_field.go}          |  34 ++-
 .../20230201_add_customized_fields.go}             |  36 ++-
 .../migrationscripts/archived/customized_field.go} |  34 ++-
 .../customize/models/migrationscripts/register.go} |  25 +-
 backend/plugins/customize/service/service.go       | 263 +++++++++++++++++++++
 .../customize/service/service_test.go}             |  60 +++--
 27 files changed, 952 insertions(+), 223 deletions(-)

diff --git a/backend/core/dal/dal.go b/backend/core/dal/dal.go
index b60ca980b..5abbea8f3 100644
--- a/backend/core/dal/dal.go
+++ b/backend/core/dal/dal.go
@@ -24,6 +24,34 @@ import (
 	"github.com/apache/incubator-devlake/core/errors"
 )
 
+const (
+	Varchar ColumnType = "varchar(255)"
+	Text    ColumnType = "text"
+	Int     ColumnType = "bigint"
+	Time    ColumnType = "timestamp"
+	Float   ColumnType = "float"
+)
+
+var columnTypes = map[string]ColumnType{
+	Varchar.String(): Varchar,
+	Text.String():    Text,
+	Int.String():     Int,
+	Time.String():    Time,
+	Float.String():   Float,
+}
+
+type ColumnType string
+
+func (c ColumnType) String() string {
+	return string(c)
+}
+
+// ToColumnType converts a string to ColumnType
+func ToColumnType(s string) (ColumnType, bool) {
+	t, ok := columnTypes[s]
+	return t, ok
+}
+
 type Tabler interface {
 	TableName() string
 }
@@ -87,7 +115,7 @@ type Dal interface {
 	// AutoMigrate runs auto migration for given entity
 	AutoMigrate(entity interface{}, clauses ...Clause) errors.Error
 	// AddColumn add column for the table
-	AddColumn(table, columnName, columnType string) errors.Error
+	AddColumn(table, columnName string, columnType ColumnType) errors.Error
 	// DropColumns drop column from the table
 	DropColumns(table string, columnName ...string) errors.Error
 	// Exec executes raw sql query
@@ -106,6 +134,8 @@ type Dal interface {
 	Pluck(column string, dest interface{}, clauses ...Clause) errors.Error
 	// Create insert record to database
 	Create(entity interface{}, clauses ...Clause) errors.Error
+	// CreateWithMap insert record to database, the record is organized as map
+	CreateWithMap(entity interface{}, record map[string]interface{}) errors.Error
 	// Update updates record
 	Update(entity interface{}, clauses ...Clause) errors.Error
 	// UpdateColumn allows you to update multiple records
diff --git a/backend/helpers/e2ehelper/data_flow_tester.go b/backend/helpers/e2ehelper/data_flow_tester.go
index 7727c3273..bc0723eb8 100644
--- a/backend/helpers/e2ehelper/data_flow_tester.go
+++ b/backend/helpers/e2ehelper/data_flow_tester.go
@@ -119,7 +119,7 @@ func NewDataFlowTester(t *testing.T, pluginName string, pluginMeta plugin.Plugin
 
 // ImportCsvIntoRawTable imports records from specified csv file into target raw table, note that existing data would be deleted first.
 func (t *DataFlowTester) ImportCsvIntoRawTable(csvRelPath string, rawTableName string) {
-	csvIter := pluginhelper.NewCsvFileIterator(csvRelPath)
+	csvIter, _ := pluginhelper.NewCsvFileIterator(csvRelPath)
 	defer csvIter.Close()
 	t.FlushRawTable(rawTableName)
 	// load rows and insert into target table
@@ -136,7 +136,7 @@ func (t *DataFlowTester) ImportCsvIntoRawTable(csvRelPath string, rawTableName s
 
 // ImportCsvIntoTabler imports records from specified csv file into target tabler, note that existing data would be deleted first.
 func (t *DataFlowTester) ImportCsvIntoTabler(csvRelPath string, dst schema.Tabler) {
-	csvIter := pluginhelper.NewCsvFileIterator(csvRelPath)
+	csvIter, _ := pluginhelper.NewCsvFileIterator(csvRelPath)
 	defer csvIter.Close()
 	t.FlushTabler(dst)
 	// load rows and insert into target table
@@ -425,7 +425,7 @@ func (t *DataFlowTester) VerifyTableWithOptions(dst schema.Tabler, opts TableOpt
 		panic(err)
 	}
 
-	csvIter := pluginhelper.NewCsvFileIterator(opts.CSVRelPath)
+	csvIter, _ := pluginhelper.NewCsvFileIterator(opts.CSVRelPath)
 	defer csvIter.Close()
 
 	var expectedTotal int64
diff --git a/backend/helpers/pluginhelper/csv_file_iterator.go b/backend/helpers/pluginhelper/csv_file_iterator.go
index 33cac851c..b7a56ec94 100644
--- a/backend/helpers/pluginhelper/csv_file_iterator.go
+++ b/backend/helpers/pluginhelper/csv_file_iterator.go
@@ -21,6 +21,8 @@ import (
 	"encoding/csv"
 	"io"
 	"os"
+
+	"github.com/apache/incubator-devlake/core/errors"
 )
 
 // CsvFileIterator make iterating rows from csv file easier, it reads tuple from csv file and turn it into
@@ -31,30 +33,35 @@ import (
 //	"id","name","json","created_at"
 //	123,"foobar","{""url"": ""https://example.com""}","2022-05-05 09:56:43.438000000"
 type CsvFileIterator struct {
-	file   *os.File
+	file   io.ReadCloser
 	reader *csv.Reader
 	fields []string
 	row    map[string]interface{}
 }
 
 // NewCsvFileIterator create a `*CsvFileIterator` based on path to csv file
-func NewCsvFileIterator(csvPath string) *CsvFileIterator {
+func NewCsvFileIterator(csvPath string) (*CsvFileIterator, errors.Error) {
 	// open csv file
 	csvFile, err := os.Open(csvPath)
 	if err != nil {
-		panic(err)
+		return nil, errors.Convert(err)
 	}
+	return NewCsvFileIteratorFromFile(csvFile)
+}
+
+// NewCsvFileIteratorFromFile create a `*CsvFileIterator` from a file descriptor
+func NewCsvFileIteratorFromFile(csvFile io.ReadCloser) (*CsvFileIterator, errors.Error) {
 	csvReader := csv.NewReader(csvFile)
 	// load field names
 	fields, err := csvReader.Read()
 	if err != nil {
-		panic(err)
+		return nil, errors.Convert(err)
 	}
 	return &CsvFileIterator{
 		file:   csvFile,
 		reader: csvReader,
 		fields: fields,
-	}
+	}, nil
 }
 
 // Close releases resource
@@ -67,21 +74,30 @@ func (ci *CsvFileIterator) Close() {
 
 // HasNext returns a boolean to indicate whether there was any row to be `Fetch`
 func (ci *CsvFileIterator) HasNext() bool {
+	hasNext, err := ci.HasNextWithError()
+	if err != nil {
+		panic(err)
+	}
+	return hasNext
+}
+
+// HasNextWithError returns a boolean to indicate whether there was any row to be `Fetch`
+func (ci *CsvFileIterator) HasNextWithError() (bool, errors.Error) {
 	row, err := ci.reader.Read()
 	if err == io.EOF {
 		ci.row = nil
-		return false
+		return false, nil
 	}
 	if err != nil {
 		ci.row = nil
-		panic(err)
+		return false, errors.Convert(err)
 	}
 	// convert row tuple to map type, so gorm can insert data with it
 	ci.row = make(map[string]interface{})
 	for index, field := range ci.fields {
 		ci.row[field] = row[index]
 	}
-	return true
+	return true, nil
 }
 
 // Fetch returns current row
diff --git a/backend/helpers/pluginhelper/csv_file_test.go b/backend/helpers/pluginhelper/csv_file_test.go
index 644c88b38..42ac6c97f 100644
--- a/backend/helpers/pluginhelper/csv_file_test.go
+++ b/backend/helpers/pluginhelper/csv_file_test.go
@@ -32,7 +32,7 @@ func TestExampleCsvFile(t *testing.T) {
 	writer.Write([]string{"123", "foobar", `{"url": "https://example.com"}`, "2022-05-05 09:56:43.438000000"})
 	writer.Close()
 
-	iter := NewCsvFileIterator(filename)
+	iter, _ := NewCsvFileIterator(filename)
 	defer iter.Close()
 	for iter.HasNext() {
 		row := iter.Fetch()
diff --git a/backend/impls/dalgorm/dalgorm.go b/backend/impls/dalgorm/dalgorm.go
index 342c3730a..268d8ab24 100644
--- a/backend/impls/dalgorm/dalgorm.go
+++ b/backend/impls/dalgorm/dalgorm.go
@@ -31,6 +31,20 @@ import (
 	"gorm.io/gorm/clause"
 )
 
+const (
+	Varchar ColumnType = "varchar(255)"
+	Text    ColumnType = "text"
+	Int     ColumnType = "bigint"
+	Time    ColumnType = "timestamp"
+	Float   ColumnType = "float"
+)
+
+type ColumnType string
+
+func (c ColumnType) String() string {
+	return string(c)
+}
+
 // Dalgorm implements the dal.Dal interface with gorm
 type Dalgorm struct {
 	db *gorm.DB
@@ -181,6 +195,11 @@ func (d *Dalgorm) Create(entity interface{}, clauses ...dal.Clause) errors.Error
 	return errors.Convert(buildTx(d.db, clauses).Create(entity).Error)
 }
 
+// CreateWithMap insert record to database
+func (d *Dalgorm) CreateWithMap(entity interface{}, record map[string]interface{}) errors.Error {
+	return errors.Convert(buildTx(d.db, nil).Model(entity).Clauses(clause.OnConflict{UpdateAll: true}).Create(record).Error)
+}
+
 // Update updates record
 func (d *Dalgorm) Update(entity interface{}, clauses ...dal.Clause) errors.Error {
 	return errors.Convert(buildTx(d.db, clauses).Save(entity).Error)
@@ -247,13 +266,13 @@ func (d *Dalgorm) GetColumns(dst dal.Tabler, filter func(columnMeta dal.ColumnMe
 }
 
 // AddColumn add one column for the table
-func (d *Dalgorm) AddColumn(table, columnName, columnType string) errors.Error {
+func (d *Dalgorm) AddColumn(table, columnName string, columnType dal.ColumnType) errors.Error {
 	// work around the error `cached plan must not change result type` for postgres
 	// wrap in func(){} to make the linter happy
 	defer func() {
 		_ = d.Exec("SELECT * FROM ? LIMIT 1", clause.Table{Name: table})
 	}()
-	return d.Exec("ALTER TABLE ? ADD ? ?", clause.Table{Name: table}, clause.Column{Name: columnName}, clause.Expr{SQL: columnType})
+	return d.Exec("ALTER TABLE ? ADD ? ?", clause.Table{Name: table}, clause.Column{Name: columnName}, clause.Expr{SQL: columnType.String()})
 }
 
 // DropColumns drop one column from the table
diff --git a/backend/plugins/bitbucket/e2e/repo_test.go b/backend/plugins/bitbucket/e2e/repo_test.go
index 5edfd478c..85f88c316 100644
--- a/backend/plugins/bitbucket/e2e/repo_test.go
+++ b/backend/plugins/bitbucket/e2e/repo_test.go
@@ -43,7 +43,7 @@ func TestRepoDataFlow(t *testing.T) {
 	}
 
 	// import raw data table
-	csvIter := pluginhelper.NewCsvFileIterator("./raw_tables/_raw_bitbucket_api_repositories.csv")
+	csvIter, _ := pluginhelper.NewCsvFileIterator("./raw_tables/_raw_bitbucket_api_repositories.csv")
 	defer csvIter.Close()
 	apiRepo := &tasks.BitbucketApiRepo{}
 	// load rows and insert into target table
diff --git a/backend/plugins/customize/api/csv.go b/backend/plugins/customize/api/csv.go
new file mode 100644
index 000000000..651808fb7
--- /dev/null
+++ b/backend/plugins/customize/api/csv.go
@@ -0,0 +1,111 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package api
+
+import (
+	"io"
+	"strings"
+
+	"github.com/apache/incubator-devlake/core/errors"
+	"github.com/apache/incubator-devlake/core/plugin"
+)
+
+const maxMemory = 32 << 20 // 32 MB
+
+// ImportIssue accepts a CSV file, parses and saves it to the database
+// @Summary      Upload issues.csv file
+// @Description  Upload issues.csv file. 3 tables(boards, issues, board_issues) would be affected.
+// @Tags 		 plugins/customize
+// @Accept       multipart/form-data
+// @Param        boardId formData string true "the ID of the board"
+// @Param        boardName formData string true "the name of the board"
+// @Param        file formData file true "select file to upload"
+// @Produce      json
+// @Success      200
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
+// @Router       /plugins/customize/csvfiles/issues.csv [post]
+func (h *Handlers) ImportIssue(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	file, err := h.extractFile(input)
+	if err != nil {
+		return nil, err
+	}
+	// nolint
+	defer file.Close()
+	boardId := strings.TrimSpace(input.Request.FormValue("boardId"))
+	if boardId == "" {
+		return nil, errors.BadInput.New("empty boardId")
+	}
+	boardName := strings.TrimSpace(input.Request.FormValue("boardName"))
+	if boardName == "" {
+		return nil, errors.BadInput.New("empty boardName")
+	}
+	err = h.svc.SaveBoard(boardId, boardName)
+	if err != nil {
+		return nil, err
+	}
+	return nil, h.svc.ImportIssue(boardId, file)
+}
+
+// ImportIssueCommit accepts a CSV file, parses and saves it to the database
+// @Summary      Upload issue_commits.csv file
+// @Description  Upload issue_commits.csv file
+// @Tags 		 plugins/customize
+// @Accept       multipart/form-data
+// @Param        boardId formData string true "the ID of the board"
+// @Param        file formData file true "select file to upload"
+// @Produce      json
+// @Success      200
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
+// @Router       /plugins/customize/csvfiles/issue_commits.csv [post]
+func (h *Handlers) ImportIssueCommit(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
+	file, err := h.extractFile(input)
+	if err != nil {
+		return nil, err
+	}
+	// nolint
+	defer file.Close()
+	boardId := strings.TrimSpace(input.Request.FormValue("boardId"))
+	if boardId == "" {
+		return nil, errors.Default.New("empty boardId")
+	}
+	return nil, h.svc.ImportIssueCommit(boardId, file)
+}
+
+func (h *Handlers) extractFile(input *plugin.ApiResourceInput) (io.ReadCloser, errors.Error) {
+	if input.Request == nil {
+		return nil, errors.Default.New("request is nil")
+	}
+	if input.Request.MultipartForm == nil {
+		if err := input.Request.ParseMultipartForm(maxMemory); err != nil {
+			return nil, errors.Convert(err)
+		}
+	}
+	f, fh, err := input.Request.FormFile("file")
+	if err != nil {
+		return nil, errors.Convert(err)
+	}
+	// nolint
+	f.Close()
+	file, err := fh.Open()
+	if err != nil {
+		return nil, errors.Convert(err)
+	}
+	return file, nil
+}
diff --git a/backend/plugins/customize/api/api.go b/backend/plugins/customize/api/field.go
similarity index 50%
rename from backend/plugins/customize/api/api.go
rename to backend/plugins/customize/api/field.go
index ebf77b737..0d2d8b993 100644
--- a/backend/plugins/customize/api/api.go
+++ b/backend/plugins/customize/api/field.go
@@ -18,108 +18,82 @@ limitations under the License.
 package api
 
 import (
+	"fmt"
+	"net/http"
+	"strings"
+
 	"github.com/apache/incubator-devlake/core/dal"
 	"github.com/apache/incubator-devlake/core/errors"
 	"github.com/apache/incubator-devlake/core/plugin"
+	helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
 	"github.com/apache/incubator-devlake/plugins/customize/models"
-	"net/http"
-	"strings"
-
-	_ "github.com/apache/incubator-devlake/server/api/shared"
+	"github.com/apache/incubator-devlake/plugins/customize/service"
 )
 
 type field struct {
-	ColumnName string `json:"columnName"`
-	ColumnType string `json:"columnType"`
+	ColumnName        string `json:"columnName" example:"x_column_varchar"`
+	DisplayName       string `json:"displayName" example:"department"`
+	DataType          string `json:"dataType" example:"varchar(255)"`
+	Description       string `json:"description" example:"more details about the column"`
+	IsCustomizedField bool   `json:"isCustomizedField" example:"true"`
 }
 
-func getFields(d dal.Dal, tbl string) ([]field, errors.Error) {
-	columns, err := d.GetColumns(&models.Table{Name: tbl}, func(columnMeta dal.ColumnMeta) bool {
-		return strings.HasPrefix(columnMeta.Name(), "x_")
-	})
-	if err != nil {
-		return nil, errors.Default.Wrap(err, "GetColumns error")
-	}
-	var result []field
-	for _, col := range columns {
-		result = append(result, field{
-			ColumnName: col.Name(),
-			ColumnType: "VARCHAR(255)",
-		})
+func (f *field) toCustomizedField(table string) (*models.CustomizedField, errors.Error) {
+	if !strings.HasPrefix(f.ColumnName, "x_") {
+		return nil, errors.BadInput.New("the columnName should start with x_")
 	}
-	return result, nil
-}
-func checkField(d dal.Dal, table, field string) (bool, errors.Error) {
-	if !strings.HasPrefix(field, "x_") {
-		return false, errors.Default.New("column name should start with `x_`")
-	}
-	fields, err := getFields(d, table)
-	if err != nil {
-		return false, err
-	}
-	for _, fld := range fields {
-		if fld.ColumnName == field {
-			return true, nil
-		}
-	}
-	return false, nil
-}
-
-func CreateField(d dal.Dal, table, field string) errors.Error {
-	exists, err := checkField(d, table, field)
-	if err != nil {
-		return err
+	if f.DisplayName == "" {
+		return nil, errors.BadInput.New("the displayName is empty")
 	}
-	if exists {
-		return nil
-	}
-	err = d.AddColumn(table, field, "VARCHAR(255)")
-	if err != nil {
-		return errors.Default.Wrap(err, "AddColumn error")
+	t, ok := dal.ToColumnType(f.DataType)
+	if !ok {
+		return nil, errors.BadInput.New(fmt.Sprintf("the columnType:%s is unsupported", f.DataType))
 	}
-	return nil
+	return &models.CustomizedField{
+		TbName:      table,
+		ColumnName:  f.ColumnName,
+		DisplayName: f.DisplayName,
+		DataType:    t,
+		Description: f.Description,
+	}, nil
 }
 
-func deleteField(d dal.Dal, table, field string) errors.Error {
-	exists, err := checkField(d, table, field)
-	if err != nil {
-		return err
+func fromCustomizedField(cf models.CustomizedField) field {
+	return field{
+		ColumnName:        cf.ColumnName,
+		DisplayName:       cf.DisplayName,
+		DataType:          cf.DataType.String(),
+		Description:       cf.Description,
+		IsCustomizedField: strings.HasPrefix(cf.ColumnName, "x_"),
 	}
-	if !exists {
-		return nil
-	}
-	err = d.DropColumns(table, field)
-	if err != nil {
-		return errors.Default.Wrap(err, "DropColumn error")
-	}
-	return nil
 }
 
-//nolint:unused
-type input struct {
-	Name string `json:"name" example:"x_new_column"`
-}
 type Handlers struct {
-	dal dal.Dal
+	svc *service.Service
 }
 
 func NewHandlers(dal dal.Dal) *Handlers {
-	return &Handlers{dal: dal}
+	return &Handlers{svc: service.NewService(dal)}
 }
 
 // ListFields return all customized fields
 // @Summary return all customized fields
 // @Description return all customized fieldsh
 // @Tags plugins/customize
-// @Success 200  {object} shared.ApiBody "Success"
-// @Failure 400  {string} errcode.Error "Bad Request"
-// @Failure 500  {string} errcode.Error "Internal Error"
+// @Param table path string true "the table name"
+// @Success 200  {object} []field "Success"
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
 // @Router /plugins/customize/{table}/fields [GET]
 func (h *Handlers) ListFields(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
-	fields, err := getFields(h.dal, input.Params["table"])
+	customizedFields, err := h.svc.GetFields(input.Params["table"])
 	if err != nil {
 		return &plugin.ApiResourceOutput{Status: http.StatusBadRequest}, errors.Default.Wrap(err, "getFields error")
 	}
+	fields := make([]field, 0, len(customizedFields))
+	for _, cf := range customizedFields {
+		fields = append(fields, fromCustomizedField(cf))
+	}
 	return &plugin.ApiResourceOutput{Body: fields, Status: http.StatusOK}, nil
 }
 
@@ -127,36 +101,44 @@ func (h *Handlers) ListFields(input *plugin.ApiResourceInput) (*plugin.ApiResour
 // @Summary create a customized field
 // @Description create a customized field
 // @Tags plugins/customize
-// @Param request body input true "request body"
-// @Success 200  {object} shared.ApiBody "Success"
-// @Failure 400  {string} errcode.Error "Bad Request"
-// @Failure 500  {string} errcode.Error "Internal Error"
+// @Param table path string true "the table name"
+// @Param request body field true "request body"
+// @Success 200  {object} field "Success"
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
 // @Router /plugins/customize/{table}/fields [POST]
 func (h *Handlers) CreateFields(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
 	table := input.Params["table"]
-	fld, ok := input.Body["name"].(string)
-	if !ok {
-		return &plugin.ApiResourceOutput{Status: http.StatusBadRequest}, errors.BadInput.New("the name is not string")
+	fld := &field{}
+	err := helper.Decode(input.Body, fld, nil)
+	if err != nil {
+		return &plugin.ApiResourceOutput{Status: http.StatusBadRequest}, err
+	}
+	customizedField, err := fld.toCustomizedField(table)
+	if err != nil {
+		return &plugin.ApiResourceOutput{Status: http.StatusBadRequest}, err
 	}
-	err := CreateField(h.dal, table, fld)
+	err = h.svc.CreateField(customizedField)
 	if err != nil {
 		return nil, errors.Default.Wrap(err, "CreateField error")
 	}
-	return &plugin.ApiResourceOutput{Body: field{fld, "varchar(255)"}, Status: http.StatusOK}, nil
+	return &plugin.ApiResourceOutput{Body: fld, Status: http.StatusOK}, nil
 }
 
 // DeleteField delete a customized fields
 // @Summary return all customized fields
 // @Description return all customized fields
 // @Tags plugins/customize
+// @Param table path string true "the table name"
+// @Param field path string true "the column to be deleted"
 // @Success 200  {object} shared.ApiBody "Success"
-// @Failure 400  {string} errcode.Error "Bad Request"
-// @Failure 500  {string} errcode.Error "Internal Error"
-// @Router /plugins/customize/{table}/fields [DELETE]
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
+// @Router /plugins/customize/{table}/fields/{field} [DELETE]
 func (h *Handlers) DeleteField(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
 	table := input.Params["table"]
 	fld := input.Params["field"]
-	err := deleteField(h.dal, table, fld)
+	err := h.svc.DeleteField(table, fld)
 	if err != nil {
 		return &plugin.ApiResourceOutput{Status: http.StatusBadRequest}, errors.Default.Wrap(err, "deleteField error")
 	}
diff --git a/backend/plugins/customize/e2e/customized_fields_test.go b/backend/plugins/customize/e2e/customized_fields_test.go
new file mode 100644
index 000000000..0e2f9c91c
--- /dev/null
+++ b/backend/plugins/customize/e2e/customized_fields_test.go
@@ -0,0 +1,92 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e
+
+import (
+	"testing"
+
+	"github.com/apache/incubator-devlake/core/models/domainlayer/ticket"
+	"github.com/apache/incubator-devlake/helpers/e2ehelper"
+	"github.com/apache/incubator-devlake/plugins/customize/impl"
+	"github.com/apache/incubator-devlake/plugins/customize/models"
+	"github.com/apache/incubator-devlake/plugins/customize/service"
+)
+
+func TestCustomizedFieldDataFlow(t *testing.T) {
+	var plugin impl.Customize
+	dataflowTester := e2ehelper.NewDataFlowTester(t, "customize", plugin)
+	dataflowTester.FlushTabler(&models.CustomizedField{})
+	dataflowTester.FlushTabler(&ticket.Issue{})
+	svc := service.NewService(dataflowTester.Dal)
+	err := svc.CreateField(&models.CustomizedField{
+		TbName:      "issues",
+		ColumnName:  "x_varchar",
+		DisplayName: "test column x_varchar",
+		DataType:    "varchar(255)",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = svc.CreateField(&models.CustomizedField{
+		TbName:      "issues",
+		ColumnName:  "x_text",
+		DisplayName: "test column x_text",
+		DataType:    "text",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = svc.CreateField(&models.CustomizedField{
+		TbName:      "issues",
+		ColumnName:  "x_int",
+		DisplayName: "test column x_int",
+		DataType:    "bigint",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = svc.CreateField(&models.CustomizedField{
+		TbName:      "issues",
+		ColumnName:  "x_float",
+		DisplayName: "test column x_float",
+		DataType:    "float",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = svc.CreateField(&models.CustomizedField{
+		TbName:      "issues",
+		ColumnName:  "x_time",
+		DisplayName: "test column x_time",
+		DataType:    "timestamp",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	ff, err := svc.GetFields("issues")
+	if err != nil {
+		t.Fatal(err)
+	}
+	for _, f := range ff {
+		t.Logf("%+v\n", f)
+	}
+	err = svc.DeleteField("issues", "x_varchar")
+	if err != nil {
+		t.Fatal(err)
+	}
+}
diff --git a/backend/plugins/customize/e2e/extract_fields_test.go b/backend/plugins/customize/e2e/extract_fields_test.go
index 6db3d2c3f..23d297bcc 100644
--- a/backend/plugins/customize/e2e/extract_fields_test.go
+++ b/backend/plugins/customize/e2e/extract_fields_test.go
@@ -18,12 +18,14 @@ limitations under the License.
 package e2e
 
 import (
+	"github.com/apache/incubator-devlake/plugins/customize/models"
+	"testing"
+
 	"github.com/apache/incubator-devlake/core/models/domainlayer/ticket"
 	"github.com/apache/incubator-devlake/helpers/e2ehelper"
-	"github.com/apache/incubator-devlake/plugins/customize/api"
 	"github.com/apache/incubator-devlake/plugins/customize/impl"
+	"github.com/apache/incubator-devlake/plugins/customize/service"
 	"github.com/apache/incubator-devlake/plugins/customize/tasks"
-	"testing"
 )
 
 func TestBoardDataFlow(t *testing.T) {
@@ -42,7 +44,14 @@ func TestBoardDataFlow(t *testing.T) {
 	// import raw data table
 	dataflowTester.ImportCsvIntoRawTable("./raw_tables/_raw_jira_api_issues.csv", "_raw_jira_api_issues")
 	dataflowTester.ImportCsvIntoTabler("./raw_tables/issues.csv", &ticket.Issue{})
-	err := api.CreateField(dataflowTester.Dal, "issues", "x_test")
+	dataflowTester.FlushTabler(&models.CustomizedField{})
+	svc := service.NewService(dataflowTester.Dal)
+	err := svc.CreateField(&models.CustomizedField{
+		TbName:      "issues",
+		ColumnName:  "x_test",
+		DisplayName: "test column",
+		DataType:    "varchar(255)",
+	})
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/backend/plugins/customize/e2e/extract_fields_test.go b/backend/plugins/customize/e2e/import_issue_commits_test.go
similarity index 51%
copy from backend/plugins/customize/e2e/extract_fields_test.go
copy to backend/plugins/customize/e2e/import_issue_commits_test.go
index 6db3d2c3f..3b98f6b86 100644
--- a/backend/plugins/customize/e2e/extract_fields_test.go
+++ b/backend/plugins/customize/e2e/import_issue_commits_test.go
@@ -18,42 +18,36 @@ limitations under the License.
 package e2e
 
 import (
-	"github.com/apache/incubator-devlake/core/models/domainlayer/ticket"
+	"github.com/apache/incubator-devlake/core/models/domainlayer/crossdomain"
 	"github.com/apache/incubator-devlake/helpers/e2ehelper"
-	"github.com/apache/incubator-devlake/plugins/customize/api"
 	"github.com/apache/incubator-devlake/plugins/customize/impl"
-	"github.com/apache/incubator-devlake/plugins/customize/tasks"
+	"github.com/apache/incubator-devlake/plugins/customize/service"
+	"os"
 	"testing"
 )
 
-func TestBoardDataFlow(t *testing.T) {
+func TestImportIssueCommitDataFlow(t *testing.T) {
 	var plugin impl.Customize
 	dataflowTester := e2ehelper.NewDataFlowTester(t, "customize", plugin)
 
-	taskData := &tasks.TaskData{
-		Options: &tasks.Options{
-			TransformationRules: []tasks.MappingRules{{
-				Table:         "issues",
-				RawDataTable:  "_raw_jira_api_issues",
-				RawDataParams: "{\"ConnectionId\":1,\"BoardId\":8}",
-				Mapping:       map[string]string{"x_test": "fields.created"},
-			}}}}
-
 	// import raw data table
-	dataflowTester.ImportCsvIntoRawTable("./raw_tables/_raw_jira_api_issues.csv", "_raw_jira_api_issues")
-	dataflowTester.ImportCsvIntoTabler("./raw_tables/issues.csv", &ticket.Issue{})
-	err := api.CreateField(dataflowTester.Dal, "issues", "x_test")
+	dataflowTester.FlushTabler(&crossdomain.IssueCommit{})
+	svc := service.NewService(dataflowTester.Dal)
+
+	f, err1 := os.Open("raw_tables/issues_commits.csv")
+	if err1 != nil {
+		t.Fatal(err1)
+	}
+	defer f.Close()
+	err := svc.ImportIssueCommit(`{"ConnectionId":1,"BoardId":8}`, f)
 	if err != nil {
 		t.Fatal(err)
 	}
-	// verify extension fields extraction
-	dataflowTester.Subtask(tasks.ExtractCustomizedFieldsMeta, taskData)
-	dataflowTester.VerifyTable(
-		ticket.Issue{},
-		"./snapshot_tables/issues.csv",
-		e2ehelper.ColumnWithRawData(
-			"id",
-			"x_test",
-		),
-	)
+	dataflowTester.VerifyTableWithRawData(
+		crossdomain.IssueCommit{},
+		"snapshot_tables/issue_commits.csv",
+		[]string{
+			"issue_id",
+			"commit_sha",
+		})
 }
diff --git a/backend/plugins/customize/e2e/import_issues_test.go b/backend/plugins/customize/e2e/import_issues_test.go
new file mode 100644
index 000000000..81a0db4e9
--- /dev/null
+++ b/backend/plugins/customize/e2e/import_issues_test.go
@@ -0,0 +1,147 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e
+
+import (
+	"github.com/apache/incubator-devlake/core/models/domainlayer/ticket"
+	"github.com/apache/incubator-devlake/helpers/e2ehelper"
+	"github.com/apache/incubator-devlake/plugins/customize/impl"
+	"github.com/apache/incubator-devlake/plugins/customize/models"
+	"github.com/apache/incubator-devlake/plugins/customize/service"
+	"os"
+	"testing"
+)
+
+// TestImportIssueDataFlow creates five customized columns on the `issues`
+// table, imports raw_tables/issues_input.csv through the service and compares
+// the issues, issue labels and board issues with the snapshot CSV files.
+func TestImportIssueDataFlow(t *testing.T) {
+	var plugin impl.Customize
+	dataflowTester := e2ehelper.NewDataFlowTester(t, "customize", plugin)
+
+	// flush the tables this test works with
+	dataflowTester.FlushTabler(&ticket.Issue{})
+	dataflowTester.FlushTabler(&models.CustomizedField{})
+	dataflowTester.FlushTabler(&ticket.IssueLabel{})
+	dataflowTester.FlushTabler(&ticket.BoardIssue{})
+	svc := service.NewService(dataflowTester.Dal)
+
+	// one customized column per data type, created in this order
+	customizedFields := []*models.CustomizedField{
+		{TbName: "issues", ColumnName: "x_varchar", DisplayName: "test column x_varchar", DataType: "varchar(255)"},
+		{TbName: "issues", ColumnName: "x_text", DisplayName: "test column x_text", DataType: "text"},
+		{TbName: "issues", ColumnName: "x_int", DisplayName: "test column x_int", DataType: "bigint"},
+		{TbName: "issues", ColumnName: "x_float", DisplayName: "test column x_float", DataType: "float"},
+		{TbName: "issues", ColumnName: "x_time", DisplayName: "test column x_time", DataType: "timestamp"},
+	}
+	for _, field := range customizedFields {
+		if err := svc.CreateField(field); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// import the CSV file
+	issueFile, openErr := os.Open("raw_tables/issues_input.csv")
+	if openErr != nil {
+		t.Fatal(openErr)
+	}
+	defer issueFile.Close()
+	if err := svc.ImportIssue(`{"ConnectionId":1,"Owner":"thenicetgp","Repo":"lake"}`, issueFile); err != nil {
+		t.Fatal(err)
+	}
+
+	// verify the imported issues, including the customized columns
+	issueColumns := []string{
+		"id",
+		"url",
+		"icon_url",
+		"issue_key",
+		"title",
+		"description",
+		"epic_key",
+		"type",
+		"original_type",
+		"status",
+		"original_status",
+		"story_point",
+		"resolution_date",
+		"created_date",
+		"updated_date",
+		"lead_time_minutes",
+		"parent_issue_id",
+		"priority",
+		"original_estimate_minutes",
+		"time_spent_minutes",
+		"time_remaining_minutes",
+		"creator_id",
+		"creator_name",
+		"assignee_id",
+		"assignee_name",
+		"severity",
+		"component",
+		"original_project",
+		"x_varchar",
+		"x_text",
+		"x_time",
+		"x_float",
+		"x_int",
+	}
+	dataflowTester.VerifyTableWithRawData(ticket.Issue{}, "snapshot_tables/issues_output.csv", issueColumns)
+
+	// verify the labels split out of the `labels` column
+	dataflowTester.VerifyTableWithRawData(
+		&ticket.IssueLabel{},
+		"snapshot_tables/issue_labels.csv",
+		[]string{"issue_id", "label_name"},
+	)
+
+	// verify the board/issue relations
+	dataflowTester.VerifyTableWithRawData(
+		&ticket.BoardIssue{},
+		"snapshot_tables/board_issues.csv",
+		[]string{"board_id", "issue_id"},
+	)
+}
diff --git a/backend/plugins/customize/e2e/raw_tables/issues_commits.csv b/backend/plugins/customize/e2e/raw_tables/issues_commits.csv
new file mode 100644
index 000000000..11586e3c2
--- /dev/null
+++ b/backend/plugins/customize/e2e/raw_tables/issues_commits.csv
@@ -0,0 +1,13 @@
+"created_at","updated_at","_raw_data_params","_raw_data_table","_raw_data_id","_raw_data_remark","issue_id","commit_sha"
+"2022-10-25 08:38:12.588","2022-10-25 08:38:12.588","{""ConnectionId"":1,""BoardId"":8}","_raw_jira_api_remotelinks",1,"","jira:JiraIssue:1:10063","8748a066cbaf67b15e86f2c636f9931347e987cf"
+"2022-10-25 08:38:12.588","2022-10-25 08:38:12.588","{""ConnectionId"":1,""BoardId"":9}","_raw_jira_api_remotelinks",2,"","jira:JiraIssue:1:10064","abc0892edaee00dd7ee268dbee71620407a29bca"
+"2022-10-25 08:38:12.588","2022-10-25 08:38:12.588","{""ConnectionId"":1,""BoardId"":8}","_raw_jira_api_remotelinks",3,"","jira:JiraIssue:1:10064","e6bde456807818c5c78d7b265964d6d48b653af6"
+"2022-10-25 08:38:12.588","2022-10-25 08:38:12.588","{""ConnectionId"":1,""BoardId"":8}","_raw_jira_api_remotelinks",4,"","jira:JiraIssue:1:10065","8f91020bcf684c6ad07adfafa3d8a2f826686c42"
+"2022-10-25 08:38:12.588","2022-10-25 08:38:12.588","{""ConnectionId"":1,""BoardId"":8}","_raw_jira_api_remotelinks",5,"","jira:JiraIssue:1:10066","0dfe2e9ed88ad4e27f825d9b67d4d56ac983c5ef"
+"2022-10-25 08:38:12.588","2022-10-25 08:38:12.588","{""ConnectionId"":2,""BoardId"":8}","_raw_jira_api_remotelinks",13,"","jira:JiraIssue:1:10139","8993c04249e9d549e8950daec86717548c53c423"
+"2022-10-25 08:38:12.588","2022-10-25 08:38:12.588","{""ConnectionId"":1,""BoardId"":8}","_raw_jira_api_remotelinks",19,"","jira:JiraIssue:1:10145","07aa2ebed68e286dc51a7e0082031196a6135f74"
+"2022-10-25 08:38:12.588","2022-10-25 08:38:12.588","{""ConnectionId"":1,""BoardId"":8}","_raw_jira_api_remotelinks",20,"","jira:JiraIssue:1:10145","d70d6687e06304d9b6e0cb32b3f8c0f0928400f7"
+"2022-10-25 08:38:12.588","2022-10-25 08:38:12.588","{""ConnectionId"":11,""BoardId"":18}","_raw_jira_api_remotelinks",21,"","jira:JiraIssue:1:10145","ef5ab26111744f65f5191b247767a473c70d6c95"
+"2022-10-25 08:38:12.588","2022-10-25 08:38:12.588","{""ConnectionId"":1,""BoardId"":8}","_raw_jira_api_remotelinks",36,"","jira:JiraIssue:1:10159","d28785ff09229ac9e3c6734f0c97466ab00eb4da"
+"2022-10-25 08:38:12.588","2022-10-25 08:38:12.588","{""ConnectionId"":1,""BoardId"":8}","_raw_jira_api_remotelinks",38,"","jira:JiraIssue:1:10202","0ab12c4d4064003602edceed900d1456b6209894"
+"2022-10-25 08:38:12.588","2022-10-25 08:38:12.588","{""ConnectionId"":1,""BoardId"":8}","_raw_jira_api_remotelinks",39,"","jira:JiraIssue:1:10203","980e9fe7bc3e22a0409f7241a024eaf9c53680dd"
diff --git a/backend/plugins/customize/e2e/raw_tables/issues_input.csv b/backend/plugins/customize/e2e/raw_tables/issues_input.csv
new file mode 100644
index 000000000..c55b0643f
--- /dev/null
+++ b/backend/plugins/customize/e2e/raw_tables/issues_input.csv
@@ -0,0 +1,5 @@
+"id","created_at","updated_at","_raw_data_params","_raw_data_table","_raw_data_id","_raw_data_remark","url","icon_url","issue_key","title","description","epic_key","type","status","original_status","story_point","resolution_date","created_date","updated_date","parent_issue_id","priority","original_estimate_minutes","time_spent_minutes","time_remaining_minutes","creator_id","creator_name","assignee_id","assignee_name","severity","component","lead_time_minutes","original_project","original [...]
+"bitbucket:BitbucketIssue:1:1","2022-09-15 15:27:56.395","2022-09-15 15:27:56.395","{""ConnectionId"":1,""Owner"":""thenicetgp"",""Repo"":""lake""}","_raw_bitbucket_api_issues",60,"","https://api.bitbucket.org/2.0/repositories/thenicetgp/lake/issues/1","",1,"issue test","bitbucket issues test for devlake","","issue","TODO","new",0,NULL,"2022-07-17 07:15:55.959+00:00","2022-07-17 09:11:42.656+00:00","","major",0,0,0,"bitbucket:BitbucketAccount:1:62abf394192edb006fa0e8cf","tgp","bitbucket: [...]
+"bitbucket:BitbucketIssue:1:10","2022-09-15 15:27:56.395","2022-09-15 15:27:56.395","{""ConnectionId"":1,""Owner"":""thenicetgp"",""Repo"":""lake""}","_raw_bitbucket_api_issues",52,"","https://api.bitbucket.org/2.0/repositories/thenicetgp/lake/issues/10","",10,"issue test007","issue test007","","issue","TODO","new",0,NULL,"2022-08-12 13:43:00.783+00:00","2022-08-12 13:43:00.783+00:00","","trivial",0,0,0,"bitbucket:BitbucketAccount:1:62abf394192edb006fa0e8cf","tgp","bitbucket:BitbucketAcc [...]
+"bitbucket:BitbucketIssue:1:13","2022-09-15 15:27:56.395","2022-09-15 15:27:56.395","{""ConnectionId"":1,""Owner"":""thenicetgp"",""Repo"":""lake""}","_raw_bitbucket_api_issues",50,"","https://api.bitbucket.org/2.0/repositories/thenicetgp/lake/issues/13","",13,"issue test010","issue test010","","issue","TODO","new",0,NULL,"2022-08-12 13:44:46.508+00:00","2022-08-12 13:44:46.508+00:00","","critical",0,0,0,"bitbucket:BitbucketAccount:1:62abf394192edb006fa0e8cf","tgp","","","","",NULL,NULL, [...]
+"bitbucket:BitbucketIssue:1:14","2022-09-15 15:27:56.395","2022-09-15 15:27:56.395","{""ConnectionId"":1,""Owner"":""thenicetgp"",""Repo"":""lake""}","_raw_bitbucket_api_issues",49,"","https://api.bitbucket.org/2.0/repositories/thenicetgp/lake/issues/14","",14,"issue test011","issue test011","","issue","TODO","new",0,NULL,"2022-08-12 13:45:12.810+00:00","2022-08-12 13:45:12.810+00:00","","blocker",0,0,0,"bitbucket:BitbucketAccount:1:62abf394192edb006fa0e8cf","tgp","bitbucket:BitbucketAcc [...]
diff --git a/backend/plugins/customize/e2e/snapshot_tables/_tool_customized_fields.csv b/backend/plugins/customize/e2e/snapshot_tables/_tool_customized_fields.csv
new file mode 100644
index 000000000..6d8ab78ad
--- /dev/null
+++ b/backend/plugins/customize/e2e/snapshot_tables/_tool_customized_fields.csv
@@ -0,0 +1,5 @@
+tb_name,column_name,display_name,data_type
+issues,x_float,test column x_float,float
+issues,x_int,test column x_int,bigint
+issues,x_text,test column x_text,text
+issues,x_time,test column x_time,timestamp
diff --git a/backend/plugins/customize/e2e/snapshot_tables/board_issues.csv b/backend/plugins/customize/e2e/snapshot_tables/board_issues.csv
new file mode 100644
index 000000000..5c1c86d1a
--- /dev/null
+++ b/backend/plugins/customize/e2e/snapshot_tables/board_issues.csv
@@ -0,0 +1,5 @@
+board_id,issue_id,_raw_data_params,_raw_data_table,_raw_data_id,_raw_data_remark
+"{""ConnectionId"":1,""Owner"":""thenicetgp"",""Repo"":""lake""}",bitbucket:BitbucketIssue:1:1,,,0,
+"{""ConnectionId"":1,""Owner"":""thenicetgp"",""Repo"":""lake""}",bitbucket:BitbucketIssue:1:10,,,0,
+"{""ConnectionId"":1,""Owner"":""thenicetgp"",""Repo"":""lake""}",bitbucket:BitbucketIssue:1:13,,,0,
+"{""ConnectionId"":1,""Owner"":""thenicetgp"",""Repo"":""lake""}",bitbucket:BitbucketIssue:1:14,,,0,
diff --git a/backend/plugins/customize/e2e/snapshot_tables/issue_commits.csv b/backend/plugins/customize/e2e/snapshot_tables/issue_commits.csv
new file mode 100644
index 000000000..98b01ac99
--- /dev/null
+++ b/backend/plugins/customize/e2e/snapshot_tables/issue_commits.csv
@@ -0,0 +1,13 @@
+issue_id,commit_sha,_raw_data_params,_raw_data_table,_raw_data_id,_raw_data_remark
+jira:JiraIssue:1:10063,8748a066cbaf67b15e86f2c636f9931347e987cf,"{""ConnectionId"":1,""BoardId"":8}",_raw_jira_api_remotelinks,1,
+jira:JiraIssue:1:10064,abc0892edaee00dd7ee268dbee71620407a29bca,"{""ConnectionId"":1,""BoardId"":8}",_raw_jira_api_remotelinks,2,
+jira:JiraIssue:1:10064,e6bde456807818c5c78d7b265964d6d48b653af6,"{""ConnectionId"":1,""BoardId"":8}",_raw_jira_api_remotelinks,3,
+jira:JiraIssue:1:10065,8f91020bcf684c6ad07adfafa3d8a2f826686c42,"{""ConnectionId"":1,""BoardId"":8}",_raw_jira_api_remotelinks,4,
+jira:JiraIssue:1:10066,0dfe2e9ed88ad4e27f825d9b67d4d56ac983c5ef,"{""ConnectionId"":1,""BoardId"":8}",_raw_jira_api_remotelinks,5,
+jira:JiraIssue:1:10139,8993c04249e9d549e8950daec86717548c53c423,"{""ConnectionId"":1,""BoardId"":8}",_raw_jira_api_remotelinks,13,
+jira:JiraIssue:1:10145,07aa2ebed68e286dc51a7e0082031196a6135f74,"{""ConnectionId"":1,""BoardId"":8}",_raw_jira_api_remotelinks,19,
+jira:JiraIssue:1:10145,d70d6687e06304d9b6e0cb32b3f8c0f0928400f7,"{""ConnectionId"":1,""BoardId"":8}",_raw_jira_api_remotelinks,20,
+jira:JiraIssue:1:10145,ef5ab26111744f65f5191b247767a473c70d6c95,"{""ConnectionId"":1,""BoardId"":8}",_raw_jira_api_remotelinks,21,
+jira:JiraIssue:1:10159,d28785ff09229ac9e3c6734f0c97466ab00eb4da,"{""ConnectionId"":1,""BoardId"":8}",_raw_jira_api_remotelinks,36,
+jira:JiraIssue:1:10202,0ab12c4d4064003602edceed900d1456b6209894,"{""ConnectionId"":1,""BoardId"":8}",_raw_jira_api_remotelinks,38,
+jira:JiraIssue:1:10203,980e9fe7bc3e22a0409f7241a024eaf9c53680dd,"{""ConnectionId"":1,""BoardId"":8}",_raw_jira_api_remotelinks,39,
diff --git a/backend/plugins/customize/e2e/snapshot_tables/issue_labels.csv b/backend/plugins/customize/e2e/snapshot_tables/issue_labels.csv
new file mode 100644
index 000000000..7b7191daa
--- /dev/null
+++ b/backend/plugins/customize/e2e/snapshot_tables/issue_labels.csv
@@ -0,0 +1,5 @@
+issue_id,label_name,_raw_data_params,_raw_data_table,_raw_data_id,_raw_data_remark
+bitbucket:BitbucketIssue:1:10,hello worlds,"{""ConnectionId"":1,""Owner"":""thenicetgp"",""Repo"":""lake""}",,0,
+bitbucket:BitbucketIssue:1:14,label1,"{""ConnectionId"":1,""Owner"":""thenicetgp"",""Repo"":""lake""}",,0,
+bitbucket:BitbucketIssue:1:14,label2,"{""ConnectionId"":1,""Owner"":""thenicetgp"",""Repo"":""lake""}",,0,
+bitbucket:BitbucketIssue:1:14,label3,"{""ConnectionId"":1,""Owner"":""thenicetgp"",""Repo"":""lake""}",,0,
diff --git a/backend/plugins/customize/e2e/snapshot_tables/issues_csv.csv b/backend/plugins/customize/e2e/snapshot_tables/issues_csv.csv
new file mode 100644
index 000000000..4f930ee29
--- /dev/null
+++ b/backend/plugins/customize/e2e/snapshot_tables/issues_csv.csv
@@ -0,0 +1,4 @@
+"id","created_at","updated_at","_raw_data_params","_raw_data_table","_raw_data_id","_raw_data_remark","url","icon_url","issue_key","title","description","epic_key","type","status","original_status","story_point","resolution_date","created_date","updated_date","parent_issue_id","priority","original_estimate_minutes","time_spent_minutes","time_remaining_minutes","creator_id","creator_name","assignee_id","assignee_name","severity","component","lead_time_minutes","original_project","original [...]
+"abc","2023-02-07 14:55:19.650","2023-02-07 14:55:19.650","","",0,"","","","","","","","","","",0,NULL,NULL,NULL,"","",0,0,0,"","","","","","",0,"","",20,"2022-09-15 15:27:56","hello",123.5
+"bitbucket:BitbucketIssue:1:1","2022-09-15 15:27:56.395","2022-09-15 15:27:56.395","{""ConnectionId"":1,""Owner"":""thenicetgp"",""Repo"":""lake""}","_raw_bitbucket_api_issues",60,"","https://api.bitbucket.org/2.0/repositories/thenicetgp/lake/issues/1","",1,"issue test","bitbucket issues test for devlake","","issue","TODO","new",0,NULL,"2022-07-17 07:15:55.959","2022-07-17 09:11:42.656","","major",0,0,0,"bitbucket:BitbucketAccount:1:62abf394192edb006fa0e8cf","tgp","bitbucket:BitbucketAcc [...]
+"bitbucket:BitbucketIssue:1:10","2022-09-15 15:27:56.395","2022-09-15 15:27:56.395","{""ConnectionId"":1,""Owner"":""thenicetgp"",""Repo"":""lake""}","_raw_bitbucket_api_issues",52,"","https://api.bitbucket.org/2.0/repositories/thenicetgp/lake/issues/10","",10,"issue test007","issue test007","","issue","TODO","new",0,NULL,"2022-08-12 13:43:00.783","2022-08-12 13:43:00.783","","trivial",0,0,0,"bitbucket:BitbucketAccount:1:62abf394192edb006fa0e8cf","tgp","bitbucket:BitbucketAccount:1:62abf [...]
diff --git a/backend/plugins/customize/e2e/snapshot_tables/issues_output.csv b/backend/plugins/customize/e2e/snapshot_tables/issues_output.csv
new file mode 100644
index 000000000..d64c1e8c1
--- /dev/null
+++ b/backend/plugins/customize/e2e/snapshot_tables/issues_output.csv
@@ -0,0 +1,5 @@
+id,url,icon_url,issue_key,title,description,epic_key,type,original_type,status,original_status,story_point,resolution_date,created_date,updated_date,lead_time_minutes,parent_issue_id,priority,original_estimate_minutes,time_spent_minutes,time_remaining_minutes,creator_id,creator_name,assignee_id,assignee_name,severity,component,original_project,x_varchar,x_text,x_time,x_float,x_int,_raw_data_params,_raw_data_table,_raw_data_id,_raw_data_remark
+bitbucket:BitbucketIssue:1:1,https://api.bitbucket.org/2.0/repositories/thenicetgp/lake/issues/1,,1,issue test,bitbucket issues test for devlake,,issue,,TODO,new,0,,2022-07-17T07:15:55.959+00:00,2022-07-17T09:11:42.656+00:00,,,major,0,0,0,bitbucket:BitbucketAccount:1:62abf394192edb006fa0e8cf,tgp,bitbucket:BitbucketAccount:1:62abf394192edb006fa0e8cf,tgp,,,,world,,2022-09-15T15:27:56.000+00:00,8,10,"{""ConnectionId"":1,""Owner"":""thenicetgp"",""Repo"":""lake""}",_raw_bitbucket_api_issues,60,
+bitbucket:BitbucketIssue:1:10,https://api.bitbucket.org/2.0/repositories/thenicetgp/lake/issues/10,,10,issue test007,issue test007,,issue,,TODO,new,0,,2022-08-12T13:43:00.783+00:00,2022-08-12T13:43:00.783+00:00,,,trivial,0,0,0,bitbucket:BitbucketAccount:1:62abf394192edb006fa0e8cf,tgp,bitbucket:BitbucketAccount:1:62abf394192edb006fa0e8cf,tgp,,,,abc,,2022-09-15T15:27:56.000+00:00,2.45679e+06,30,"{""ConnectionId"":1,""Owner"":""thenicetgp"",""Repo"":""lake""}",_raw_bitbucket_api_issues,52,
+bitbucket:BitbucketIssue:1:13,https://api.bitbucket.org/2.0/repositories/thenicetgp/lake/issues/13,,13,issue test010,issue test010,,issue,,TODO,new,0,,2022-08-12T13:44:46.508+00:00,2022-08-12T13:44:46.508+00:00,,,critical,0,0,0,bitbucket:BitbucketAccount:1:62abf394192edb006fa0e8cf,tgp,,,,,,,,2022-09-15T15:27:56.000+00:00,0.00014,1,"{""ConnectionId"":1,""Owner"":""thenicetgp"",""Repo"":""lake""}",_raw_bitbucket_api_issues,50,
+bitbucket:BitbucketIssue:1:14,https://api.bitbucket.org/2.0/repositories/thenicetgp/lake/issues/14,,14,issue test011,issue test011,,issue,,TODO,new,0,,2022-08-12T13:45:12.810+00:00,2022-08-12T13:45:12.810+00:00,,,blocker,0,0,0,bitbucket:BitbucketAccount:1:62abf394192edb006fa0e8cf,tgp,bitbucket:BitbucketAccount:1:62abf394192edb006fa0e8cf,tgp,,,,,,2022-09-15T15:27:56.000+00:00,,41534568464351,"{""ConnectionId"":1,""Owner"":""thenicetgp"",""Repo"":""lake""}",_raw_bitbucket_api_issues,49,
diff --git a/backend/plugins/customize/impl/impl.go b/backend/plugins/customize/impl/impl.go
index a22ba2512..874cd300f 100644
--- a/backend/plugins/customize/impl/impl.go
+++ b/backend/plugins/customize/impl/impl.go
@@ -23,6 +23,7 @@ import (
 	"github.com/apache/incubator-devlake/core/errors"
 	"github.com/apache/incubator-devlake/core/plugin"
 	"github.com/apache/incubator-devlake/plugins/customize/api"
+	"github.com/apache/incubator-devlake/plugins/customize/models/migrationscripts"
 	"github.com/apache/incubator-devlake/plugins/customize/tasks"
 	"github.com/mitchellh/mapstructure"
 )
@@ -31,6 +32,7 @@ var _ plugin.PluginMeta = (*Customize)(nil)
 var _ plugin.PluginInit = (*Customize)(nil)
 var _ plugin.PluginApi = (*Customize)(nil)
 var _ plugin.PluginModel = (*Customize)(nil)
+var _ plugin.PluginMigration = (*Customize)(nil)
 
 type Customize struct {
 	handlers *api.Handlers
@@ -73,6 +75,10 @@ func (p Customize) Description() string {
 	return "To customize table fields"
 }
 
+// MigrationScripts returns the migration scripts of the customize plugin,
+// as listed by migrationscripts.All.
+func (p Customize) MigrationScripts() []plugin.MigrationScript {
+	return migrationscripts.All()
+}
+
 func (p Customize) RootPkgPath() string {
 	return "github.com/apache/incubator-devlake/plugins/customize"
 }
@@ -86,5 +92,11 @@ func (p *Customize) ApiResources() map[string]map[string]plugin.ApiResourceHandl
 		":table/fields/:field": {
 			"DELETE": p.handlers.DeleteField,
 		},
+		"csvfiles/issues.csv": {
+			"POST": p.handlers.ImportIssue,
+		},
+		"csvfiles/issue_commits.csv": {
+			"POST": p.handlers.ImportIssueCommit,
+		},
 	}
 }
diff --git a/backend/helpers/pluginhelper/csv_file_test.go b/backend/plugins/customize/models/customized_field.go
similarity index 53%
copy from backend/helpers/pluginhelper/csv_file_test.go
copy to backend/plugins/customize/models/customized_field.go
index 644c88b38..16a4d79db 100644
--- a/backend/helpers/pluginhelper/csv_file_test.go
+++ b/backend/plugins/customize/models/customized_field.go
@@ -15,28 +15,24 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package pluginhelper
+package models
 
 import (
-	"fmt"
-	"github.com/magiconair/properties/assert"
-	"testing"
-)
+	"time"
 
-func TestExampleCsvFile(t *testing.T) {
-	tmpPath := t.TempDir()
-	filename := fmt.Sprintf(`%s/foobar.csv`, tmpPath)
-	println(filename)
+	"github.com/apache/incubator-devlake/core/dal"
+)
 
-	writer := NewCsvFileWriter(filename, []string{"id", "name", "json", "created_at"})
-	writer.Write([]string{"123", "foobar", `{"url": "https://example.com"}`, "2022-05-05 09:56:43.438000000"})
-	writer.Close()
+// CustomizedField describes one column of a table. The pair
+// (TbName, ColumnName) forms the primary key.
+type CustomizedField struct {
+	CreatedAt   time.Time      `json:"createdAt"`
+	UpdatedAt   time.Time      `json:"updatedAt"`
+	TbName      string         `gorm:"primaryKey;type:varchar(255)"` // avoid conflicting with the method `TableName()`
+	ColumnName  string         `gorm:"primaryKey;type:varchar(255)"`
+	DisplayName string         `gorm:"type:varchar(255)"`
+	DataType    dal.ColumnType `gorm:"type:varchar(255)"`
+	Description string
+}
 
-	iter := NewCsvFileIterator(filename)
-	defer iter.Close()
-	for iter.HasNext() {
-		row := iter.Fetch()
-		assert.Equal(t, row["name"], "foobar", "name not euqal")
-		assert.Equal(t, row["json"], `{"url": "https://example.com"}`, "json not euqal")
-	}
+// TableName returns the name of the table that stores CustomizedField records.
+func (t *CustomizedField) TableName() string {
+	return "_tool_customized_fields"
 }
diff --git a/backend/helpers/pluginhelper/csv_file_test.go b/backend/plugins/customize/models/migrationscripts/20230201_add_customized_fields.go
similarity index 53%
copy from backend/helpers/pluginhelper/csv_file_test.go
copy to backend/plugins/customize/models/migrationscripts/20230201_add_customized_fields.go
index 644c88b38..274b96ece 100644
--- a/backend/helpers/pluginhelper/csv_file_test.go
+++ b/backend/plugins/customize/models/migrationscripts/20230201_add_customized_fields.go
@@ -15,28 +15,24 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package pluginhelper
+package migrationscripts
 
 import (
-	"fmt"
-	"github.com/magiconair/properties/assert"
-	"testing"
+	"github.com/apache/incubator-devlake/core/context"
+	"github.com/apache/incubator-devlake/core/errors"
+	"github.com/apache/incubator-devlake/plugins/customize/models/migrationscripts/archived"
 )
 
-func TestExampleCsvFile(t *testing.T) {
-	tmpPath := t.TempDir()
-	filename := fmt.Sprintf(`%s/foobar.csv`, tmpPath)
-	println(filename)
-
-	writer := NewCsvFileWriter(filename, []string{"id", "name", "json", "created_at"})
-	writer.Write([]string{"123", "foobar", `{"url": "https://example.com"}`, "2022-05-05 09:56:43.438000000"})
-	writer.Close()
-
-	iter := NewCsvFileIterator(filename)
-	defer iter.Close()
-	for iter.HasNext() {
-		row := iter.Fetch()
-		assert.Equal(t, row["name"], "foobar", "name not euqal")
-		assert.Equal(t, row["json"], `{"url": "https://example.com"}`, "json not euqal")
-	}
+// addCustomizedField is the migration script that introduces the table
+// backing archived.CustomizedField.
+type addCustomizedField struct{}
+
+// Up runs AutoMigrate for archived.CustomizedField on the DAL of basicRes.
+func (script *addCustomizedField) Up(basicRes context.BasicRes) errors.Error {
+	return basicRes.GetDal().AutoMigrate(&archived.CustomizedField{})
+}
+
+// Version returns the version number of this migration script.
+func (*addCustomizedField) Version() uint64 {
+	return 20230201093311
+}
+
+// Name returns the human readable name of this migration script.
+func (*addCustomizedField) Name() string {
+	return "add _tool_customized_fields"
 }
diff --git a/backend/helpers/pluginhelper/csv_file_test.go b/backend/plugins/customize/models/migrationscripts/archived/customized_field.go
similarity index 53%
copy from backend/helpers/pluginhelper/csv_file_test.go
copy to backend/plugins/customize/models/migrationscripts/archived/customized_field.go
index 644c88b38..96814a1f6 100644
--- a/backend/helpers/pluginhelper/csv_file_test.go
+++ b/backend/plugins/customize/models/migrationscripts/archived/customized_field.go
@@ -15,28 +15,24 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package pluginhelper
+package archived
 
 import (
-	"fmt"
-	"github.com/magiconair/properties/assert"
-	"testing"
-)
+	"time"
 
-func TestExampleCsvFile(t *testing.T) {
-	tmpPath := t.TempDir()
-	filename := fmt.Sprintf(`%s/foobar.csv`, tmpPath)
-	println(filename)
+	"github.com/apache/incubator-devlake/core/dal"
+)
 
-	writer := NewCsvFileWriter(filename, []string{"id", "name", "json", "created_at"})
-	writer.Write([]string{"123", "foobar", `{"url": "https://example.com"}`, "2022-05-05 09:56:43.438000000"})
-	writer.Close()
+// CustomizedField is the archived copy of the model used by the migration
+// scripts. The pair (TbName, ColumnName) forms the primary key.
+type CustomizedField struct {
+	CreatedAt   time.Time      `json:"createdAt"`
+	UpdatedAt   time.Time      `json:"updatedAt"`
+	TbName      string         `gorm:"primaryKey;type:varchar(255)"` // avoid conflicting with the method `TableName()`
+	ColumnName  string         `gorm:"primaryKey;type:varchar(255)"`
+	DisplayName string         `gorm:"type:varchar(255)"`
+	DataType    dal.ColumnType `gorm:"type:varchar(255)"`
+	Description string
+}
 
-	iter := NewCsvFileIterator(filename)
-	defer iter.Close()
-	for iter.HasNext() {
-		row := iter.Fetch()
-		assert.Equal(t, row["name"], "foobar", "name not euqal")
-		assert.Equal(t, row["json"], `{"url": "https://example.com"}`, "json not euqal")
-	}
+// TableName returns the name of the table that stores CustomizedField records.
+func (t *CustomizedField) TableName() string {
+	return "_tool_customized_fields"
 }
diff --git a/backend/helpers/pluginhelper/csv_file_test.go b/backend/plugins/customize/models/migrationscripts/register.go
similarity index 53%
copy from backend/helpers/pluginhelper/csv_file_test.go
copy to backend/plugins/customize/models/migrationscripts/register.go
index 644c88b38..fbe26a71e 100644
--- a/backend/helpers/pluginhelper/csv_file_test.go
+++ b/backend/plugins/customize/models/migrationscripts/register.go
@@ -15,28 +15,15 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package pluginhelper
+package migrationscripts
 
 import (
-	"fmt"
-	"github.com/magiconair/properties/assert"
-	"testing"
+	"github.com/apache/incubator-devlake/core/plugin"
 )
 
-func TestExampleCsvFile(t *testing.T) {
-	tmpPath := t.TempDir()
-	filename := fmt.Sprintf(`%s/foobar.csv`, tmpPath)
-	println(filename)
-
-	writer := NewCsvFileWriter(filename, []string{"id", "name", "json", "created_at"})
-	writer.Write([]string{"123", "foobar", `{"url": "https://example.com"}`, "2022-05-05 09:56:43.438000000"})
-	writer.Close()
-
-	iter := NewCsvFileIterator(filename)
-	defer iter.Close()
-	for iter.HasNext() {
-		row := iter.Fetch()
-		assert.Equal(t, row["name"], "foobar", "name not euqal")
-		assert.Equal(t, row["json"], `{"url": "https://example.com"}`, "json not euqal")
+// All returns all the migration scripts
+func All() []plugin.MigrationScript {
+	return []plugin.MigrationScript{
+		new(addCustomizedField),
 	}
 }
diff --git a/backend/plugins/customize/service/service.go b/backend/plugins/customize/service/service.go
new file mode 100644
index 000000000..869492f13
--- /dev/null
+++ b/backend/plugins/customize/service/service.go
@@ -0,0 +1,263 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package service
+
+import (
+	"fmt"
+	"io"
+	"regexp"
+	"strings"
+
+	"github.com/apache/incubator-devlake/core/dal"
+	"github.com/apache/incubator-devlake/core/errors"
+	"github.com/apache/incubator-devlake/core/models/common"
+	"github.com/apache/incubator-devlake/core/models/domainlayer"
+	"github.com/apache/incubator-devlake/core/models/domainlayer/crossdomain"
+	"github.com/apache/incubator-devlake/core/models/domainlayer/ticket"
+	"github.com/apache/incubator-devlake/helpers/pluginhelper"
+	"github.com/apache/incubator-devlake/plugins/customize/models"
+)
+
+// Service wraps database operations
+type Service struct {
+	// dal is the database access layer used by every operation of the service
+	dal         dal.Dal
+	// nameChecker is the pattern checkField matches customized column names against
+	nameChecker *regexp.Regexp
+}
+
+// NewService creates a Service that operates on the given dal.
+//
+// The column-name pattern is anchored at both ends: the whole name, not only
+// a prefix of it, has to be `x_` followed by word characters. Without the end
+// anchor a name such as "x_a; drop table issues" passed the check although it
+// is handed to AddColumn/DropColumns afterwards.
+func NewService(dal dal.Dal) *Service {
+	return &Service{dal: dal, nameChecker: regexp.MustCompile(`^x_\w+$`)}
+}
+
+// GetFields returns all the fields of the table.
+//
+// Every column reported by the DAL for `table` yields exactly one entry:
+//   - columns without the `x_` prefix are original fields and carry the column type reported by the DAL;
+//   - `x_` columns that have a record in `_tool_customized_fields` are returned as recorded;
+//   - `x_` columns without a record are reported as dal.Varchar.
+//
+// TbName is set to `table` for the first and the last kind.
+func (s *Service) GetFields(table string) ([]models.CustomizedField, errors.Error) {
+	// the customized fields created before v0.16.0 were not recorded in the table `_tool_customized_fields`, we should take care of them
+	columns, err := s.dal.GetColumns(&models.Table{Name: table}, func(columnMeta dal.ColumnMeta) bool {
+		return true
+	})
+	if err != nil {
+		return nil, errors.Default.Wrap(err, "GetColumns error")
+	}
+	ff, err := s.getCustomizedFields(table)
+	if err != nil {
+		return nil, err
+	}
+	// index the recorded customized fields by column name
+	fieldMap := make(map[string]models.CustomizedField, len(ff))
+	for _, f := range ff {
+		fieldMap[f.ColumnName] = f
+	}
+	var result []models.CustomizedField
+	for _, col := range columns {
+		if !strings.HasPrefix(col.Name(), "x_") {
+			// original fields
+			dataType, _ := col.ColumnType()
+			result = append(result, models.CustomizedField{
+				TbName:     table,
+				ColumnName: col.Name(),
+				DataType:   dal.ColumnType(dataType),
+			})
+		} else if field, ok := fieldMap[col.Name()]; ok {
+			// customized fields with a record
+			result = append(result, field)
+		} else {
+			// customized fields without a record: the table name used to be
+			// left empty here, unlike in the other two branches
+			result = append(result, models.CustomizedField{
+				TbName:     table,
+				ColumnName: col.Name(),
+				DataType:   dal.Varchar,
+			})
+		}
+	}
+	return result, nil
+}
+
+// checkField validates the table and column names and reports whether the
+// column `field` is among the fields GetFields returns for `table`.
+// An error is returned for an empty table name, for a column name that does
+// not start with `x_` or does not match nameChecker, and when GetFields fails.
+func (s *Service) checkField(table, field string) (bool, errors.Error) {
+	switch {
+	case table == "":
+		return false, errors.Default.New("empty table name")
+	case !strings.HasPrefix(field, "x_"):
+		return false, errors.Default.New("column name should start with `x_`")
+	case !s.nameChecker.MatchString(field):
+		return false, errors.Default.New("invalid column name")
+	}
+	existing, err := s.GetFields(table)
+	if err != nil {
+		return false, err
+	}
+	for i := range existing {
+		if existing[i].ColumnName == field {
+			return true, nil
+		}
+	}
+	return false, nil
+}
+
+// CreateField creates a new column for the table cf.TbName and creates a new record in the table `_tool_customized_fields`
+//
+// It returns a BadInput error when the column already exists. The record is
+// written first and the column is added afterwards; when adding the column
+// fails the record is removed again. Otherwise the leftover record would make
+// a retry for the same column fail on s.dal.Create (presumably as a duplicate
+// primary key), because checkField only looks at the columns that really exist.
+func (s *Service) CreateField(cf *models.CustomizedField) errors.Error {
+	exists, err := s.checkField(cf.TbName, cf.ColumnName)
+	if err != nil {
+		return err
+	}
+	if exists {
+		return errors.BadInput.New(fmt.Sprintf("the column %s already exists", cf.ColumnName))
+	}
+	err = s.dal.Create(cf)
+	if err != nil {
+		return errors.Default.Wrap(err, "create customizedField")
+	}
+	err = s.dal.AddColumn(cf.TbName, cf.ColumnName, cf.DataType)
+	if err != nil {
+		// roll back the record created above so that the call can be retried
+		rollbackErr := s.dal.Delete(&models.CustomizedField{}, dal.Where("tb_name = ? AND column_name = ?", cf.TbName, cf.ColumnName))
+		if rollbackErr != nil {
+			return errors.Default.Wrap(err, fmt.Sprintf("AddColumn error, and removing the customizedField record failed: %v", rollbackErr))
+		}
+		return errors.Default.Wrap(err, "AddColumn error")
+	}
+	return nil
+}
+
+// DeleteField deletes the `field` from the `table`
+//
+// Nothing happens when the column does not exist. Otherwise the column is
+// dropped first and the matching record of `_tool_customized_fields` is
+// deleted afterwards.
+func (s *Service) DeleteField(table, field string) errors.Error {
+	found, err := s.checkField(table, field)
+	switch {
+	case err != nil:
+		return err
+	case !found:
+		return nil
+	}
+	if err = s.dal.DropColumns(table, field); err != nil {
+		return errors.Default.Wrap(err, "DropColumn error")
+	}
+	return s.dal.Delete(&models.CustomizedField{}, dal.Where("tb_name = ? AND column_name = ?", table, field))
+}
+
+// getCustomizedFields loads the CustomizedField records whose tb_name equals `table`.
+func (s *Service) getCustomizedFields(table string) ([]models.CustomizedField, errors.Error) {
+	var fields []models.CustomizedField
+	byTable := dal.Where("tb_name = ?", table)
+	err := s.dal.All(&fields, byTable)
+	return fields, err
+}
+
+// ImportIssue replaces the issues of a board with the records of a CSV file.
+//
+// boardId serves two purposes: it is the board id of the created board
+// issues, and it is the `_raw_data_params` value that marks the imported
+// issues and issue labels. Rows written by an earlier import with the same
+// boardId are deleted before the file is read. The issue labels are purged as
+// well, otherwise a label removed from the CSV file would survive a re-import.
+func (s *Service) ImportIssue(boardId string, file io.ReadCloser) errors.Error {
+	err := s.dal.Delete(&ticket.Issue{}, dal.Where("_raw_data_params = ?", boardId))
+	if err != nil {
+		return err
+	}
+	err = s.dal.Delete(&ticket.IssueLabel{}, dal.Where("_raw_data_params = ?", boardId))
+	if err != nil {
+		return err
+	}
+	err = s.dal.Delete(&ticket.BoardIssue{}, dal.Where("board_id = ?", boardId))
+	if err != nil {
+		return err
+	}
+	return s.importCSV(file, boardId, s.issueHandlerFactory(boardId))
+}
+
+// SaveBoard creates or updates the board identified by boardId and sets its name to boardName.
+func (s *Service) SaveBoard(boardId, boardName string) errors.Error {
+	board := &ticket.Board{
+		DomainEntity: domainlayer.DomainEntity{Id: boardId},
+		Name:         boardName,
+	}
+	return s.dal.CreateOrUpdate(board)
+}
+
+// ImportIssueCommit deletes the issue commits whose `_raw_data_params` equals
+// rawDataParams and then imports the records of the CSV file, each of them
+// marked with rawDataParams.
+func (s *Service) ImportIssueCommit(rawDataParams string, file io.ReadCloser) errors.Error {
+	previous := dal.Where("_raw_data_params = ?", rawDataParams)
+	if err := s.dal.Delete(&crossdomain.IssueCommit{}, previous); err != nil {
+		return err
+	}
+	return s.importCSV(file, rawDataParams, s.issueCommitHandler)
+}
+
+// importCSV reads the CSV file record by record and passes every record to recordHandler.
+//
+// Before a record is handed over, its `_raw_data_params` entry is overwritten
+// with rawDataParams and every value equal to the string "NULL" is replaced by
+// nil. The import stops at the first error returned by the iterator or by
+// recordHandler; records handled before that are not rolled back here.
+func (s *Service) importCSV(file io.ReadCloser, rawDataParams string, recordHandler func(map[string]interface{}) errors.Error) errors.Error {
+	iterator, err := pluginhelper.NewCsvFileIteratorFromFile(file)
+	if err != nil {
+		return err
+	}
+	var hasNext bool
+	for {
+		hasNext, err = iterator.HasNextWithError()
+		if !hasNext {
+			// err is returned as is: whatever HasNextWithError reported
+			return err
+		}
+		record := iterator.Fetch()
+		record["_raw_data_params"] = rawDataParams
+		for k, v := range record {
+			// checked assertion: a value that is not a string must not panic the import
+			if text, ok := v.(string); ok && text == "NULL" {
+				record[k] = nil
+			}
+		}
+		err = recordHandler(record)
+		if err != nil {
+			return err
+		}
+	}
+}
+
+// issueHandlerFactory returns the record handler used to import issues of the board boardId.
+//
+// For every record the handler
+//  1. requires a non-empty string `id`,
+//  2. splits the optional `labels` value on commas, trims the pieces, drops
+//     empty and repeated ones and saves the rest as issue labels whose
+//     RawDataParams is boardId,
+//  3. removes `labels` from the record and creates the issue from the remaining entries,
+//  4. creates or updates the board issue linking boardId and the issue id.
+func (s *Service) issueHandlerFactory(boardId string) func(record map[string]interface{}) errors.Error {
+	return func(record map[string]interface{}) errors.Error {
+		rawId := record["id"]
+		if rawId == nil {
+			return errors.Default.New("record without id")
+		}
+		// a value that is not a string yields "" and is rejected below
+		id, _ := rawId.(string)
+		if id == "" {
+			return errors.Default.New("empty id")
+		}
+		if rawLabels := record["labels"]; rawLabels != nil {
+			labels, isString := rawLabels.(string)
+			if !isString {
+				return errors.Default.New("labels is not string")
+			}
+			var issueLabels []*ticket.IssueLabel
+			seen := make(map[string]struct{}) // labels already collected for this issue
+			for _, piece := range strings.Split(labels, ",") {
+				name := strings.TrimSpace(piece)
+				if name == "" {
+					continue
+				}
+				if _, repeated := seen[name]; repeated {
+					continue
+				}
+				seen[name] = struct{}{}
+				issueLabels = append(issueLabels, &ticket.IssueLabel{
+					IssueId:   id,
+					LabelName: name,
+					NoPKModel: common.NoPKModel{
+						RawDataOrigin: common.RawDataOrigin{
+							RawDataParams: boardId,
+						},
+					},
+				})
+			}
+			if len(issueLabels) > 0 {
+				if err := s.dal.CreateOrUpdate(issueLabels); err != nil {
+					return err
+				}
+			}
+		}
+		// `labels` is removed before the record is written to the issue table
+		delete(record, "labels")
+		if err := s.dal.CreateWithMap(&ticket.Issue{}, record); err != nil {
+			return err
+		}
+		return s.dal.CreateOrUpdate(&ticket.BoardIssue{
+			BoardId: boardId,
+			IssueId: id,
+		})
+	}
+}
+
+// issueCommitHandler is the record handler of ImportIssueCommit: it creates
+// one crossdomain.IssueCommit row from the entries of record.
+func (s *Service) issueCommitHandler(record map[string]interface{}) errors.Error {
+	return s.dal.CreateWithMap(&crossdomain.IssueCommit{}, record)
+}
diff --git a/backend/helpers/pluginhelper/csv_file_test.go b/backend/plugins/customize/service/service_test.go
similarity index 54%
copy from backend/helpers/pluginhelper/csv_file_test.go
copy to backend/plugins/customize/service/service_test.go
index 644c88b38..d3208cd02 100644
--- a/backend/helpers/pluginhelper/csv_file_test.go
+++ b/backend/plugins/customize/service/service_test.go
@@ -15,28 +15,52 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package pluginhelper
+package service
 
 import (
-	"fmt"
-	"github.com/magiconair/properties/assert"
+	"regexp"
 	"testing"
 )
 
-func TestExampleCsvFile(t *testing.T) {
-	tmpPath := t.TempDir()
-	filename := fmt.Sprintf(`%s/foobar.csv`, tmpPath)
-	println(filename)
-
-	writer := NewCsvFileWriter(filename, []string{"id", "name", "json", "created_at"})
-	writer.Write([]string{"123", "foobar", `{"url": "https://example.com"}`, "2022-05-05 09:56:43.438000000"})
-	writer.Close()
-
-	iter := NewCsvFileIterator(filename)
-	defer iter.Close()
-	for iter.HasNext() {
-		row := iter.Fetch()
-		assert.Equal(t, row["name"], "foobar", "name not euqal")
-		assert.Equal(t, row["json"], `{"url": "https://example.com"}`, "json not euqal")
+// TestService_checkFieldName checks which column names the customized-field
+// name pattern accepts. Every case carries a name, so a failing subtest can be
+// identified in the output instead of showing up as `#00`, `#01`, ...
+func TestService_checkFieldName(t *testing.T) {
+	nameChecker := regexp.MustCompile(`^x_\w+`)
+	tests := []struct {
+		name string
+		args string
+		want bool
+	}{
+		{
+			"letters digits and underscore",
+			"x_abc23_e",
+			true,
+		},
+		{
+			"missing x prefix",
+			"_abc23_e",
+			false,
+		},
+		{
+			"underscore only after prefix",
+			"x__",
+			true,
+		},
+		{
+			"space right after prefix",
+			"x_ space",
+			false,
+		},
+		{
+			"digits only after prefix",
+			"x_123",
+			true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got := nameChecker.MatchString(tt.args)
+			if got != tt.want {
+				t.Errorf("MatchString(%q) = %v, want %v", tt.args, got, tt.want)
+			}
+		})
 	}
 }