You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by ju...@apache.org on 2020/07/12 01:13:05 UTC
[incubator-apisix-dashboard] branch master updated: fix: transaction in routes and upstreams (#306)
This is an automated email from the ASF dual-hosted git repository.
juzhiyuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-apisix-dashboard.git
The following commit(s) were added to refs/heads/master by this push:
new ba6d741 fix: transaction in routes and upstreams (#306)
ba6d741 is described below
commit ba6d741916d018a1daa068ab44374ac033be73a6
Author: kv <gx...@163.com>
AuthorDate: Sun Jul 12 09:12:58 2020 +0800
fix: transaction in routes and upstreams (#306)
---
api/route/route.go | 148 ++++++++++++++++++++++++++++++-------------
api/route/upstream.go | 162 ++++++++++++++++++++++++++++++++++--------------
api/service/route.go | 24 ++++---
api/service/upstream.go | 12 ++--
4 files changed, 243 insertions(+), 103 deletions(-)
diff --git a/api/route/route.go b/api/route/route.go
index 98ac001..29bae16 100644
--- a/api/route/route.go
+++ b/api/route/route.go
@@ -134,30 +134,45 @@ func listRoute(c *gin.Context) {
func deleteRoute(c *gin.Context) {
rid := c.Param("rid")
- // todo params check
- // delete from apisix
- request := &service.ApisixRouteRequest{}
- if _, err := request.Delete(rid); err != nil {
- e := errno.FromMessage(errno.ApisixRouteDeleteError, err.Error())
+ db := conf.DB()
+ tx := db.Begin()
+ defer func() {
+ if r := recover(); r != nil {
+ tx.Rollback()
+ }
+ }()
+ // delete from mysql
+ rd := &service.Route{}
+ rd.ID = uuid.FromStringOrNil(rid)
+ if err := conf.DB().Delete(rd).Error; err != nil {
+ tx.Rollback()
+ e := errno.FromMessage(errno.DBRouteDeleteError, err.Error())
logger.Error(e.Msg)
c.AbortWithStatusJSON(http.StatusInternalServerError, e.Response())
return
} else {
- // delete from mysql
- rd := &service.Route{}
- rd.ID = uuid.FromStringOrNil(rid)
- if err := conf.DB().Delete(rd).Error; err != nil {
- e := errno.FromMessage(errno.DBRouteDeleteError, err.Error())
- logger.Error(e.Msg)
- c.AbortWithStatusJSON(http.StatusInternalServerError, e.Response())
- return
+ request := &service.ApisixRouteRequest{}
+ if _, err := request.Delete(rid); err != nil {
+ tx.Rollback()
+ if httpError, ok := err.(*errno.HttpError); ok {
+ c.AbortWithStatusJSON(httpError.Code, httpError.Msg)
+ return
+ } else {
+ e := errno.FromMessage(errno.ApisixRouteDeleteError, err.Error())
+ logger.Error(e.Msg)
+ c.AbortWithStatusJSON(http.StatusInternalServerError, e.Response())
+ return
+ }
}
}
+ if err := tx.Commit().Error; err != nil {
+ e := errno.FromMessage(errno.ApisixRouteDeleteError, err.Error())
+ logger.Error(e.Msg)
+ }
c.Data(http.StatusOK, service.ContentType, errno.Success())
}
func updateRoute(c *gin.Context) {
rid := c.Param("rid")
- // todo params check
param, exist := c.Get("requestBody")
if !exist || len(param.([]byte)) < 1 {
e := errno.FromMessage(errno.RouteRequestError, "route create with no post data")
@@ -173,27 +188,52 @@ func updateRoute(c *gin.Context) {
return
}
logger.Info(routeRequest.Plugins)
-
+ db := conf.DB()
arr := service.ToApisixRequest(routeRequest)
- logger.Info(arr)
- if resp, err := arr.Update(rid); err != nil {
- e := errno.FromMessage(errno.ApisixRouteUpdateError, err.Error())
- logger.Error(e.Msg)
- c.AbortWithStatusJSON(http.StatusInternalServerError, e.Response())
+ var resp *service.ApisixRouteResponse
+ if rd, err := service.ToRoute(routeRequest, arr, uuid.FromStringOrNil(rid), nil); err != nil {
+ c.AbortWithStatusJSON(http.StatusInternalServerError, err.Response())
return
} else {
- // update mysql
- if rd, err := service.ToRoute(routeRequest, arr, uuid.FromStringOrNil(rid), resp); err != nil {
- c.AbortWithStatusJSON(http.StatusInternalServerError, err.Response())
+ tx := db.Begin()
+ defer func() {
+ if r := recover(); r != nil {
+ tx.Rollback()
+ }
+ }()
+ logger.Info(rd)
+ if err := tx.Model(&service.Route{}).Update(rd).Error; err != nil {
+ // rollback
+ tx.Rollback()
+ e := errno.FromMessage(errno.DBRouteCreateError, err.Error())
+ logger.Error(e.Msg)
+ c.AbortWithStatusJSON(http.StatusInternalServerError, e.Response())
return
} else {
- if err := conf.DB().Model(&service.Route{}).Update(rd).Error; err != nil {
+ if resp, err = arr.Update(rid); err != nil {
+ tx.Rollback()
+ if httpError, ok := err.(*errno.HttpError); ok {
+ c.AbortWithStatusJSON(httpError.Code, httpError.Msg)
+ return
+ } else {
+ e := errno.FromMessage(errno.ApisixRouteCreateError, err.Error())
+ logger.Error(e.Msg)
+ c.AbortWithStatusJSON(http.StatusInternalServerError, e.Response())
+ return
+ }
+ }
+ }
+ if err := tx.Commit().Error; err == nil {
+ // update content_admin_api
+ if rd, err := service.ToRoute(routeRequest, arr, uuid.FromStringOrNil(rid), resp); err != nil {
e := errno.FromMessage(errno.DBRouteUpdateError, err.Error())
logger.Error(e.Msg)
- c.AbortWithStatusJSON(http.StatusInternalServerError, e.Response())
- return
+ } else {
+ if err := conf.DB().Model(&service.Route{}).Update(rd).Error; err != nil {
+ e := errno.FromMessage(errno.DBRouteUpdateError, err.Error())
+ logger.Error(e.Msg)
+ }
}
- logger.Info(rd)
}
}
c.Data(http.StatusOK, service.ContentType, errno.Success())
@@ -235,7 +275,6 @@ func findRoute(c *gin.Context) {
func createRoute(c *gin.Context) {
u4 := uuid.NewV4()
rid := u4.String()
- // todo params check
param, exist := c.Get("requestBody")
if !exist || len(param.([]byte)) < 1 {
e := errno.FromMessage(errno.RouteRequestError, "route create with no post data")
@@ -251,26 +290,51 @@ func createRoute(c *gin.Context) {
return
}
logger.Info(routeRequest.Plugins)
-
+ db := conf.DB()
arr := service.ToApisixRequest(routeRequest)
- logger.Info(arr)
- if resp, err := arr.Create(rid); err != nil {
- e := errno.FromMessage(errno.ApisixRouteCreateError, err.Error())
- logger.Error(e.Msg)
- c.AbortWithStatusJSON(http.StatusInternalServerError, e.Response())
+ var resp *service.ApisixRouteResponse
+ if rd, err := service.ToRoute(routeRequest, arr, u4, nil); err != nil {
+ c.AbortWithStatusJSON(http.StatusInternalServerError, err.Response())
return
} else {
- // update mysql
- if rd, err := service.ToRoute(routeRequest, arr, u4, resp); err != nil {
- c.AbortWithStatusJSON(http.StatusInternalServerError, err.Response())
+ tx := db.Begin()
+ defer func() {
+ if r := recover(); r != nil {
+ tx.Rollback()
+ }
+ }()
+ logger.Info(rd)
+ if err := tx.Create(rd).Error; err != nil {
+ // rollback
+ tx.Rollback()
+ e := errno.FromMessage(errno.DBRouteCreateError, err.Error())
+ logger.Error(e.Msg)
+ c.AbortWithStatusJSON(http.StatusInternalServerError, e.Response())
return
} else {
- logger.Info(rd)
- if err := conf.DB().Create(rd).Error; err != nil {
- e := errno.FromMessage(errno.DBRouteCreateError, err.Error())
+ if resp, err = arr.Create(rid); err != nil {
+ tx.Rollback()
+ if httpError, ok := err.(*errno.HttpError); ok {
+ c.AbortWithStatusJSON(httpError.Code, httpError.Msg)
+ return
+ } else {
+ e := errno.FromMessage(errno.ApisixRouteCreateError, err.Error())
+ logger.Error(e.Msg)
+ c.AbortWithStatusJSON(http.StatusInternalServerError, e.Response())
+ return
+ }
+ }
+ }
+ if err := tx.Commit().Error; err == nil {
+ // update content_admin_api
+ if rd, err := service.ToRoute(routeRequest, arr, u4, resp); err != nil {
+ e := errno.FromMessage(errno.DBRouteUpdateError, err.Error())
logger.Error(e.Msg)
- c.AbortWithStatusJSON(http.StatusInternalServerError, e.Response())
- return
+ } else {
+ if err := conf.DB().Model(&service.Route{}).Update(rd).Error; err != nil {
+ e := errno.FromMessage(errno.DBRouteUpdateError, err.Error())
+ logger.Error(e.Msg)
+ }
}
}
}
diff --git a/api/route/upstream.go b/api/route/upstream.go
index 17dae49..7433aa7 100644
--- a/api/route/upstream.go
+++ b/api/route/upstream.go
@@ -101,31 +101,57 @@ func createUpstream(c *gin.Context) {
}
ur.Id = uid
fmt.Println(ur)
- if aur, err := ur.Parse2Apisix(); err != nil {
- e := errno.FromMessage(errno.UpstreamTransError, err.Error())
- logger.Error(e.Msg)
- c.AbortWithStatusJSON(http.StatusBadRequest, e.Response())
+ // mysql
+ if ud, err := service.Trans2UpstreamDao(nil, ur); err != nil {
+ c.AbortWithStatusJSON(http.StatusInternalServerError, err.Response())
return
} else {
- // apisix
- if resp, err := aur.Create(); err != nil {
- e := errno.FromMessage(errno.ApisixUpstreamCreateError, err.Error())
+ // transaction
+ db := conf.DB()
+ tx := db.Begin()
+ defer func() {
+ if r := recover(); r != nil {
+ tx.Rollback()
+ }
+ }()
+ if err := tx.Create(ud).Error; err != nil {
+ tx.Rollback()
+ e := errno.FromMessage(errno.DBUpstreamError, err.Error())
logger.Error(e.Msg)
c.AbortWithStatusJSON(http.StatusInternalServerError, e.Response())
return
} else {
- // mysql
- fmt.Println(resp.UNode.UValue.Id)
- fmt.Println(resp.UNode.UValue.Upstream.Nodes)
- if ud, err := service.Trans2UpstreamDao(resp, ur); err != nil {
- c.AbortWithStatusJSON(http.StatusInternalServerError, err.Response())
+ // apisix
+ if aur, err := ur.Parse2Apisix(); err != nil {
+ tx.Rollback()
+ e := errno.FromMessage(errno.UpstreamTransError, err.Error())
+ logger.Error(e.Msg)
+ c.AbortWithStatusJSON(http.StatusBadRequest, e.Response())
return
} else {
- if err := conf.DB().Create(ud).Error; err != nil {
- e := errno.FromMessage(errno.DBUpstreamError, err.Error())
- logger.Error(e.Msg)
- c.AbortWithStatusJSON(http.StatusInternalServerError, e.Response())
- return
+ if resp, err := aur.Create(); err != nil {
+ tx.Rollback()
+ if httpError, ok := err.(*errno.HttpError); ok {
+ c.AbortWithStatusJSON(httpError.Code, httpError.Msg)
+ return
+ } else {
+ e := errno.FromMessage(errno.ApisixUpstreamCreateError, err.Error())
+ logger.Error(e.Msg)
+ c.AbortWithStatusJSON(http.StatusInternalServerError, e.Response())
+ return
+ }
+ } else {
+ if err := tx.Commit().Error; err == nil {
+ if ud, err := service.Trans2UpstreamDao(resp, ur); err != nil {
+ e := errno.FromMessage(errno.DBUpstreamError, err.Error())
+ logger.Error(e.Msg)
+ } else {
+ if err := conf.DB().Model(&service.UpstreamDao{}).Update(ud).Error; err != nil {
+ e := errno.FromMessage(errno.DBUpstreamError, err.Error())
+ logger.Error(e.Msg)
+ }
+ }
+ }
}
}
}
@@ -184,7 +210,6 @@ func listUpstream(c *gin.Context) {
}
func updateUpstream(c *gin.Context) {
uid := c.Param("uid")
- // todo 参数校验
param, exist := c.Get("requestBody")
if !exist || len(param.([]byte)) < 1 {
e := errno.FromMessage(errno.RouteRequestError, "upstream update with no post data")
@@ -201,30 +226,57 @@ func updateUpstream(c *gin.Context) {
return
}
ur.Id = uid
- fmt.Println(ur)
- if aur, err := ur.Parse2Apisix(); err != nil {
- e := errno.FromMessage(errno.UpstreamTransError, err.Error())
- logger.Error(e.Msg)
- c.AbortWithStatusJSON(http.StatusBadRequest, e.Response())
+ // mysql
+ if ud, err := service.Trans2UpstreamDao(nil, ur); err != nil {
+ c.AbortWithStatusJSON(http.StatusInternalServerError, err.Response())
return
} else {
- // apisix
- if resp, err := aur.Update(); err != nil {
- e := errno.FromMessage(errno.ApisixUpstreamUpdateError, err.Error())
+ // transaction
+ db := conf.DB()
+ tx := db.Begin()
+ defer func() {
+ if r := recover(); r != nil {
+ tx.Rollback()
+ }
+ }()
+ if err := tx.Model(&service.UpstreamDao{}).Update(ud).Error; err != nil {
+ tx.Rollback()
+ e := errno.FromMessage(errno.DBUpstreamError, err.Error())
logger.Error(e.Msg)
c.AbortWithStatusJSON(http.StatusInternalServerError, e.Response())
return
} else {
- // mysql
- if ud, err := service.Trans2UpstreamDao(resp, ur); err != nil {
- c.AbortWithStatusJSON(http.StatusInternalServerError, err.Response())
+ // apisix
+ if aur, err := ur.Parse2Apisix(); err != nil {
+ tx.Rollback()
+ e := errno.FromMessage(errno.UpstreamTransError, err.Error())
+ logger.Error(e.Msg)
+ c.AbortWithStatusJSON(http.StatusBadRequest, e.Response())
return
} else {
- if err := conf.DB().Model(&service.UpstreamDao{}).Update(ud).Error; err != nil {
- e := errno.FromMessage(errno.DBUpstreamError, err.Error())
- logger.Error(e.Msg)
- c.AbortWithStatusJSON(http.StatusInternalServerError, e.Response())
- return
+ if resp, err := aur.Update(); err != nil {
+ tx.Rollback()
+ if httpError, ok := err.(*errno.HttpError); ok {
+ c.AbortWithStatusJSON(httpError.Code, httpError.Msg)
+ return
+ } else {
+ e := errno.FromMessage(errno.ApisixUpstreamUpdateError, err.Error())
+ logger.Error(e.Msg)
+ c.AbortWithStatusJSON(http.StatusInternalServerError, e.Response())
+ return
+ }
+ } else {
+ if err := tx.Commit().Error; err == nil {
+ if ud, err := service.Trans2UpstreamDao(resp, ur); err != nil {
+ e := errno.FromMessage(errno.DBUpstreamError, err.Error())
+ logger.Error(e.Msg)
+ } else {
+ if err := conf.DB().Model(&service.UpstreamDao{}).Update(ud).Error; err != nil {
+ e := errno.FromMessage(errno.DBUpstreamError, err.Error())
+ logger.Error(e.Msg)
+ }
+ }
+ }
}
}
}
@@ -242,23 +294,41 @@ func deleteUpstream(c *gin.Context) {
c.AbortWithStatusJSON(http.StatusBadRequest, e.Response())
return
}
- // delete from apisix
- request := &service.ApisixUpstreamRequest{Id: uid}
- if _, err := request.Delete(); err != nil {
- e := errno.FromMessage(errno.ApisixUpstreamDeleteError, err.Error())
+ db := conf.DB()
+ tx := db.Begin()
+ defer func() {
+ if r := recover(); r != nil {
+ tx.Rollback()
+ }
+ }()
+ // delete from mysql
+ rd := &service.UpstreamDao{}
+ rd.ID = uuid.FromStringOrNil(uid)
+ if err := tx.Delete(rd).Error; err != nil {
+ tx.Rollback()
+ e := errno.FromMessage(errno.DBUpstreamDeleteError, err.Error())
logger.Error(e.Msg)
c.AbortWithStatusJSON(http.StatusInternalServerError, e.Response())
return
} else {
- // delete from mysql
- rd := &service.UpstreamDao{}
- rd.ID = uuid.FromStringOrNil(uid)
- if err := conf.DB().Delete(rd).Error; err != nil {
- e := errno.FromMessage(errno.DBUpstreamDeleteError, err.Error())
- logger.Error(e.Msg)
- c.AbortWithStatusJSON(http.StatusInternalServerError, e.Response())
- return
+ // delete from apisix
+ request := &service.ApisixUpstreamRequest{Id: uid}
+ if _, err := request.Delete(); err != nil {
+ tx.Rollback()
+ if httpError, ok := err.(*errno.HttpError); ok {
+ c.AbortWithStatusJSON(httpError.Code, httpError.Msg)
+ return
+ } else {
+ e := errno.FromMessage(errno.ApisixUpstreamDeleteError, err.Error())
+ logger.Error(e.Msg)
+ c.AbortWithStatusJSON(http.StatusInternalServerError, e.Response())
+ return
+ }
}
}
+ if err := tx.Commit().Error; err != nil {
+ e := errno.FromMessage(errno.ApisixUpstreamDeleteError, err.Error())
+ logger.Error(e.Msg)
+ }
c.Data(http.StatusOK, service.ContentType, errno.Success())
}
diff --git a/api/service/route.go b/api/service/route.go
index ea7e535..c99d0b7 100644
--- a/api/service/route.go
+++ b/api/service/route.go
@@ -243,9 +243,10 @@ func (r *ApisixRouteResponse) Parse() (*RouteRequest, error) {
//Plugins
requestPlugins := utils.CopyMap(o.Plugins)
delete(requestPlugins, REDIRECT)
+ delete(requestPlugins, PROXY_REWRIETE)
// check if upstream is not exist
- if o.Upstream == nil {
+ if o.Upstream == nil && o.UpstreamId == "" {
upstreamProtocol = ""
upstreamHeader = nil
upstreamPath = nil
@@ -262,6 +263,7 @@ func (r *ApisixRouteResponse) Parse() (*RouteRequest, error) {
Hosts: o.Hosts,
Redirect: redirect,
Upstream: o.Upstream,
+ UpstreamId: o.UpstreamId,
UpstreamProtocol: upstreamProtocol,
UpstreamPath: upstreamPath,
UpstreamHeader: upstreamHeader,
@@ -523,14 +525,16 @@ func ToRoute(routeRequest *RouteRequest,
}
rd.ID = u4
// content_admin_api
- if respStr, err := json.Marshal(resp); err != nil {
- e := errno.FromMessage(errno.DBRouteCreateError, err.Error())
- return nil, e
- } else {
- rd.ContentAdminApi = string(respStr)
+ if resp != nil {
+ if respStr, err := json.Marshal(resp); err != nil {
+ e := errno.FromMessage(errno.DBRouteCreateError, err.Error())
+ return nil, e
+ } else {
+ rd.ContentAdminApi = string(respStr)
+ }
}
// hosts
- hosts := resp.Node.Value.Hosts
+ hosts := routeRequest.Hosts
if hb, err := json.Marshal(hosts); err != nil {
e := errno.FromMessage(errno.DBRouteCreateError, err.Error())
logger.Warn(e.Msg)
@@ -538,7 +542,7 @@ func ToRoute(routeRequest *RouteRequest,
rd.Hosts = string(hb)
}
// uris
- uris := resp.Node.Value.Uris
+ uris := routeRequest.Uris
if ub, err := json.Marshal(uris); err != nil {
e := errno.FromMessage(errno.DBRouteCreateError, err.Error())
logger.Warn(e.Msg)
@@ -546,8 +550,8 @@ func ToRoute(routeRequest *RouteRequest,
rd.Uris = string(ub)
}
// upstreamNodes
- if resp.Node.Value.Upstream != nil {
- nodes := resp.Node.Value.Upstream.Nodes
+ if routeRequest.Upstream != nil {
+ nodes := routeRequest.Upstream.Nodes
ips := make([]string, 0)
for k, _ := range nodes {
ips = append(ips, k)
diff --git a/api/service/upstream.go b/api/service/upstream.go
index 4274ad9..e63edee 100644
--- a/api/service/upstream.go
+++ b/api/service/upstream.go
@@ -167,11 +167,13 @@ func Trans2UpstreamDao(resp *ApisixUpstreamResponse, r *UpstreamRequest) (*Upstr
u.Content = string(content)
}
// content_admin_api
- if respStr, err := json.Marshal(resp); err != nil {
- e := errno.FromMessage(errno.DBUpstreamError, err.Error())
- return nil, e
- } else {
- u.ContentAdminApi = string(respStr)
+ if resp != nil {
+ if respStr, err := json.Marshal(resp); err != nil {
+ e := errno.FromMessage(errno.DBUpstreamError, err.Error())
+ return nil, e
+ } else {
+ u.ContentAdminApi = string(respStr)
+ }
}
return u, nil
}