You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by kv...@apache.org on 2020/12/17 10:06:38 UTC

[apisix-ingress-controller] branch master updated: fix: add item to workqueue with delay when syncFailed (#103)

This is an automated email from the ASF dual-hosted git repository.

kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new b29dfc0  fix: add item to workqueue with delay when syncFailed (#103)
b29dfc0 is described below

commit b29dfc0a794d4747e2c4ab367eafd8a108eb2749
Author: kv <gx...@163.com>
AuthorDate: Thu Dec 17 18:06:31 2020 +0800

    fix: add item to workqueue with delay when syncFailed (#103)
    
    * fix: add item to workqueue with delay when syncFailed
    
    * fix: remove the dirty item from the queue when retrying
    
    * add retry when tls sync failed
    
    * fix: add logs when retry
    
    * checking for transform error
    
    * fix: the logic for resources that have already been deleted when handling UPDATE
    
    * add warning logs for dirty data
---
 pkg/ingress/controller/apisix_route.go    | 49 +++++++++++------
 pkg/ingress/controller/apisix_service.go  | 87 +++++++++++++++++++++----------
 pkg/ingress/controller/apisix_tls.go      | 25 +++++++--
 pkg/ingress/controller/apisix_upstream.go |  3 +-
 4 files changed, 116 insertions(+), 48 deletions(-)

diff --git a/pkg/ingress/controller/apisix_route.go b/pkg/ingress/controller/apisix_route.go
index eed0825..66044c9 100644
--- a/pkg/ingress/controller/apisix_route.go
+++ b/pkg/ingress/controller/apisix_route.go
@@ -60,7 +60,7 @@ func BuildApisixRouteController(
 		apisixRouteClientset: api6RouteClientset,
 		apisixRouteList:      api6RouteInformer.Lister(),
 		apisixRouteSynced:    api6RouteInformer.Informer().HasSynced,
-		workqueue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ApisixRoutes"),
+		workqueue:            workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixRoutes"),
 	}
 	api6RouteInformer.Informer().AddEventHandler(
 		cache.ResourceEventHandlerFuncs{
@@ -85,7 +85,7 @@ func (c *ApisixRouteController) addFunc(obj interface{}) {
 func (c *ApisixRouteController) updateFunc(oldObj, newObj interface{}) {
 	oldRoute := oldObj.(*api6V1.ApisixRoute)
 	newRoute := newObj.(*api6V1.ApisixRoute)
-	if oldRoute.ResourceVersion == newRoute.ResourceVersion {
+	if oldRoute.ResourceVersion >= newRoute.ResourceVersion {
 		return
 	}
 	//c.addFunc(newObj)
@@ -100,6 +100,17 @@ func (c *ApisixRouteController) updateFunc(oldObj, newObj interface{}) {
 }
 
 func (c *ApisixRouteController) deleteFunc(obj interface{}) {
+	oldRoute, ok := obj.(*api6V1.ApisixRoute)
+	if !ok {
+		oldState, ok := obj.(cache.DeletedFinalStateUnknown)
+		if !ok {
+			return
+		}
+		oldRoute, ok = oldState.Obj.(*api6V1.ApisixRoute)
+		if !ok {
+			return
+		}
+	}
 	var key string
 	var err error
 	key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
@@ -107,7 +118,7 @@ func (c *ApisixRouteController) deleteFunc(obj interface{}) {
 		runtime.HandleError(err)
 		return
 	}
-	rqo := &RouteQueueObj{Key: key, OldObj: nil, Ope: DELETE}
+	rqo := &RouteQueueObj{Key: key, OldObj: oldRoute, Ope: DELETE}
 	c.workqueue.AddRateLimited(rqo)
 }
 
@@ -135,16 +146,16 @@ func (c *ApisixRouteController) processNextWorkItem() bool {
 	}
 	err := func(obj interface{}) error {
 		defer c.workqueue.Done(obj)
-		var key string
 		var ok bool
 		var rqo *RouteQueueObj
 		if rqo, ok = obj.(*RouteQueueObj); !ok {
 			c.workqueue.Forget(obj)
 			return fmt.Errorf("expected RouteQueueObj in workqueue but got %#v", obj)
 		}
-		// 在syncHandler中处理业务
 		if err := c.syncHandler(rqo); err != nil {
-			return fmt.Errorf("error syncing '%s': %s", key, err.Error())
+			c.workqueue.AddRateLimited(obj)
+			log.Errorf("sync route %s failed", rqo.Key)
+			return fmt.Errorf("error syncing '%s': %s", rqo.Key, err.Error())
 		}
 
 		c.workqueue.Forget(obj)
@@ -211,18 +222,16 @@ func (c *ApisixRouteController) sync(rqo *RouteQueueObj) error {
 		log.Errorf("invalid resource key: %s", key)
 		return fmt.Errorf("invalid resource key: %s", key)
 	}
-
-	apisixIngressRoute, err := c.apisixRouteList.ApisixRoutes(namespace).Get(name)
-	if err != nil {
-		if errors.IsNotFound(err) {
-			log.Infof("apisixRoute %s is removed", key)
-			return nil
-		}
-		runtime.HandleError(fmt.Errorf("failed to list apisixRoute %s/%s", key, err.Error()))
-		return err
-	}
 	switch {
 	case rqo.Ope == UPDATE:
+		apisixIngressRoute, err := c.apisixRouteList.ApisixRoutes(namespace).Get(name)
+		if err != nil {
+			if errors.IsNotFound(err) {
+				log.Errorf("apisixRoute %s is removed", key)
+				return nil
+			}
+			return err // if error occurred, return
+		}
 		oldApisixRoute := apisix.ApisixRoute(*rqo.OldObj)
 		oldRoutes, _, _, _ := oldApisixRoute.Convert()
 
@@ -232,10 +241,16 @@ func (c *ApisixRouteController) sync(rqo *RouteQueueObj) error {
 		rc := &state.RouteCompare{OldRoutes: oldRoutes, NewRoutes: newRoutes}
 		return rc.Sync()
 	case rqo.Ope == DELETE:
-		apisixRoute := apisix.ApisixRoute(*apisixIngressRoute)
+		apisixIngressRoute, _ := c.apisixRouteList.ApisixRoutes(namespace).Get(name)
+		if apisixIngressRoute != nil && apisixIngressRoute.ResourceVersion > rqo.OldObj.ResourceVersion {
+			log.Warnf("Route %s has been covered when retry", rqo.Key)
+			return nil
+		}
+		apisixRoute := apisix.ApisixRoute(*rqo.OldObj)
 		routes, _, _, _ := apisixRoute.Convert()
 		rc := &state.RouteCompare{OldRoutes: routes, NewRoutes: nil}
 		return rc.Sync()
+
 	default:
 		return fmt.Errorf("not expected in (ApisixRouteController) sync")
 	}
diff --git a/pkg/ingress/controller/apisix_service.go b/pkg/ingress/controller/apisix_service.go
index e446736..a6b2005 100644
--- a/pkg/ingress/controller/apisix_service.go
+++ b/pkg/ingress/controller/apisix_service.go
@@ -55,7 +55,7 @@ func BuildApisixServiceController(
 		apisixClientset:     apisixServiceClientset,
 		apisixServiceList:   apisixServiceInformer.Lister(),
 		apisixServiceSynced: apisixServiceInformer.Informer().HasSynced,
-		workqueue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ApisixServices"),
+		workqueue:           workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixServices"),
 	}
 	apisixServiceInformer.Informer().AddEventHandler(
 		cache.ResourceEventHandlerFuncs{
@@ -66,6 +66,12 @@ func BuildApisixServiceController(
 	return controller
 }
 
+type ServiceQueueObj struct {
+	Key    string                  `json:"key"`
+	OldObj *apisixV1.ApisixService `json:"old_obj"`
+	Ope    string                  `json:"ope"` // add / update / delete
+}
+
 func (c *ApisixServiceController) Run(stop <-chan struct{}) error {
 	// 同步缓存
 	if ok := cache.WaitForCacheSync(stop); !ok {
@@ -89,16 +95,16 @@ func (c *ApisixServiceController) processNextWorkItem() bool {
 	}
 	err := func(obj interface{}) error {
 		defer c.workqueue.Done(obj)
-		var key string
+		var sqo *ServiceQueueObj
 		var ok bool
-
-		if key, ok = obj.(string); !ok {
+		if sqo, ok = obj.(*ServiceQueueObj); !ok {
 			c.workqueue.Forget(obj)
-			return fmt.Errorf("expected string in workqueue but got %#v", obj)
+			return fmt.Errorf("expected ServiceQueueObj in workqueue but got %#v", obj)
 		}
-		// 在syncHandler中处理业务
-		if err := c.syncHandler(key); err != nil {
-			return fmt.Errorf("error syncing '%s': %s", key, err.Error())
+		if err := c.syncHandler(sqo); err != nil {
+			c.workqueue.AddRateLimited(obj)
+			log.Errorf("sync service %s failed", sqo.Key)
+			return fmt.Errorf("error syncing '%s': %s", sqo.Key, err.Error())
 		}
 
 		c.workqueue.Forget(obj)
@@ -110,24 +116,30 @@ func (c *ApisixServiceController) processNextWorkItem() bool {
 	return true
 }
 
-func (c *ApisixServiceController) syncHandler(key string) error {
-	namespace, name, err := cache.SplitMetaNamespaceKey(key)
+func (c *ApisixServiceController) syncHandler(sqo *ServiceQueueObj) error {
+	namespace, name, err := cache.SplitMetaNamespaceKey(sqo.Key)
 	if err != nil {
-		log.Errorf("invalid resource key: %s", key)
-		return fmt.Errorf("invalid resource key: %s", key)
+		log.Errorf("invalid resource key: %s", sqo.Key)
+		return fmt.Errorf("invalid resource key: %s", sqo.Key)
 	}
-
-	apisixServiceYaml, err := c.apisixServiceList.ApisixServices(namespace).Get(name)
-	if err != nil {
-		if errors.IsNotFound(err) {
-			log.Infof("apisixUpstream %s is removed", key)
+	apisixServiceYaml := sqo.OldObj
+	if sqo.Ope == DELETE {
+		apisixIngressService, _ := c.apisixServiceList.ApisixServices(namespace).Get(name)
+		if apisixIngressService != nil && apisixIngressService.ResourceVersion > sqo.OldObj.ResourceVersion {
+			log.Warnf("Service %s has been covered when retry", sqo.Key)
 			return nil
 		}
-		runtime.HandleError(fmt.Errorf("failed to list apisixUpstream %s/%s", key, err.Error()))
-		return err
+	} else {
+		apisixServiceYaml, err = c.apisixServiceList.ApisixServices(namespace).Get(name)
+		if err != nil {
+			if errors.IsNotFound(err) {
+				log.Infof("apisixUpstream %s is removed", sqo.Key)
+				return nil
+			}
+			runtime.HandleError(fmt.Errorf("failed to list apisixUpstream %s/%s", sqo.Key, err.Error()))
+			return err
+		}
 	}
-	log.Info(namespace)
-	log.Info(name)
 	apisixService := apisix.ApisixServiceCRD(*apisixServiceYaml)
 	services, upstreams, _ := apisixService.Convert()
 	comb := state.ApisixCombination{Routes: nil, Services: services, Upstreams: upstreams}
@@ -142,19 +154,39 @@ func (c *ApisixServiceController) addFunc(obj interface{}) {
 		runtime.HandleError(err)
 		return
 	}
-	c.workqueue.AddRateLimited(key)
+	sqo := &RouteQueueObj{Key: key, OldObj: nil, Ope: ADD}
+	c.workqueue.AddRateLimited(sqo)
 }
 
 func (c *ApisixServiceController) updateFunc(oldObj, newObj interface{}) {
-	oldRoute := oldObj.(*apisixV1.ApisixService)
-	newRoute := newObj.(*apisixV1.ApisixService)
-	if oldRoute.ResourceVersion == newRoute.ResourceVersion {
+	oldService := oldObj.(*apisixV1.ApisixService)
+	newService := newObj.(*apisixV1.ApisixService)
+	if oldService.ResourceVersion >= newService.ResourceVersion {
 		return
 	}
-	c.addFunc(newObj)
+	var key string
+	var err error
+	if key, err = cache.MetaNamespaceKeyFunc(newObj); err != nil {
+		runtime.HandleError(err)
+		return
+	}
+	sqo := &ServiceQueueObj{Key: key, OldObj: oldService, Ope: UPDATE}
+	c.workqueue.AddRateLimited(sqo)
 }
 
 func (c *ApisixServiceController) deleteFunc(obj interface{}) {
+	oldService, ok := obj.(*apisixV1.ApisixService)
+	if !ok {
+		oldState, ok := obj.(cache.DeletedFinalStateUnknown)
+		if !ok {
+			return
+		}
+		oldService, ok = oldState.Obj.(*apisixV1.ApisixService)
+		if !ok {
+			return
+		}
+	}
+
 	var key string
 	var err error
 	key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
@@ -162,5 +194,6 @@ func (c *ApisixServiceController) deleteFunc(obj interface{}) {
 		runtime.HandleError(err)
 		return
 	}
-	c.workqueue.AddRateLimited(key)
+	sqo := &ServiceQueueObj{Key: key, OldObj: oldService, Ope: DELETE}
+	c.workqueue.AddRateLimited(sqo)
 }
diff --git a/pkg/ingress/controller/apisix_tls.go b/pkg/ingress/controller/apisix_tls.go
index fe2a144..17f0405 100644
--- a/pkg/ingress/controller/apisix_tls.go
+++ b/pkg/ingress/controller/apisix_tls.go
@@ -60,7 +60,7 @@ func BuildApisixTlsController(
 		apisixClientset: apisixTlsClientset,
 		apisixTlsList:   apisixTlsInformer.Lister(),
 		apisixTlsSynced: apisixTlsInformer.Informer().HasSynced,
-		workqueue:       workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ApisixTlses"),
+		workqueue:       workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixTlses"),
 	}
 	apisixTlsInformer.Informer().AddEventHandler(
 		cache.ResourceEventHandlerFuncs{
@@ -102,6 +102,8 @@ func (c *ApisixTlsController) processNextWorkItem() bool {
 			return fmt.Errorf("expected TlsQueueObj in workqueue but got %#v", obj)
 		}
 		if err := c.syncHandler(tqo); err != nil {
+			c.workqueue.AddRateLimited(tqo)
+			log.Errorf("sync tls %s failed", tqo.Key)
 			return fmt.Errorf("error syncing '%s': %s", key, err.Error())
 		}
 
@@ -121,7 +123,13 @@ func (c *ApisixTlsController) syncHandler(tqo *TlsQueueObj) error {
 		return fmt.Errorf("invalid resource key: %s", tqo.Key)
 	}
 	apisixTlsYaml := tqo.OldObj
-	if tqo.Ope != state.Delete {
+	if tqo.Ope == state.Delete {
+		apisixIngressTls, _ := c.apisixTlsList.ApisixTlses(namespace).Get(name)
+		if apisixIngressTls != nil && apisixIngressTls.ResourceVersion > tqo.OldObj.ResourceVersion {
+			log.Warnf("TLS %s has been covered when retry", tqo.Key)
+			return nil
+		}
+	} else {
 		apisixTlsYaml, err = c.apisixTlsList.ApisixTlses(namespace).Get(name)
 		if err != nil {
 			if errors.IsNotFound(err) {
@@ -132,6 +140,7 @@ func (c *ApisixTlsController) syncHandler(tqo *TlsQueueObj) error {
 			return err
 		}
 	}
+
 	apisixTls := apisix.ApisixTlsCRD(*apisixTlsYaml)
 	sc := &apisix.SecretClient{}
 	if tls, err := apisixTls.Convert(sc); err != nil {
@@ -173,7 +182,17 @@ func (c *ApisixTlsController) updateFunc(oldObj, newObj interface{}) {
 }
 
 func (c *ApisixTlsController) deleteFunc(obj interface{}) {
-	oldTls := obj.(cache.DeletedFinalStateUnknown).Obj.(*apisixV1.ApisixTls)
+	oldTls, ok := obj.(*apisixV1.ApisixTls)
+	if !ok {
+		oldState, ok := obj.(cache.DeletedFinalStateUnknown)
+		if !ok {
+			return
+		}
+		oldTls, ok = oldState.Obj.(*apisixV1.ApisixTls)
+		if !ok {
+			return
+		}
+	}
 	var key string
 	var err error
 	key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
diff --git a/pkg/ingress/controller/apisix_upstream.go b/pkg/ingress/controller/apisix_upstream.go
index e9436be..092f63e 100644
--- a/pkg/ingress/controller/apisix_upstream.go
+++ b/pkg/ingress/controller/apisix_upstream.go
@@ -56,7 +56,7 @@ func BuildApisixUpstreamController(
 		apisixClientset:      apisixUpstreamClientset,
 		apisixUpstreamList:   apisixUpstreamInformer.Lister(),
 		apisixUpstreamSynced: apisixUpstreamInformer.Informer().HasSynced,
-		workqueue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ApisixUpstreams"),
+		workqueue:            workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixUpstreams"),
 	}
 	apisixUpstreamInformer.Informer().AddEventHandler(
 		cache.ResourceEventHandlerFuncs{
@@ -99,6 +99,7 @@ func (c *ApisixUpstreamController) processNextWorkItem() bool {
 		}
 		// 在syncHandler中处理业务
 		if err := c.syncHandler(key); err != nil {
+			c.workqueue.AddRateLimited(obj)
 			return fmt.Errorf("error syncing '%s': %s", key, err.Error())
 		}