You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by kv...@apache.org on 2020/12/17 10:06:38 UTC
[apisix-ingress-controller] branch master updated: fix: add item to
workqueue with delay when syncFailed (#103)
This is an automated email from the ASF dual-hosted git repository.
kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new b29dfc0 fix: add item to workqueue with delay when syncFailed (#103)
b29dfc0 is described below
commit b29dfc0a794d4747e2c4ab367eafd8a108eb2749
Author: kv <gx...@163.com>
AuthorDate: Thu Dec 17 18:06:31 2020 +0800
fix: add item to workqueue with delay when syncFailed (#103)
* fix: add item to workqueue with delay when syncFailed
* fix: remove the dirty item from queue when retry
* add retry when tls sync failed
* fix: add logs when retry
* checking for transform error
* fix: the logic about the resources has been deleted when UPDATE
* add warning logs for dirty data
---
pkg/ingress/controller/apisix_route.go | 49 +++++++++++------
pkg/ingress/controller/apisix_service.go | 87 +++++++++++++++++++++----------
pkg/ingress/controller/apisix_tls.go | 25 +++++++--
pkg/ingress/controller/apisix_upstream.go | 3 +-
4 files changed, 116 insertions(+), 48 deletions(-)
diff --git a/pkg/ingress/controller/apisix_route.go b/pkg/ingress/controller/apisix_route.go
index eed0825..66044c9 100644
--- a/pkg/ingress/controller/apisix_route.go
+++ b/pkg/ingress/controller/apisix_route.go
@@ -60,7 +60,7 @@ func BuildApisixRouteController(
apisixRouteClientset: api6RouteClientset,
apisixRouteList: api6RouteInformer.Lister(),
apisixRouteSynced: api6RouteInformer.Informer().HasSynced,
- workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ApisixRoutes"),
+ workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixRoutes"),
}
api6RouteInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
@@ -85,7 +85,7 @@ func (c *ApisixRouteController) addFunc(obj interface{}) {
func (c *ApisixRouteController) updateFunc(oldObj, newObj interface{}) {
oldRoute := oldObj.(*api6V1.ApisixRoute)
newRoute := newObj.(*api6V1.ApisixRoute)
- if oldRoute.ResourceVersion == newRoute.ResourceVersion {
+ if oldRoute.ResourceVersion >= newRoute.ResourceVersion {
return
}
//c.addFunc(newObj)
@@ -100,6 +100,17 @@ func (c *ApisixRouteController) updateFunc(oldObj, newObj interface{}) {
}
func (c *ApisixRouteController) deleteFunc(obj interface{}) {
+ oldRoute, ok := obj.(*api6V1.ApisixRoute)
+ if !ok {
+ oldState, ok := obj.(cache.DeletedFinalStateUnknown)
+ if !ok {
+ return
+ }
+ oldRoute, ok = oldState.Obj.(*api6V1.ApisixRoute)
+ if !ok {
+ return
+ }
+ }
var key string
var err error
key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
@@ -107,7 +118,7 @@ func (c *ApisixRouteController) deleteFunc(obj interface{}) {
runtime.HandleError(err)
return
}
- rqo := &RouteQueueObj{Key: key, OldObj: nil, Ope: DELETE}
+ rqo := &RouteQueueObj{Key: key, OldObj: oldRoute, Ope: DELETE}
c.workqueue.AddRateLimited(rqo)
}
@@ -135,16 +146,16 @@ func (c *ApisixRouteController) processNextWorkItem() bool {
}
err := func(obj interface{}) error {
defer c.workqueue.Done(obj)
- var key string
var ok bool
var rqo *RouteQueueObj
if rqo, ok = obj.(*RouteQueueObj); !ok {
c.workqueue.Forget(obj)
return fmt.Errorf("expected RouteQueueObj in workqueue but got %#v", obj)
}
- // 在syncHandler中处理业务
if err := c.syncHandler(rqo); err != nil {
- return fmt.Errorf("error syncing '%s': %s", key, err.Error())
+ c.workqueue.AddRateLimited(obj)
+ log.Errorf("sync route %s failed", rqo.Key)
+ return fmt.Errorf("error syncing '%s': %s", rqo.Key, err.Error())
}
c.workqueue.Forget(obj)
@@ -211,18 +222,16 @@ func (c *ApisixRouteController) sync(rqo *RouteQueueObj) error {
log.Errorf("invalid resource key: %s", key)
return fmt.Errorf("invalid resource key: %s", key)
}
-
- apisixIngressRoute, err := c.apisixRouteList.ApisixRoutes(namespace).Get(name)
- if err != nil {
- if errors.IsNotFound(err) {
- log.Infof("apisixRoute %s is removed", key)
- return nil
- }
- runtime.HandleError(fmt.Errorf("failed to list apisixRoute %s/%s", key, err.Error()))
- return err
- }
switch {
case rqo.Ope == UPDATE:
+ apisixIngressRoute, err := c.apisixRouteList.ApisixRoutes(namespace).Get(name)
+ if err != nil {
+ if errors.IsNotFound(err) {
+ log.Errorf("apisixRoute %s is removed", key)
+ return nil
+ }
+ return err // if error occurred, return
+ }
oldApisixRoute := apisix.ApisixRoute(*rqo.OldObj)
oldRoutes, _, _, _ := oldApisixRoute.Convert()
@@ -232,10 +241,16 @@ func (c *ApisixRouteController) sync(rqo *RouteQueueObj) error {
rc := &state.RouteCompare{OldRoutes: oldRoutes, NewRoutes: newRoutes}
return rc.Sync()
case rqo.Ope == DELETE:
- apisixRoute := apisix.ApisixRoute(*apisixIngressRoute)
+ apisixIngressRoute, _ := c.apisixRouteList.ApisixRoutes(namespace).Get(name)
+ if apisixIngressRoute != nil && apisixIngressRoute.ResourceVersion > rqo.OldObj.ResourceVersion {
+ log.Warnf("Route %s has been covered when retry", rqo.Key)
+ return nil
+ }
+ apisixRoute := apisix.ApisixRoute(*rqo.OldObj)
routes, _, _, _ := apisixRoute.Convert()
rc := &state.RouteCompare{OldRoutes: routes, NewRoutes: nil}
return rc.Sync()
+
default:
return fmt.Errorf("not expected in (ApisixRouteController) sync")
}
diff --git a/pkg/ingress/controller/apisix_service.go b/pkg/ingress/controller/apisix_service.go
index e446736..a6b2005 100644
--- a/pkg/ingress/controller/apisix_service.go
+++ b/pkg/ingress/controller/apisix_service.go
@@ -55,7 +55,7 @@ func BuildApisixServiceController(
apisixClientset: apisixServiceClientset,
apisixServiceList: apisixServiceInformer.Lister(),
apisixServiceSynced: apisixServiceInformer.Informer().HasSynced,
- workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ApisixServices"),
+ workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixServices"),
}
apisixServiceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
@@ -66,6 +66,12 @@ func BuildApisixServiceController(
return controller
}
+type ServiceQueueObj struct {
+ Key string `json:"key"`
+ OldObj *apisixV1.ApisixService `json:"old_obj"`
+ Ope string `json:"ope"` // add / update / delete
+}
+
func (c *ApisixServiceController) Run(stop <-chan struct{}) error {
// 同步缓存
if ok := cache.WaitForCacheSync(stop); !ok {
@@ -89,16 +95,16 @@ func (c *ApisixServiceController) processNextWorkItem() bool {
}
err := func(obj interface{}) error {
defer c.workqueue.Done(obj)
- var key string
+ var sqo *ServiceQueueObj
var ok bool
-
- if key, ok = obj.(string); !ok {
+ if sqo, ok = obj.(*ServiceQueueObj); !ok {
c.workqueue.Forget(obj)
- return fmt.Errorf("expected string in workqueue but got %#v", obj)
+ return fmt.Errorf("expected ServiceQueueObj in workqueue but got %#v", obj)
}
- // 在syncHandler中处理业务
- if err := c.syncHandler(key); err != nil {
- return fmt.Errorf("error syncing '%s': %s", key, err.Error())
+ if err := c.syncHandler(sqo); err != nil {
+ c.workqueue.AddRateLimited(obj)
+ log.Errorf("sync service %s failed", sqo.Key)
+ return fmt.Errorf("error syncing '%s': %s", sqo.Key, err.Error())
}
c.workqueue.Forget(obj)
@@ -110,24 +116,30 @@ func (c *ApisixServiceController) processNextWorkItem() bool {
return true
}
-func (c *ApisixServiceController) syncHandler(key string) error {
- namespace, name, err := cache.SplitMetaNamespaceKey(key)
+func (c *ApisixServiceController) syncHandler(sqo *ServiceQueueObj) error {
+ namespace, name, err := cache.SplitMetaNamespaceKey(sqo.Key)
if err != nil {
- log.Errorf("invalid resource key: %s", key)
- return fmt.Errorf("invalid resource key: %s", key)
+ log.Errorf("invalid resource key: %s", sqo.Key)
+ return fmt.Errorf("invalid resource key: %s", sqo.Key)
}
-
- apisixServiceYaml, err := c.apisixServiceList.ApisixServices(namespace).Get(name)
- if err != nil {
- if errors.IsNotFound(err) {
- log.Infof("apisixUpstream %s is removed", key)
+ apisixServiceYaml := sqo.OldObj
+ if sqo.Ope == DELETE {
+ apisixIngressService, _ := c.apisixServiceList.ApisixServices(namespace).Get(name)
+ if apisixIngressService != nil && apisixIngressService.ResourceVersion > sqo.OldObj.ResourceVersion {
+ log.Warnf("Service %s has been covered when retry", sqo.Key)
return nil
}
- runtime.HandleError(fmt.Errorf("failed to list apisixUpstream %s/%s", key, err.Error()))
- return err
+ } else {
+ apisixServiceYaml, err = c.apisixServiceList.ApisixServices(namespace).Get(name)
+ if err != nil {
+ if errors.IsNotFound(err) {
+ log.Infof("apisixUpstream %s is removed", sqo.Key)
+ return nil
+ }
+ runtime.HandleError(fmt.Errorf("failed to list apisixUpstream %s/%s", sqo.Key, err.Error()))
+ return err
+ }
}
- log.Info(namespace)
- log.Info(name)
apisixService := apisix.ApisixServiceCRD(*apisixServiceYaml)
services, upstreams, _ := apisixService.Convert()
comb := state.ApisixCombination{Routes: nil, Services: services, Upstreams: upstreams}
@@ -142,19 +154,39 @@ func (c *ApisixServiceController) addFunc(obj interface{}) {
runtime.HandleError(err)
return
}
- c.workqueue.AddRateLimited(key)
+ sqo := &RouteQueueObj{Key: key, OldObj: nil, Ope: ADD}
+ c.workqueue.AddRateLimited(sqo)
}
func (c *ApisixServiceController) updateFunc(oldObj, newObj interface{}) {
- oldRoute := oldObj.(*apisixV1.ApisixService)
- newRoute := newObj.(*apisixV1.ApisixService)
- if oldRoute.ResourceVersion == newRoute.ResourceVersion {
+ oldService := oldObj.(*apisixV1.ApisixService)
+ newService := newObj.(*apisixV1.ApisixService)
+ if oldService.ResourceVersion >= newService.ResourceVersion {
return
}
- c.addFunc(newObj)
+ var key string
+ var err error
+ if key, err = cache.MetaNamespaceKeyFunc(newObj); err != nil {
+ runtime.HandleError(err)
+ return
+ }
+ sqo := &ServiceQueueObj{Key: key, OldObj: oldService, Ope: UPDATE}
+ c.workqueue.AddRateLimited(sqo)
}
func (c *ApisixServiceController) deleteFunc(obj interface{}) {
+ oldService, ok := obj.(*apisixV1.ApisixService)
+ if !ok {
+ oldState, ok := obj.(cache.DeletedFinalStateUnknown)
+ if !ok {
+ return
+ }
+ oldService, ok = oldState.Obj.(*apisixV1.ApisixService)
+ if !ok {
+ return
+ }
+ }
+
var key string
var err error
key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
@@ -162,5 +194,6 @@ func (c *ApisixServiceController) deleteFunc(obj interface{}) {
runtime.HandleError(err)
return
}
- c.workqueue.AddRateLimited(key)
+ sqo := &ServiceQueueObj{Key: key, OldObj: oldService, Ope: DELETE}
+ c.workqueue.AddRateLimited(sqo)
}
diff --git a/pkg/ingress/controller/apisix_tls.go b/pkg/ingress/controller/apisix_tls.go
index fe2a144..17f0405 100644
--- a/pkg/ingress/controller/apisix_tls.go
+++ b/pkg/ingress/controller/apisix_tls.go
@@ -60,7 +60,7 @@ func BuildApisixTlsController(
apisixClientset: apisixTlsClientset,
apisixTlsList: apisixTlsInformer.Lister(),
apisixTlsSynced: apisixTlsInformer.Informer().HasSynced,
- workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ApisixTlses"),
+ workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixTlses"),
}
apisixTlsInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
@@ -102,6 +102,8 @@ func (c *ApisixTlsController) processNextWorkItem() bool {
return fmt.Errorf("expected TlsQueueObj in workqueue but got %#v", obj)
}
if err := c.syncHandler(tqo); err != nil {
+ c.workqueue.AddRateLimited(tqo)
+ log.Errorf("sync tls %s failed", tqo.Key)
return fmt.Errorf("error syncing '%s': %s", key, err.Error())
}
@@ -121,7 +123,13 @@ func (c *ApisixTlsController) syncHandler(tqo *TlsQueueObj) error {
return fmt.Errorf("invalid resource key: %s", tqo.Key)
}
apisixTlsYaml := tqo.OldObj
- if tqo.Ope != state.Delete {
+ if tqo.Ope == state.Delete {
+ apisixIngressTls, _ := c.apisixTlsList.ApisixTlses(namespace).Get(name)
+ if apisixIngressTls != nil && apisixIngressTls.ResourceVersion > tqo.OldObj.ResourceVersion {
+ log.Warnf("TLS %s has been covered when retry", tqo.Key)
+ return nil
+ }
+ } else {
apisixTlsYaml, err = c.apisixTlsList.ApisixTlses(namespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
@@ -132,6 +140,7 @@ func (c *ApisixTlsController) syncHandler(tqo *TlsQueueObj) error {
return err
}
}
+
apisixTls := apisix.ApisixTlsCRD(*apisixTlsYaml)
sc := &apisix.SecretClient{}
if tls, err := apisixTls.Convert(sc); err != nil {
@@ -173,7 +182,17 @@ func (c *ApisixTlsController) updateFunc(oldObj, newObj interface{}) {
}
func (c *ApisixTlsController) deleteFunc(obj interface{}) {
- oldTls := obj.(cache.DeletedFinalStateUnknown).Obj.(*apisixV1.ApisixTls)
+ oldTls, ok := obj.(*apisixV1.ApisixTls)
+ if !ok {
+ oldState, ok := obj.(cache.DeletedFinalStateUnknown)
+ if !ok {
+ return
+ }
+ oldTls, ok = oldState.Obj.(*apisixV1.ApisixTls)
+ if !ok {
+ return
+ }
+ }
var key string
var err error
key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
diff --git a/pkg/ingress/controller/apisix_upstream.go b/pkg/ingress/controller/apisix_upstream.go
index e9436be..092f63e 100644
--- a/pkg/ingress/controller/apisix_upstream.go
+++ b/pkg/ingress/controller/apisix_upstream.go
@@ -56,7 +56,7 @@ func BuildApisixUpstreamController(
apisixClientset: apisixUpstreamClientset,
apisixUpstreamList: apisixUpstreamInformer.Lister(),
apisixUpstreamSynced: apisixUpstreamInformer.Informer().HasSynced,
- workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ApisixUpstreams"),
+ workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixUpstreams"),
}
apisixUpstreamInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
@@ -99,6 +99,7 @@ func (c *ApisixUpstreamController) processNextWorkItem() bool {
}
// 在syncHandler中处理业务
if err := c.syncHandler(key); err != nil {
+ c.workqueue.AddRateLimited(obj)
return fmt.Errorf("error syncing '%s': %s", key, err.Error())
}