You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by al...@apache.org on 2023/04/13 01:21:06 UTC
[apisix-ingress-controller] branch master updated: refactor: update status (#1618)
This is an automated email from the ASF dual-hosted git repository.
alinsran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new 3a8fdf64 refactor: update status (#1618)
3a8fdf64 is described below
commit 3a8fdf641cde6d0095ecee6917c35cec7193624d
Author: Xin Rong <al...@gmail.com>
AuthorDate: Thu Apr 13 09:21:01 2023 +0800
refactor: update status (#1618)
---
go.mod | 2 +
go.sum | 4 +
pkg/kube/apisix_consumer.go | 12 +
pkg/kube/apisix_plugin_config.go | 11 +
pkg/kube/apisix_route.go | 11 +
pkg/kube/apisix_tls.go | 34 ++-
pkg/kube/apisix_upstream.go | 11 +
pkg/kube/ingress.go | 13 +
pkg/providers/apisix/apisix_consumer.go | 99 ++++++--
pkg/providers/apisix/apisix_plugin_config.go | 259 +++++++++++---------
pkg/providers/apisix/apisix_route.go | 269 +++++++++++----------
pkg/providers/apisix/apisix_tls.go | 115 ++++++---
pkg/providers/apisix/apisix_upstream.go | 130 +++++++---
pkg/providers/ingress/ingress.go | 262 +++++++++++---------
.../suite-ingress/suite-ingress-features/status.go | 226 +++++++++++++++--
15 files changed, 989 insertions(+), 469 deletions(-)
diff --git a/go.mod b/go.mod
index ec174fd7..1beb41af 100644
--- a/go.mod
+++ b/go.mod
@@ -18,6 +18,7 @@ require (
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.24.0
golang.org/x/net v0.9.0
+ gopkg.in/go-playground/pool.v3 v3.1.1
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.26.3
@@ -89,6 +90,7 @@ require (
golang.org/x/tools v0.6.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.30.0 // indirect
+ gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.26.3 // indirect
diff --git a/go.sum b/go.sum
index 121066ec..ef731e04 100644
--- a/go.sum
+++ b/go.sum
@@ -336,6 +336,10 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
+gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXadIrXTM=
+gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE=
+gopkg.in/go-playground/pool.v3 v3.1.1 h1:4Qcj91IsYTpIeRhe/eo6Fz+w6uKWPEghx8vHFTYMfhw=
+gopkg.in/go-playground/pool.v3 v3.1.1/go.mod h1:pUAGBximS/hccTTSzEop6wvvQhVa3QPDFFW+8REdutg=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
diff --git a/pkg/kube/apisix_consumer.go b/pkg/kube/apisix_consumer.go
index e57956ef..4f9f71a6 100644
--- a/pkg/kube/apisix_consumer.go
+++ b/pkg/kube/apisix_consumer.go
@@ -17,6 +17,8 @@ package kube
import (
"errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
"github.com/apache/apisix-ingress-controller/pkg/config"
configv2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2"
configv2beta3 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta3"
@@ -54,6 +56,8 @@ type ApisixConsumer interface {
// ResourceVersion returns the the resource version field inside
// the real ApisixConsumer.
ResourceVersion() string
+
+ metav1.Object
}
// ApisixConsumerEvent contains the ApisixConsumer key (namespace/name)
@@ -68,6 +72,8 @@ type apisixConsumer struct {
groupVersion string
v2beta3 *configv2beta3.ApisixConsumer
v2 *configv2.ApisixConsumer
+
+ metav1.Object
}
func (ac *apisixConsumer) V2beta3() *configv2beta3.ApisixConsumer {
@@ -108,6 +114,7 @@ func (l *apisixConsumerLister) V2beta3(namespace, name string) (ApisixConsumer,
return &apisixConsumer{
groupVersion: config.ApisixV2beta3,
v2beta3: ac,
+ Object: ac,
}, nil
}
@@ -119,6 +126,7 @@ func (l *apisixConsumerLister) V2(namespace, name string) (ApisixConsumer, error
return &apisixConsumer{
groupVersion: config.ApisixV2,
v2: ac,
+ Object: ac,
}, nil
}
@@ -130,11 +138,13 @@ func MustNewApisixConsumer(obj interface{}) ApisixConsumer {
return &apisixConsumer{
groupVersion: config.ApisixV2beta3,
v2beta3: ac,
+ Object: ac,
}
case *configv2.ApisixConsumer:
return &apisixConsumer{
groupVersion: config.ApisixV2,
v2: ac,
+ Object: ac,
}
default:
panic("invalid ApisixConsumer type")
@@ -150,11 +160,13 @@ func NewApisixConsumer(obj interface{}) (ApisixConsumer, error) {
return &apisixConsumer{
groupVersion: config.ApisixV2beta3,
v2beta3: ac,
+ Object: ac,
}, nil
case *configv2.ApisixConsumer:
return &apisixConsumer{
groupVersion: config.ApisixV2,
v2: ac,
+ Object: ac,
}, nil
default:
return nil, errors.New("invalid ApisixConsumer type")
diff --git a/pkg/kube/apisix_plugin_config.go b/pkg/kube/apisix_plugin_config.go
index 922caf6e..ddee747e 100644
--- a/pkg/kube/apisix_plugin_config.go
+++ b/pkg/kube/apisix_plugin_config.go
@@ -17,6 +17,8 @@ package kube
import (
"errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
"github.com/apache/apisix-ingress-controller/pkg/config"
configv2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2"
configv2beta3 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta3"
@@ -54,6 +56,8 @@ type ApisixPluginConfig interface {
// ResourceVersion returns the the resource version field inside
// the real ApisixPluginConfig.
ResourceVersion() string
+
+ metav1.Object
}
// ApisixPluginConfigEvent contains the ApisixPluginConfig key (namespace/name)
@@ -68,6 +72,7 @@ type apisixPluginConfig struct {
groupVersion string
v2beta3 *configv2beta3.ApisixPluginConfig
v2 *configv2.ApisixPluginConfig
+ metav1.Object
}
func (apc *apisixPluginConfig) V2beta3() *configv2beta3.ApisixPluginConfig {
@@ -108,6 +113,7 @@ func (l *apisixPluginConfigLister) V2beta3(namespace, name string) (ApisixPlugin
return &apisixPluginConfig{
groupVersion: config.ApisixV2beta3,
v2beta3: apc,
+ Object: apc,
}, nil
}
@@ -119,6 +125,7 @@ func (l *apisixPluginConfigLister) V2(namespace, name string) (ApisixPluginConfi
return &apisixPluginConfig{
groupVersion: config.ApisixV2,
v2: apc,
+ Object: apc,
}, nil
}
@@ -130,11 +137,13 @@ func MustNewApisixPluginConfig(obj interface{}) ApisixPluginConfig {
return &apisixPluginConfig{
groupVersion: config.ApisixV2beta3,
v2beta3: apc,
+ Object: apc,
}
case *configv2.ApisixPluginConfig:
return &apisixPluginConfig{
groupVersion: config.ApisixV2,
v2: apc,
+ Object: apc,
}
default:
panic("invalid ApisixPluginConfig type")
@@ -150,11 +159,13 @@ func NewApisixPluginConfig(obj interface{}) (ApisixPluginConfig, error) {
return &apisixPluginConfig{
groupVersion: config.ApisixV2beta3,
v2beta3: apc,
+ Object: apc,
}, nil
case *configv2.ApisixPluginConfig:
return &apisixPluginConfig{
groupVersion: config.ApisixV2,
v2: apc,
+ Object: apc,
}, nil
default:
return nil, errors.New("invalid ApisixPluginConfig type")
diff --git a/pkg/kube/apisix_route.go b/pkg/kube/apisix_route.go
index bf011b9e..069c4369 100644
--- a/pkg/kube/apisix_route.go
+++ b/pkg/kube/apisix_route.go
@@ -17,6 +17,8 @@ package kube
import (
"errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
"github.com/apache/apisix-ingress-controller/pkg/config"
configv2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2"
configv2beta3 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta3"
@@ -58,6 +60,8 @@ type ApisixRoute interface {
// ResourceVersion returns the the resource version field inside
// the real ApisixRoute.
ResourceVersion() string
+
+ metav1.Object
}
// ApisixRouteEvent contains the ApisixRoute key (namespace/name)
@@ -72,6 +76,7 @@ type apisixRoute struct {
groupVersion string
v2beta3 *configv2beta3.ApisixRoute
v2 *configv2.ApisixRoute
+ metav1.Object
}
func (l *apisixRouteLister) V2beta3Lister() listersv2beta3.ApisixRouteLister {
@@ -119,6 +124,7 @@ func (l *apisixRouteLister) V2beta3(namespace, name string) (ApisixRoute, error)
return &apisixRoute{
groupVersion: config.ApisixV2beta3,
v2beta3: ar,
+ Object: ar,
}, nil
}
func (l *apisixRouteLister) V2(namespace, name string) (ApisixRoute, error) {
@@ -129,6 +135,7 @@ func (l *apisixRouteLister) V2(namespace, name string) (ApisixRoute, error) {
return &apisixRoute{
groupVersion: config.ApisixV2,
v2: ar,
+ Object: ar,
}, nil
}
@@ -140,11 +147,13 @@ func MustNewApisixRoute(obj interface{}) ApisixRoute {
return &apisixRoute{
groupVersion: config.ApisixV2beta3,
v2beta3: ar,
+ Object: ar,
}
case *configv2.ApisixRoute:
return &apisixRoute{
groupVersion: config.ApisixV2,
v2: ar,
+ Object: ar,
}
default:
panic("invalid ApisixRoute type")
@@ -160,11 +169,13 @@ func NewApisixRoute(obj interface{}) (ApisixRoute, error) {
return &apisixRoute{
groupVersion: config.ApisixV2beta3,
v2beta3: ar,
+ Object: ar,
}, nil
case *configv2.ApisixRoute:
return &apisixRoute{
groupVersion: config.ApisixV2,
v2: ar,
+ Object: ar,
}, nil
default:
return nil, errors.New("invalid ApisixRoute type")
diff --git a/pkg/kube/apisix_tls.go b/pkg/kube/apisix_tls.go
index 403edbac..96508ae6 100644
--- a/pkg/kube/apisix_tls.go
+++ b/pkg/kube/apisix_tls.go
@@ -17,6 +17,8 @@ package kube
import (
"fmt"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
"github.com/apache/apisix-ingress-controller/pkg/config"
configv2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2"
configv2beta3 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta3"
@@ -54,6 +56,8 @@ type ApisixTls interface {
// ResourceVersion returns the the resource version field inside
// the real ApisixTls.
ResourceVersion() string
+
+ metav1.Object
}
// ApisixTlsEvent contains the ApisixTls key (namespace/name)
@@ -68,6 +72,8 @@ type apisixTls struct {
groupVersion string
v2beta3 *configv2beta3.ApisixTls
v2 *configv2.ApisixTls
+
+ metav1.Object
}
func (atls *apisixTls) V2beta3() *configv2beta3.ApisixTls {
@@ -100,39 +106,43 @@ type apisixTlsLister struct {
}
func (l *apisixTlsLister) V2beta3(namespace, name string) (ApisixTls, error) {
- ar, err := l.v2beta3Lister.ApisixTlses(namespace).Get(name)
+ at, err := l.v2beta3Lister.ApisixTlses(namespace).Get(name)
if err != nil {
return nil, err
}
return &apisixTls{
groupVersion: config.ApisixV2beta3,
- v2beta3: ar,
+ v2beta3: at,
+ Object: at,
}, nil
}
func (l *apisixTlsLister) V2(namespace, name string) (ApisixTls, error) {
- ar, err := l.v2Lister.ApisixTlses(namespace).Get(name)
+ at, err := l.v2Lister.ApisixTlses(namespace).Get(name)
if err != nil {
return nil, err
}
return &apisixTls{
groupVersion: config.ApisixV2,
- v2: ar,
+ v2: at,
+ Object: at,
}, nil
}
// MustNewApisixTls creates a kube.ApisixTls object according to the
// type of obj.
func MustNewApisixTls(obj interface{}) ApisixTls {
- switch ar := obj.(type) {
+ switch at := obj.(type) {
case *configv2beta3.ApisixTls:
return &apisixTls{
groupVersion: config.ApisixV2beta3,
- v2beta3: ar,
+ v2beta3: at,
+ Object: at,
}
case *configv2.ApisixTls:
return &apisixTls{
groupVersion: config.ApisixV2,
- v2: ar,
+ v2: at,
+ Object: at,
}
default:
panic("invalid ApisixTls type")
@@ -143,19 +153,21 @@ func MustNewApisixTls(obj interface{}) ApisixTls {
// type of obj. It returns nil and the error reason when the
// type assertion fails.
func NewApisixTls(obj interface{}) (ApisixTls, error) {
- switch ar := obj.(type) {
+ switch at := obj.(type) {
case *configv2beta3.ApisixTls:
return &apisixTls{
groupVersion: config.ApisixV2beta3,
- v2beta3: ar,
+ v2beta3: at,
+ Object: at,
}, nil
case *configv2.ApisixTls:
return &apisixTls{
groupVersion: config.ApisixV2,
- v2: ar,
+ v2: at,
+ Object: at,
}, nil
default:
- return nil, fmt.Errorf("invalid ApisixTls type %T", ar)
+ return nil, fmt.Errorf("invalid ApisixTls type %T", at)
}
}
diff --git a/pkg/kube/apisix_upstream.go b/pkg/kube/apisix_upstream.go
index 2ac51791..7d8c44c6 100644
--- a/pkg/kube/apisix_upstream.go
+++ b/pkg/kube/apisix_upstream.go
@@ -17,6 +17,7 @@ package kube
import (
"errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"github.com/apache/apisix-ingress-controller/pkg/config"
@@ -58,6 +59,8 @@ type ApisixUpstream interface {
// ResourceVersion returns the the resource version field inside
// the real ApisixUpstream.
ResourceVersion() string
+
+ metav1.Object
}
// ApisixUpstreamEvent contains the ApisixUpstream key (namespace/name)
@@ -72,6 +75,8 @@ type apisixUpstream struct {
groupVersion string
v2beta3 *configv2beta3.ApisixUpstream
v2 *configv2.ApisixUpstream
+
+ metav1.Object
}
func (au *apisixUpstream) V2beta3() *configv2beta3.ApisixUpstream {
@@ -111,6 +116,7 @@ func (l *apisixUpstreamLister) V2beta3(namespace, name string) (ApisixUpstream,
return &apisixUpstream{
groupVersion: config.ApisixV2beta3,
v2beta3: au,
+ Object: au,
}, nil
}
func (l *apisixUpstreamLister) V2(namespace, name string) (ApisixUpstream, error) {
@@ -121,6 +127,7 @@ func (l *apisixUpstreamLister) V2(namespace, name string) (ApisixUpstream, error
return &apisixUpstream{
groupVersion: config.ApisixV2,
v2: au,
+ Object: au,
}, nil
}
@@ -136,11 +143,13 @@ func MustNewApisixUpstream(obj interface{}) ApisixUpstream {
return &apisixUpstream{
groupVersion: config.ApisixV2beta3,
v2beta3: au,
+ Object: au,
}
case *configv2.ApisixUpstream:
return &apisixUpstream{
groupVersion: config.ApisixV2,
v2: au,
+ Object: au,
}
default:
panic("invalid ApisixUpstream type")
@@ -156,11 +165,13 @@ func NewApisixUpstream(obj interface{}) (ApisixUpstream, error) {
return &apisixUpstream{
groupVersion: config.ApisixV2beta3,
v2beta3: au,
+ Object: au,
}, nil
case *configv2.ApisixUpstream:
return &apisixUpstream{
groupVersion: config.ApisixV2,
v2: au,
+ Object: au,
}, nil
default:
return nil, errors.New("invalid ApisixUpstream type")
diff --git a/pkg/kube/ingress.go b/pkg/kube/ingress.go
index 414047b7..84b7ba31 100644
--- a/pkg/kube/ingress.go
+++ b/pkg/kube/ingress.go
@@ -20,6 +20,7 @@ import (
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
networkingv1 "k8s.io/api/networking/v1"
networkingv1beta1 "k8s.io/api/networking/v1beta1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
listersextensionsv1beta1 "k8s.io/client-go/listers/extensions/v1beta1"
listersnetworkingv1 "k8s.io/client-go/listers/networking/v1"
listersnetworkingv1beta1 "k8s.io/client-go/listers/networking/v1beta1"
@@ -69,6 +70,8 @@ type Ingress interface {
// ResourceVersion returns the the resource version field inside
// the real Ingress.
ResourceVersion() string
+
+ metav1.Object
}
// IngressEvents contains the ingress key (namespace/name)
@@ -84,6 +87,7 @@ type ingress struct {
v1 *networkingv1.Ingress
v1beta1 *networkingv1beta1.Ingress
extensionsV1beta1 *extensionsv1beta1.Ingress
+ metav1.Object
}
func (ing *ingress) V1() *networkingv1.Ingress {
@@ -135,6 +139,7 @@ func (l *ingressLister) V1(namespace, name string) (Ingress, error) {
return &ingress{
groupVersion: IngressV1,
v1: ing,
+ Object: ing,
}, nil
}
@@ -146,6 +151,7 @@ func (l *ingressLister) V1beta1(namespace, name string) (Ingress, error) {
return &ingress{
groupVersion: IngressV1beta1,
v1beta1: ing,
+ Object: ing,
}, nil
}
@@ -157,6 +163,7 @@ func (l *ingressLister) ExtensionsV1beta1(namespace, name string) (Ingress, erro
return &ingress{
groupVersion: IngressExtensionsV1beta1,
extensionsV1beta1: ing,
+ Object: ing,
}, nil
}
@@ -168,16 +175,19 @@ func MustNewIngress(obj interface{}) Ingress {
return &ingress{
groupVersion: IngressV1,
v1: ing,
+ Object: ing,
}
case *networkingv1beta1.Ingress:
return &ingress{
groupVersion: IngressV1beta1,
v1beta1: ing,
+ Object: ing,
}
case *extensionsv1beta1.Ingress:
return &ingress{
groupVersion: IngressExtensionsV1beta1,
extensionsV1beta1: ing,
+ Object: ing,
}
default:
panic("invalid ingress type")
@@ -193,16 +203,19 @@ func NewIngress(obj interface{}) (Ingress, error) {
return &ingress{
groupVersion: IngressV1,
v1: ing,
+ Object: ing,
}, nil
case *networkingv1beta1.Ingress:
return &ingress{
groupVersion: IngressV1beta1,
v1beta1: ing,
+ Object: ing,
}, nil
case *extensionsv1beta1.Ingress:
return &ingress{
groupVersion: IngressExtensionsV1beta1,
extensionsV1beta1: ing,
+ Object: ing,
}, nil
default:
return nil, errors.New("invalid ingress type")
diff --git a/pkg/providers/apisix/apisix_consumer.go b/pkg/providers/apisix/apisix_consumer.go
index d9198c5a..2f6a0b0e 100644
--- a/pkg/providers/apisix/apisix_consumer.go
+++ b/pkg/providers/apisix/apisix_consumer.go
@@ -17,9 +17,11 @@ package apisix
import (
"context"
"fmt"
+ "reflect"
"time"
"go.uber.org/zap"
+ "gopkg.in/go-playground/pool.v3"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
@@ -42,6 +44,7 @@ type apisixConsumerController struct {
workqueue workqueue.RateLimitingInterface
workers int
+ pool pool.Pool
}
func newApisixConsumerController(common *apisixCommon) *apisixConsumerController {
@@ -49,6 +52,7 @@ func newApisixConsumerController(common *apisixCommon) *apisixConsumerController
apisixCommon: common,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixConsumer"),
workers: 1,
+ pool: pool.NewLimited(2),
}
c.ApisixConsumerInformer.AddEventHandler(
@@ -65,6 +69,7 @@ func (c *apisixConsumerController) run(ctx context.Context) {
log.Info("ApisixConsumer controller started")
defer log.Info("ApisixConsumer controller exited")
defer c.workqueue.ShutDown()
+ defer c.pool.Close()
for i := 0; i < c.workers; i++ {
go c.runWorker(ctx)
@@ -136,6 +141,7 @@ func (c *apisixConsumerController) sync(ctx context.Context, ev *types.Event) er
multiVersioned = ev.Tombstone.(kube.ApisixConsumer)
}
+ var errRecord error
switch event.GroupVersion {
case config.ApisixV2beta3:
ac := multiVersioned.V2beta3()
@@ -146,9 +152,8 @@ func (c *apisixConsumerController) sync(ctx context.Context, ev *types.Event) er
zap.Error(err),
zap.Any("ApisixConsumer", ac),
)
- c.RecordEvent(ac, corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
- c.recordStatus(ac, utils.ResourceSyncAborted, err, metav1.ConditionFalse, ac.GetGeneration())
- return err
+ errRecord = err
+ goto updateStatus
}
log.Debugw("got consumer object from ApisixConsumer",
zap.Any("consumer", consumer),
@@ -160,12 +165,9 @@ func (c *apisixConsumerController) sync(ctx context.Context, ev *types.Event) er
zap.Error(err),
zap.Any("consumer", consumer),
)
- c.RecordEvent(ac, corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
- c.recordStatus(ac, utils.ResourceSyncAborted, err, metav1.ConditionFalse, ac.GetGeneration())
- return err
+ errRecord = err
+ goto updateStatus
}
-
- c.RecordEvent(ac, corev1.EventTypeNormal, utils.ResourceSynced, nil)
case config.ApisixV2:
ac := multiVersioned.V2()
@@ -175,9 +177,8 @@ func (c *apisixConsumerController) sync(ctx context.Context, ev *types.Event) er
zap.Error(err),
zap.Any("ApisixConsumer", ac),
)
- c.RecordEvent(ac, corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
- c.recordStatus(ac, utils.ResourceSyncAborted, err, metav1.ConditionFalse, ac.GetGeneration())
- return err
+ errRecord = err
+ goto updateStatus
}
log.Debugw("got consumer object from ApisixConsumer",
zap.Any("consumer", consumer),
@@ -189,14 +190,19 @@ func (c *apisixConsumerController) sync(ctx context.Context, ev *types.Event) er
zap.Error(err),
zap.Any("consumer", consumer),
)
- c.RecordEvent(ac, corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
- c.recordStatus(ac, utils.ResourceSyncAborted, err, metav1.ConditionFalse, ac.GetGeneration())
- return err
+ errRecord = err
+ goto updateStatus
}
-
- c.RecordEvent(ac, corev1.EventTypeNormal, utils.ResourceSynced, nil)
}
- return nil
+updateStatus:
+ c.pool.Queue(func(wu pool.WorkUnit) (interface{}, error) {
+ if wu.IsCancelled() {
+ return nil, nil
+ }
+ c.updateStatus(multiVersioned, errRecord)
+ return true, nil
+ })
+ return errRecord
}
func (c *apisixConsumerController) handleSyncErr(obj interface{}, err error) {
@@ -268,6 +274,15 @@ func (c *apisixConsumerController) onUpdate(oldObj, newObj interface{}) {
if prev.ResourceVersion() >= curr.ResourceVersion() {
return
}
+ // Updates triggered by status are ignored.
+ if prev.GetGeneration() == curr.GetGeneration() && prev.GetUID() == curr.GetUID() {
+ switch curr.GroupVersion() {
+ case config.ApisixV2:
+ if reflect.DeepEqual(prev.V2().Spec, curr.V2().Spec) && !reflect.DeepEqual(prev.V2().Status, curr.V2().Status) {
+ return
+ }
+ }
+ }
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err != nil {
log.Errorf("found ApisixConsumer resource with bad meta namespace key: %s", err)
@@ -375,6 +390,56 @@ func (c *apisixConsumerController) ResourceSync(interval time.Duration) {
}
}
+func (c *apisixConsumerController) updateStatus(obj kube.ApisixConsumer, statusErr error) {
+ if obj == nil {
+ return
+ }
+ var (
+ ac kube.ApisixConsumer
+ err error
+ namespace = obj.GetNamespace()
+ name = obj.GetName()
+ )
+
+ switch obj.GroupVersion() {
+ case config.ApisixV2beta3:
+ ac, err = c.ApisixConsumerLister.V2beta3(namespace, name)
+ case config.ApisixV2:
+ ac, err = c.ApisixConsumerLister.V2(namespace, name)
+ }
+ if err != nil {
+ if !k8serrors.IsNotFound(err) {
+ log.Warnw("Failed to update status, unable to get ApisixConsumer",
+ zap.Error(err),
+ zap.String("name", name),
+ zap.String("namespace", namespace),
+ )
+ }
+ return
+ }
+ if ac.ResourceVersion() != obj.ResourceVersion() {
+ return
+ }
+ var (
+ reason = utils.ResourceSynced
+ condition = metav1.ConditionTrue
+ eventType = corev1.EventTypeNormal
+ )
+ if statusErr != nil {
+ reason = utils.ResourceSyncAborted
+ condition = metav1.ConditionFalse
+ eventType = corev1.EventTypeWarning
+ }
+ switch obj.GroupVersion() {
+ case config.ApisixV2beta3:
+ c.RecordEvent(obj.V2beta3(), eventType, reason, statusErr)
+ c.recordStatus(obj.V2beta3(), reason, statusErr, condition, ac.GetGeneration())
+ case config.ApisixV2:
+ c.RecordEvent(obj.V2(), eventType, reason, statusErr)
+ c.recordStatus(obj.V2(), reason, statusErr, condition, ac.GetGeneration())
+ }
+}
+
// recordStatus record resources status
func (c *apisixConsumerController) recordStatus(at interface{}, reason string, err error, status metav1.ConditionStatus, generation int64) {
if c.Kubernetes.DisableStatusUpdates {
diff --git a/pkg/providers/apisix/apisix_plugin_config.go b/pkg/providers/apisix/apisix_plugin_config.go
index 912f18d0..0ab72150 100644
--- a/pkg/providers/apisix/apisix_plugin_config.go
+++ b/pkg/providers/apisix/apisix_plugin_config.go
@@ -17,9 +17,11 @@ package apisix
import (
"context"
"fmt"
+ "reflect"
"time"
"go.uber.org/zap"
+ "gopkg.in/go-playground/pool.v3"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
@@ -43,6 +45,7 @@ type apisixPluginConfigController struct {
workqueue workqueue.RateLimitingInterface
workers int
+ pool pool.Pool
}
func newApisixPluginConfigController(common *apisixCommon) *apisixPluginConfigController {
@@ -50,6 +53,7 @@ func newApisixPluginConfigController(common *apisixCommon) *apisixPluginConfigCo
apisixCommon: common,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixPluginConfig"),
workers: 1,
+ pool: pool.NewLimited(1),
}
c.ApisixPluginConfigInformer.AddEventHandler(
@@ -138,151 +142,172 @@ func (c *apisixPluginConfigController) sync(ctx context.Context, ev *types.Event
}
apc = ev.Tombstone.(kube.ApisixPluginConfig)
}
-
- switch obj.GroupVersion {
- case config.ApisixV2beta3:
- if ev.Type != types.EventDelete {
- tctx, err = c.translator.TranslatePluginConfigV2beta3(apc.V2beta3())
- } else {
- tctx, err = c.translator.GeneratePluginConfigV2beta3DeleteMark(apc.V2beta3())
+ // translator phase: translate resource, construction data plance context
+ var errRecord error
+ {
+ switch obj.GroupVersion {
+ case config.ApisixV2beta3:
+ if ev.Type != types.EventDelete {
+ tctx, err = c.translator.TranslatePluginConfigV2beta3(apc.V2beta3())
+ } else {
+ tctx, err = c.translator.GeneratePluginConfigV2beta3DeleteMark(apc.V2beta3())
+ }
+ if err != nil {
+ log.Errorw("failed to translate ApisixPluginConfig v2beta3",
+ zap.Error(err),
+ zap.Any("object", apc),
+ )
+ errRecord = err
+ goto updatestatus
+ }
+ case config.ApisixV2:
+ if ev.Type != types.EventDelete {
+ tctx, err = c.translator.TranslatePluginConfigV2(apc.V2())
+ } else {
+ tctx, err = c.translator.GeneratePluginConfigV2DeleteMark(apc.V2())
+ }
+ if err != nil {
+ log.Errorw("failed to translate ApisixPluginConfig v2",
+ zap.Error(err),
+ zap.Any("object", apc),
+ )
+ errRecord = err
+ goto updatestatus
+ }
}
- if err != nil {
- log.Errorw("failed to translate ApisixPluginConfig v2beta3",
- zap.Error(err),
- zap.Any("object", apc),
- )
- return err
+
+ }
+ // sync phase: Use context update data palne
+ {
+ log.Debugw("translated ApisixPluginConfig",
+ zap.Any("pluginConfigs", tctx.PluginConfigs),
+ )
+ m := &utils.Manifest{
+ PluginConfigs: tctx.PluginConfigs,
}
- case config.ApisixV2:
- if ev.Type != types.EventDelete {
- tctx, err = c.translator.TranslatePluginConfigV2(apc.V2())
+
+ var (
+ added *utils.Manifest
+ updated *utils.Manifest
+ deleted *utils.Manifest
+ )
+
+ if ev.Type == types.EventDelete {
+ deleted = m
+ } else if ev.Type.IsAddEvent() {
+ added = m
} else {
- tctx, err = c.translator.GeneratePluginConfigV2DeleteMark(apc.V2())
+ var oldCtx *translation.TranslateContext
+ switch obj.GroupVersion {
+ case config.ApisixV2beta3:
+ oldCtx, err = c.translator.TranslatePluginConfigV2beta3(obj.OldObject.V2beta3())
+ case config.ApisixV2:
+ oldCtx, err = c.translator.TranslatePluginConfigV2(obj.OldObject.V2())
+ }
+ if err != nil {
+ log.Errorw("failed to translate old ApisixPluginConfig",
+ zap.String("version", obj.GroupVersion),
+ zap.String("event", "update"),
+ zap.Error(err),
+ zap.Any("ApisixPluginConfig", apc),
+ )
+ errRecord = err
+ goto updatestatus
+ }
+
+ om := &utils.Manifest{
+ PluginConfigs: oldCtx.PluginConfigs,
+ }
+ added, updated, deleted = m.Diff(om)
}
- if err != nil {
- log.Errorw("failed to translate ApisixPluginConfig v2",
+
+ if err := c.SyncManifests(ctx, added, updated, deleted, ev.Type.IsSyncEvent()); err != nil {
+ log.Errorw("failed to sync ApisixPluginConfig to apisix",
zap.Error(err),
- zap.Any("object", apc),
)
- return err
+ errRecord = err
+ goto updatestatus
}
}
+updatestatus:
+ c.pool.Queue(func(wu pool.WorkUnit) (interface{}, error) {
+ if wu.IsCancelled() {
+ return nil, nil
+ }
+ c.updateStatus(apc, errRecord)
+ return true, nil
+ })
+ return errRecord
+}
- log.Debugw("translated ApisixPluginConfig",
- zap.Any("pluginConfigs", tctx.PluginConfigs),
- )
-
- m := &utils.Manifest{
- PluginConfigs: tctx.PluginConfigs,
+func (c *apisixPluginConfigController) updateStatus(obj kube.ApisixPluginConfig, statusErr error) {
+ if obj == nil {
+ return
}
-
var (
- added *utils.Manifest
- updated *utils.Manifest
- deleted *utils.Manifest
+ apc kube.ApisixPluginConfig
+ err error
+ namespace = obj.GetNamespace()
+ name = obj.GetName()
)
- if ev.Type == types.EventDelete {
- deleted = m
- } else if ev.Type.IsAddEvent() {
- added = m
- } else {
- var oldCtx *translation.TranslateContext
- switch obj.GroupVersion {
- case config.ApisixV2beta3:
- oldCtx, err = c.translator.TranslatePluginConfigV2beta3(obj.OldObject.V2beta3())
- case config.ApisixV2:
- oldCtx, err = c.translator.TranslatePluginConfigV2(obj.OldObject.V2())
- }
- if err != nil {
- log.Errorw("failed to translate old ApisixPluginConfig",
- zap.String("version", obj.GroupVersion),
- zap.String("event", "update"),
+ switch obj.GroupVersion() {
+ case config.ApisixV2beta3:
+ apc, err = c.ApisixPluginConfigLister.V2beta3(namespace, name)
+ case config.ApisixV2:
+ apc, err = c.ApisixPluginConfigLister.V2(namespace, name)
+ }
+ if err != nil {
+ if !k8serrors.IsNotFound(err) {
+ log.Warnw("failed to update status, unable to get ApisixPluginConfig",
zap.Error(err),
- zap.Any("ApisixPluginConfig", apc),
+ zap.String("name", name),
+ zap.String("namespace", namespace),
)
- return err
- }
-
- om := &utils.Manifest{
- PluginConfigs: oldCtx.PluginConfigs,
}
- added, updated, deleted = m.Diff(om)
+ return
+ }
+ if apc.ResourceVersion() != obj.ResourceVersion() {
+ return
+ }
+ var (
+ reason = utils.ResourceSynced
+ condition = metav1.ConditionTrue
+ )
+ if statusErr != nil {
+ reason = utils.ResourceSyncAborted
+ condition = metav1.ConditionFalse
+ }
+ switch obj.GroupVersion() {
+ case config.ApisixV2beta3:
+ c.RecordEvent(apc.V2beta3(), v1.EventTypeNormal, reason, statusErr)
+ c.recordStatus(apc.V2beta3(), reason, statusErr, condition, apc.GetGeneration())
+ case config.ApisixV2:
+ c.RecordEvent(apc.V2(), v1.EventTypeNormal, reason, statusErr)
+ c.recordStatus(apc.V2(), reason, statusErr, condition, apc.GetGeneration())
}
-
- return c.SyncManifests(ctx, added, updated, deleted, ev.Type.IsSyncEvent())
}
func (c *apisixPluginConfigController) handleSyncErr(obj interface{}, errOrigin error) {
+ if errOrigin == nil {
+ c.workqueue.Forget(obj)
+ c.MetricsCollector.IncrSyncOperation("PluginConfig", "success")
+ return
+ }
ev := obj.(*types.Event)
- event := ev.Object.(kube.ApisixPluginConfigEvent)
if k8serrors.IsNotFound(errOrigin) && ev.Type != types.EventDelete {
log.Infow("sync ApisixPluginConfig but not found, ignore",
zap.String("event_type", ev.Type.String()),
zap.String("ApisixPluginConfig", ev.Object.(kube.ApisixPluginConfigEvent).Key),
)
- c.workqueue.Forget(event)
- return
- }
- namespace, name, errLocal := cache.SplitMetaNamespaceKey(event.Key)
- if errLocal != nil {
- log.Errorf("invalid resource key: %s", event.Key)
- c.MetricsCollector.IncrSyncOperation("PluginConfig", "failure")
- return
- }
- var apc kube.ApisixPluginConfig
- switch event.GroupVersion {
- case config.ApisixV2beta3:
- apc, errLocal = c.ApisixPluginConfigLister.V2beta3(namespace, name)
- case config.ApisixV2:
- apc, errLocal = c.ApisixPluginConfigLister.V2(namespace, name)
- default:
- errLocal = fmt.Errorf("unsupported ApisixPluginConfig group version %s", event.GroupVersion)
- }
- if errOrigin == nil {
- if ev.Type != types.EventDelete {
- if errLocal == nil {
- switch apc.GroupVersion() {
- case config.ApisixV2beta3:
- c.RecordEvent(apc.V2beta3(), v1.EventTypeNormal, utils.ResourceSynced, nil)
- c.recordStatus(apc.V2beta3(), utils.ResourceSynced, nil, metav1.ConditionTrue, apc.V2beta3().GetGeneration())
- case config.ApisixV2:
- c.RecordEvent(apc.V2(), v1.EventTypeNormal, utils.ResourceSynced, nil)
- c.recordStatus(apc.V2(), utils.ResourceSynced, nil, metav1.ConditionTrue, apc.V2().GetGeneration())
- }
- } else {
- log.Errorw("failed list ApisixPluginConfig",
- zap.Error(errLocal),
- zap.String("name", name),
- zap.String("namespace", namespace),
- )
- }
- }
c.workqueue.Forget(obj)
- c.MetricsCollector.IncrSyncOperation("PluginConfig", "success")
return
}
log.Warnw("sync ApisixPluginConfig failed, will retry",
zap.Any("object", obj),
zap.Error(errOrigin),
)
- if errLocal == nil {
- switch apc.GroupVersion() {
- case config.ApisixV2beta3:
- c.RecordEvent(apc.V2beta3(), v1.EventTypeWarning, utils.ResourceSyncAborted, errOrigin)
- c.recordStatus(apc.V2beta3(), utils.ResourceSyncAborted, errOrigin, metav1.ConditionFalse, apc.V2beta3().GetGeneration())
- case config.ApisixV2:
- c.RecordEvent(apc.V2(), v1.EventTypeWarning, utils.ResourceSyncAborted, errOrigin)
- c.recordStatus(apc.V2(), utils.ResourceSyncAborted, errOrigin, metav1.ConditionFalse, apc.V2().GetGeneration())
- }
- } else {
- log.Errorw("failed list ApisixPluginConfig",
- zap.Error(errLocal),
- zap.String("name", name),
- zap.String("namespace", namespace),
- )
- }
- c.workqueue.AddRateLimited(obj)
+ c.workqueue.Forget(obj)
c.MetricsCollector.IncrSyncOperation("PluginConfig", "failure")
}
@@ -319,6 +344,15 @@ func (c *apisixPluginConfigController) onUpdate(oldObj, newObj interface{}) {
if prev.ResourceVersion() >= curr.ResourceVersion() {
return
}
+ // Updates triggered by status are ignored.
+ if prev.GetGeneration() == curr.GetGeneration() && prev.GetUID() == curr.GetUID() {
+ switch curr.GroupVersion() {
+ case config.ApisixV2:
+ if reflect.DeepEqual(prev.V2().Spec, curr.V2().Spec) && !reflect.DeepEqual(prev.V2().Status, curr.V2().Status) {
+ return
+ }
+ }
+ }
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err != nil {
log.Errorf("found ApisixPluginConfig resource with bad meta namespace key: %s", err)
@@ -433,6 +467,7 @@ func (c *apisixPluginConfigController) recordStatus(at interface{}, reason strin
Message: message,
ObservedGeneration: generation,
}
+
apisixClient := c.KubeClient.APISIXClient
if kubeObj, ok := at.(runtime.Object); ok {
diff --git a/pkg/providers/apisix/apisix_route.go b/pkg/providers/apisix/apisix_route.go
index 3d536aac..5e6a62b8 100644
--- a/pkg/providers/apisix/apisix_route.go
+++ b/pkg/providers/apisix/apisix_route.go
@@ -17,10 +17,12 @@ package apisix
import (
"context"
"fmt"
+ "reflect"
"sync"
"time"
"go.uber.org/zap"
+ "gopkg.in/go-playground/pool.v3"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
@@ -47,6 +49,8 @@ type apisixRouteController struct {
relatedWorkqueue workqueue.RateLimitingInterface
workers int
+ pool pool.Pool
+
svcLock sync.RWMutex
// service key -> apisix route key
svcMap map[string]map[string]struct{}
@@ -68,6 +72,8 @@ func newApisixRouteController(common *apisixCommon) *apisixRouteController {
relatedWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixRouteRelated"),
workers: 1,
+ pool: pool.NewLimited(2),
+
svcMap: make(map[string]map[string]struct{}),
apisixUpstreamMap: make(map[string]map[string]struct{}),
}
@@ -99,6 +105,7 @@ func (c *apisixRouteController) run(ctx context.Context) {
defer log.Info("ApisixRoute controller exited")
defer c.workqueue.ShutDown()
defer c.relatedWorkqueue.ShutDown()
+ defer c.pool.Close()
for i := 0; i < c.workers; i++ {
go c.runWorker(ctx)
@@ -338,82 +345,93 @@ func (c *apisixRouteController) sync(ctx context.Context, ev *types.Event) error
}
ar = ev.Tombstone.(kube.ApisixRoute)
}
-
- switch obj.GroupVersion {
- case config.ApisixV2beta3:
- if ev.Type != types.EventDelete {
- if err = c.checkPluginNameIfNotEmptyV2beta3(ctx, ar.V2beta3()); err == nil {
- tctx, err = c.translator.TranslateRouteV2beta3(ar.V2beta3())
+ // translator phase: translate resource, construction data plance context
+ {
+ switch obj.GroupVersion {
+ case config.ApisixV2beta3:
+ if ev.Type != types.EventDelete {
+ if err = c.checkPluginNameIfNotEmptyV2beta3(ctx, ar.V2beta3()); err == nil {
+ tctx, err = c.translator.TranslateRouteV2beta3(ar.V2beta3())
+ }
+ } else {
+ tctx, err = c.translator.GenerateRouteV2beta3DeleteMark(ar.V2beta3())
+ }
+ if err != nil {
+ log.Errorw("failed to translate ApisixRoute v2beta3",
+ zap.Error(err),
+ zap.Any("object", ar),
+ )
+ goto updateStatus
+ }
+ case config.ApisixV2:
+ if ev.Type != types.EventDelete {
+ if err = c.checkPluginNameIfNotEmptyV2(ctx, ar.V2()); err == nil {
+ tctx, err = c.translator.TranslateRouteV2(ar.V2())
+ }
+ } else {
+ tctx, err = c.translator.GenerateRouteV2DeleteMark(ar.V2())
+ }
+ if err != nil {
+ log.Errorw("failed to translate ApisixRoute v2",
+ zap.Error(err),
+ zap.Any("object", ar),
+ )
+ goto updateStatus
}
- } else {
- tctx, err = c.translator.GenerateRouteV2beta3DeleteMark(ar.V2beta3())
}
- if err != nil {
- log.Errorw("failed to translate ApisixRoute v2beta3",
- zap.Error(err),
- zap.Any("object", ar),
- )
- return err
+
+ log.Debugw("translated ApisixRoute",
+ zap.Any("routes", tctx.Routes),
+ zap.Any("upstreams", tctx.Upstreams),
+ zap.Any("apisix_route", ar),
+ zap.Any("pluginConfigs", tctx.PluginConfigs),
+ )
+ }
+ // sync phase: Use context update data palne
+ {
+ m := &utils.Manifest{
+ Routes: tctx.Routes,
+ Upstreams: tctx.Upstreams,
+ StreamRoutes: tctx.StreamRoutes,
+ PluginConfigs: tctx.PluginConfigs,
}
- case config.ApisixV2:
- if ev.Type != types.EventDelete {
- if err = c.checkPluginNameIfNotEmptyV2(ctx, ar.V2()); err == nil {
- tctx, err = c.translator.TranslateRouteV2(ar.V2())
- }
+ var (
+ added *utils.Manifest
+ updated *utils.Manifest
+ deleted *utils.Manifest
+ )
+
+ if ev.Type == types.EventDelete {
+ deleted = m
+ } else if ev.Type.IsAddEvent() {
+ added = m
} else {
- tctx, err = c.translator.GenerateRouteV2DeleteMark(ar.V2())
+ oldCtx, _ := c.translator.TranslateOldRoute(obj.OldObject)
+ om := &utils.Manifest{
+ Routes: oldCtx.Routes,
+ Upstreams: oldCtx.Upstreams,
+ StreamRoutes: oldCtx.StreamRoutes,
+ PluginConfigs: oldCtx.PluginConfigs,
+ }
+ added, updated, deleted = m.Diff(om)
}
- if err != nil {
- log.Errorw("failed to translate ApisixRoute v2",
+
+ if err = c.SyncManifests(ctx, added, updated, deleted, ev.Type.IsSyncEvent()); err != nil {
+ log.Errorw("failed to sync ApisixRoute to apisix",
zap.Error(err),
- zap.Any("object", ar),
)
- return err
+ goto updateStatus
}
- default:
- log.Errorw("unknown ApisixRoute version",
- zap.String("version", obj.GroupVersion),
- zap.String("key", obj.Key),
- )
- return fmt.Errorf("unknown ApisixRoute version %v", obj.GroupVersion)
}
-
- log.Debugw("translated ApisixRoute",
- zap.Any("routes", tctx.Routes),
- zap.Any("upstreams", tctx.Upstreams),
- zap.Any("apisix_route", ar),
- zap.Any("pluginConfigs", tctx.PluginConfigs),
- )
-
- m := &utils.Manifest{
- Routes: tctx.Routes,
- Upstreams: tctx.Upstreams,
- StreamRoutes: tctx.StreamRoutes,
- PluginConfigs: tctx.PluginConfigs,
- }
-
- var (
- added *utils.Manifest
- updated *utils.Manifest
- deleted *utils.Manifest
- )
-
- if ev.Type == types.EventDelete {
- deleted = m
- } else if ev.Type.IsAddEvent() {
- added = m
- } else {
- oldCtx, _ := c.translator.TranslateOldRoute(obj.OldObject)
- om := &utils.Manifest{
- Routes: oldCtx.Routes,
- Upstreams: oldCtx.Upstreams,
- StreamRoutes: oldCtx.StreamRoutes,
- PluginConfigs: oldCtx.PluginConfigs,
+updateStatus:
+ c.pool.Queue(func(wu pool.WorkUnit) (interface{}, error) {
+ if wu.IsCancelled() {
+ return nil, nil
}
- added, updated, deleted = m.Diff(om)
- }
-
- return c.SyncManifests(ctx, added, updated, deleted, ev.Type.IsSyncEvent())
+ c.updateStatus(ar, err)
+ return true, nil
+ })
+ return err
}
func (c *apisixRouteController) checkPluginNameIfNotEmptyV2beta3(ctx context.Context, in *v2beta3.ApisixRoute) error {
@@ -462,78 +480,76 @@ func (c *apisixRouteController) checkPluginNameIfNotEmptyV2(ctx context.Context,
return nil
}
-func (c *apisixRouteController) handleSyncErr(obj interface{}, errOrigin error) {
- ev := obj.(*types.Event)
- event := ev.Object.(kube.ApisixRouteEvent)
- if k8serrors.IsNotFound(errOrigin) && ev.Type != types.EventDelete {
- log.Infow("sync ApisixRoute but not found, ignore",
- zap.String("event_type", ev.Type.String()),
- zap.String("ApisixRoute", event.Key),
- )
- c.workqueue.Forget(event)
+func (c *apisixRouteController) updateStatus(obj kube.ApisixRoute, statusErr error) {
+ if obj == nil {
+ return
+ }
+ var (
+ ar kube.ApisixRoute
+ err error
+ namespace = obj.GetNamespace()
+ name = obj.GetName()
+ )
+
+ switch obj.GroupVersion() {
+ case config.ApisixV2beta3:
+ ar, err = c.ApisixRouteLister.V2beta3(namespace, name)
+ case config.ApisixV2:
+ ar, err = c.ApisixRouteLister.V2(namespace, name)
+ }
+ if err != nil {
+ if !k8serrors.IsNotFound(err) {
+ log.Warnw("failed to update status, unable to get ApisixRoute",
+ zap.Error(err),
+ zap.String("name", name),
+ zap.String("namespace", namespace),
+ )
+ }
return
}
- namespace, name, errLocal := cache.SplitMetaNamespaceKey(event.Key)
- if errLocal != nil {
- log.Errorf("invalid resource key: %s", event.Key)
- c.MetricsCollector.IncrSyncOperation("route", "failure")
+ if ar.ResourceVersion() != obj.ResourceVersion() {
return
}
- var ar kube.ApisixRoute
- switch event.GroupVersion {
+ var (
+ reason = utils.ResourceSynced
+ condition = metav1.ConditionTrue
+ eventType = v1.EventTypeNormal
+ )
+ if statusErr != nil {
+ reason = utils.ResourceSyncAborted
+ condition = metav1.ConditionFalse
+ eventType = v1.EventTypeWarning
+ }
+ switch obj.GroupVersion() {
case config.ApisixV2beta3:
- ar, errLocal = c.ApisixRouteLister.V2beta3(namespace, name)
+ c.RecordEvent(obj.V2beta3(), eventType, reason, statusErr)
+ c.recordStatus(obj.V2beta3(), reason, statusErr, condition, ar.GetGeneration())
case config.ApisixV2:
- ar, errLocal = c.ApisixRouteLister.V2(namespace, name)
- default:
- log.Errorw("unknown ApisixRoute version",
- zap.String("version", event.GroupVersion),
- zap.String("key", event.Key),
- )
+ c.RecordEvent(obj.V2(), eventType, reason, statusErr)
+ c.recordStatus(obj.V2(), reason, statusErr, condition, ar.GetGeneration())
}
+}
+
+func (c *apisixRouteController) handleSyncErr(obj interface{}, errOrigin error) {
if errOrigin == nil {
- if ev.Type != types.EventDelete {
- if errLocal == nil {
- switch ar.GroupVersion() {
- case config.ApisixV2beta3:
- c.RecordEvent(ar.V2beta3(), v1.EventTypeNormal, utils.ResourceSynced, nil)
- c.recordStatus(ar.V2beta3(), utils.ResourceSynced, nil, metav1.ConditionTrue, ar.V2beta3().GetGeneration())
- case config.ApisixV2:
- c.RecordEvent(ar.V2(), v1.EventTypeNormal, utils.ResourceSynced, nil)
- c.recordStatus(ar.V2(), utils.ResourceSynced, nil, metav1.ConditionTrue, ar.V2().GetGeneration())
- }
- } else {
- log.Errorw("failed list ApisixRoute",
- zap.Error(errLocal),
- zap.String("name", name),
- zap.String("namespace", namespace),
- )
- }
- }
c.workqueue.Forget(obj)
c.MetricsCollector.IncrSyncOperation("route", "success")
return
}
+ ev := obj.(*types.Event)
+ event := ev.Object.(kube.ApisixRouteEvent)
+ if k8serrors.IsNotFound(errOrigin) && ev.Type != types.EventDelete {
+ log.Infow("sync ApisixRoute but not found, ignore",
+ zap.String("event_type", ev.Type.String()),
+ zap.String("ApisixRoute", event.Key),
+ )
+ c.workqueue.Forget(obj)
+ return
+ }
log.Warnw("sync ApisixRoute failed, will retry",
zap.Any("object", obj),
zap.Error(errOrigin),
)
- if errLocal == nil {
- switch ar.GroupVersion() {
- case config.ApisixV2beta3:
- c.RecordEvent(ar.V2beta3(), v1.EventTypeWarning, utils.ResourceSyncAborted, errOrigin)
- c.recordStatus(ar.V2beta3(), utils.ResourceSyncAborted, errOrigin, metav1.ConditionFalse, ar.V2beta3().GetGeneration())
- case config.ApisixV2:
- c.RecordEvent(ar.V2(), v1.EventTypeWarning, utils.ResourceSyncAborted, errOrigin)
- c.recordStatus(ar.V2(), utils.ResourceSyncAborted, errOrigin, metav1.ConditionFalse, ar.V2().GetGeneration())
- }
- } else {
- log.Errorw("failed list ApisixRoute",
- zap.Error(errLocal),
- zap.String("name", name),
- zap.String("namespace", namespace),
- )
- }
c.workqueue.AddRateLimited(obj)
c.MetricsCollector.IncrSyncOperation("route", "failure")
}
@@ -585,6 +601,15 @@ func (c *apisixRouteController) onUpdate(oldObj, newObj interface{}) {
if prev.ResourceVersion() >= curr.ResourceVersion() {
return
}
+ // Updates triggered by status are ignored.
+ if prev.GetGeneration() == curr.GetGeneration() && prev.GetUID() == curr.GetUID() {
+ switch curr.GroupVersion() {
+ case config.ApisixV2:
+ if reflect.DeepEqual(prev.V2().Spec, curr.V2().Spec) && !reflect.DeepEqual(prev.V2().Status, curr.V2().Status) {
+ return
+ }
+ }
+ }
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err != nil {
log.Errorf("found ApisixRoute resource with bad meta namespace key: %s", err)
diff --git a/pkg/providers/apisix/apisix_tls.go b/pkg/providers/apisix/apisix_tls.go
index 3f9e6ed1..7fa219c8 100644
--- a/pkg/providers/apisix/apisix_tls.go
+++ b/pkg/providers/apisix/apisix_tls.go
@@ -17,10 +17,12 @@ package apisix
import (
"context"
"fmt"
+ "reflect"
"sync"
"time"
"go.uber.org/zap"
+ "gopkg.in/go-playground/pool.v3"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
@@ -45,6 +47,7 @@ type apisixTlsController struct {
workqueue workqueue.RateLimitingInterface
workers int
+ pool pool.Pool
// secretSSLMap stores reference from K8s secret to ApisixTls
// type: Map<SecretKey, Map<ApisixTlsKey, SSL object in APISIX>>
@@ -58,6 +61,7 @@ func newApisixTlsController(common *apisixCommon) *apisixTlsController {
apisixCommon: common,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixTls"),
workers: 1,
+ pool: pool.NewLimited(2),
secretSSLMap: new(sync.Map),
}
@@ -148,6 +152,7 @@ func (c *apisixTlsController) sync(ctx context.Context, ev *types.Event) error {
multiVersionedTls = ev.Tombstone.(kube.ApisixTls)
}
+ var errRecord error
switch event.GroupVersion {
case config.ApisixV2beta3:
tls := multiVersionedTls.V2beta3()
@@ -168,9 +173,8 @@ func (c *apisixTlsController) sync(ctx context.Context, ev *types.Event) error {
zap.Error(err),
zap.Any("ApisixTls", tls),
)
- c.RecordEvent(tls, corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
- c.recordStatus(tls, utils.ResourceSyncAborted, err, metav1.ConditionFalse, tls.GetGeneration())
- return err
+ errRecord = err
+ goto updateStatus
}
log.Debugw("got SSL object from ApisixTls",
zap.Any("ssl", ssl),
@@ -182,13 +186,9 @@ func (c *apisixTlsController) sync(ctx context.Context, ev *types.Event) error {
zap.Error(err),
zap.Any("ssl", ssl),
)
- c.RecordEvent(tls, corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
- c.recordStatus(tls, utils.ResourceSyncAborted, err, metav1.ConditionFalse, tls.GetGeneration())
- return err
+ errRecord = err
+ goto updateStatus
}
- c.RecordEvent(tls, corev1.EventTypeNormal, utils.ResourceSynced, nil)
- c.recordStatus(tls, utils.ResourceSynced, nil, metav1.ConditionTrue, tls.GetGeneration())
- return err
case config.ApisixV2:
tls := multiVersionedTls.V2()
ssl, err := c.translator.TranslateSSLV2(tls)
@@ -208,9 +208,8 @@ func (c *apisixTlsController) sync(ctx context.Context, ev *types.Event) error {
zap.Error(err),
zap.Any("ApisixTls", tls),
)
- c.RecordEvent(tls, corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
- c.recordStatus(tls, utils.ResourceSyncAborted, err, metav1.ConditionFalse, tls.GetGeneration())
- return err
+ errRecord = err
+ goto updateStatus
}
log.Debugw("got SSL object from ApisixTls",
zap.Any("ssl", ssl),
@@ -222,15 +221,68 @@ func (c *apisixTlsController) sync(ctx context.Context, ev *types.Event) error {
zap.Error(err),
zap.Any("ssl", ssl),
)
- c.RecordEvent(tls, corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
- c.recordStatus(tls, utils.ResourceSyncAborted, err, metav1.ConditionFalse, tls.GetGeneration())
- return err
+ errRecord = err
+ goto updateStatus
}
- c.RecordEvent(tls, corev1.EventTypeNormal, utils.ResourceSynced, nil)
- c.recordStatus(tls, utils.ResourceSynced, nil, metav1.ConditionTrue, tls.GetGeneration())
- return err
- default:
- return fmt.Errorf("unsupported ApisixTls group version %s", event.GroupVersion)
+ }
+updateStatus:
+ c.pool.Queue(func(wu pool.WorkUnit) (interface{}, error) {
+ if wu.IsCancelled() {
+ return nil, nil
+ }
+ c.updateStatus(multiVersionedTls, errRecord)
+ return true, nil
+ })
+ return errRecord
+}
+
+func (c *apisixTlsController) updateStatus(obj kube.ApisixTls, statusErr error) {
+ if obj == nil {
+ return
+ }
+ var (
+ at kube.ApisixTls
+ err error
+ namespace = obj.GetNamespace()
+ name = obj.GetName()
+ )
+
+ switch obj.GroupVersion() {
+ case config.ApisixV2beta3:
+ at, err = c.ApisixTlsLister.V2beta3(namespace, name)
+ case config.ApisixV2:
+ at, err = c.ApisixTlsLister.V2(namespace, name)
+ }
+ if err != nil {
+ if !k8serrors.IsNotFound(err) {
+ log.Warnw("Failed to update status, unable to get ApisixTls",
+ zap.Error(err),
+ zap.String("name", name),
+ zap.String("namespace", namespace),
+ )
+ }
+ return
+ }
+ if at.ResourceVersion() != obj.ResourceVersion() {
+ return
+ }
+ var (
+ reason = utils.ResourceSynced
+ condition = metav1.ConditionTrue
+ eventType = corev1.EventTypeNormal
+ )
+ if statusErr != nil {
+ reason = utils.ResourceSyncAborted
+ condition = metav1.ConditionFalse
+ eventType = corev1.EventTypeWarning
+ }
+ switch obj.GroupVersion() {
+ case config.ApisixV2beta3:
+ c.RecordEvent(obj.V2beta3(), eventType, reason, statusErr)
+ c.recordStatus(obj.V2beta3(), reason, statusErr, condition, at.GetGeneration())
+ case config.ApisixV2:
+ c.RecordEvent(obj.V2(), eventType, reason, statusErr)
+ c.recordStatus(obj.V2(), reason, statusErr, condition, at.GetGeneration())
}
}
@@ -310,20 +362,29 @@ func (c *apisixTlsController) onAdd(obj interface{}) {
c.MetricsCollector.IncrEvents("TLS", "add")
}
-func (c *apisixTlsController) onUpdate(prev, curr interface{}) {
- oldTls, err := kube.NewApisixTls(prev)
+func (c *apisixTlsController) onUpdate(oldObj, newObj interface{}) {
+ prev, err := kube.NewApisixTls(oldObj)
if err != nil {
log.Errorw("found ApisixTls resource with bad type", zap.Error(err))
return
}
- newTls, err := kube.NewApisixTls(curr)
+ curr, err := kube.NewApisixTls(newObj)
if err != nil {
log.Errorw("found ApisixTls resource with bad type", zap.Error(err))
return
}
- if oldTls.ResourceVersion() >= newTls.ResourceVersion() {
+ if prev.ResourceVersion() >= curr.ResourceVersion() {
return
}
+ // Updates triggered by status are ignored.
+ if prev.GetGeneration() == curr.GetGeneration() && prev.GetUID() == curr.GetUID() {
+ switch curr.GroupVersion() {
+ case config.ApisixV2:
+ if reflect.DeepEqual(prev.V2().Spec, curr.V2().Spec) && !reflect.DeepEqual(prev.V2().Status, curr.V2().Status) {
+ return
+ }
+ }
+ }
key, err := cache.MetaNamespaceKeyFunc(curr)
if err != nil {
log.Errorf("found ApisixTls object with bad namespace/name: %s, ignore it", err)
@@ -332,7 +393,7 @@ func (c *apisixTlsController) onUpdate(prev, curr interface{}) {
if !c.namespaceProvider.IsWatchingNamespace(key) {
return
}
- if !c.isEffective(newTls) {
+ if !c.isEffective(curr) {
return
}
log.Debugw("ApisixTls update event arrived",
@@ -344,8 +405,8 @@ func (c *apisixTlsController) onUpdate(prev, curr interface{}) {
Type: types.EventUpdate,
Object: kube.ApisixTlsEvent{
Key: key,
- OldObject: oldTls,
- GroupVersion: newTls.GroupVersion(),
+ OldObject: prev,
+ GroupVersion: curr.GroupVersion(),
},
})
diff --git a/pkg/providers/apisix/apisix_upstream.go b/pkg/providers/apisix/apisix_upstream.go
index ee281aae..c32848e2 100644
--- a/pkg/providers/apisix/apisix_upstream.go
+++ b/pkg/providers/apisix/apisix_upstream.go
@@ -17,10 +17,12 @@ package apisix
import (
"context"
"fmt"
+ "reflect"
"sync"
"time"
"go.uber.org/zap"
+ "gopkg.in/go-playground/pool.v3"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
@@ -46,6 +48,7 @@ type apisixUpstreamController struct {
workqueue workqueue.RateLimitingInterface
svcWorkqueue workqueue.RateLimitingInterface
workers int
+ pool pool.Pool
externalSvcLock sync.RWMutex
// external name service name -> apisix upstream name
@@ -62,6 +65,7 @@ func newApisixUpstreamController(common *apisixCommon, notifyApisixUpstreamChang
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixUpstream"),
svcWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixUpstreamService"),
workers: 1,
+ pool: pool.NewLimited(2),
externalServiceMap: make(map[string]map[string]struct{}),
notifyApisixUpstreamChange: notifyApisixUpstreamChange,
@@ -89,6 +93,7 @@ func (c *apisixUpstreamController) run(ctx context.Context) {
defer log.Info("ApisixUpstream controller exited")
defer c.workqueue.ShutDown()
defer c.svcWorkqueue.ShutDown()
+ defer c.pool.Close()
for i := 0; i < c.workers; i++ {
go c.runWorker(ctx)
@@ -181,6 +186,7 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
c.syncRelationship(ev, key, multiVersioned)
+ var errRecord error
switch event.GroupVersion {
case config.ApisixV2beta3:
au := multiVersioned.V2beta3()
@@ -196,9 +202,8 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
svc, err := c.SvcLister.Services(namespace).Get(name)
if err != nil {
log.Errorf("failed to get service %s: %s", key, err)
- c.RecordEvent(au, corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
- c.recordStatus(au, utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration())
- return err
+ errRecord = err
+ goto updateStatus
}
var subsets []configv2beta3.ApisixUpstreamSubset
@@ -217,9 +222,8 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
continue
}
log.Errorf("failed to get upstream %s: %s", upsName, err)
- c.RecordEvent(au, corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
- c.recordStatus(au, utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration())
- return err
+ errRecord = err
+ goto updateStatus
}
var newUps *apisixv1.Upstream
if au.Spec != nil && ev.Type != types.EventDelete {
@@ -234,9 +238,8 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
zap.Any("object", au),
zap.Error(err),
)
- c.RecordEvent(au, corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
- c.recordStatus(au, utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration())
- return err
+ errRecord = err
+ goto updateStatus
}
} else {
newUps = apisixv1.NewDefaultUpstream()
@@ -256,16 +259,11 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
zap.Any("ApisixUpstream", au),
zap.String("cluster", clusterName),
)
- c.RecordEvent(au, corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
- c.recordStatus(au, utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration())
- return err
+ errRecord = err
+ goto updateStatus
}
}
}
- if ev.Type != types.EventDelete {
- c.RecordEvent(au, corev1.EventTypeNormal, utils.ResourceSynced, nil)
- c.recordStatus(au, utils.ResourceSynced, nil, metav1.ConditionTrue, au.GetGeneration())
- }
case config.ApisixV2:
au := multiVersioned.V2()
if au.Spec == nil {
@@ -283,25 +281,26 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
zap.Any("object", au),
zap.Error(err),
)
- c.RecordEvent(au, corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
- c.recordStatus(au, utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration())
- return err
+ errRecord = err
+ goto updateStatus
}
}
if len(au.Spec.ExternalNodes) != 0 {
- return c.updateExternalNodes(ctx, au, nil, newUps, au.Namespace, au.Name, ev.Type.IsSyncEvent())
+ errRecord = c.updateExternalNodes(ctx, au, nil, newUps, au.Namespace, au.Name, ev.Type.IsSyncEvent())
+ goto updateStatus
}
// for service discovery related configuration
if au.Spec.Discovery.ServiceName == "" || au.Spec.Discovery.Type == "" {
log.Error("If you setup Discovery for ApisixUpstream, you need to specify the ServiceName and Type fields.")
- return fmt.Errorf("No ServiceName or Type fields found")
+ errRecord = fmt.Errorf("No ServiceName or Type fields found")
+ goto updateStatus
}
// updateUpstream for real
upsName := apisixv1.ComposeExternalUpstreamName(au.Namespace, au.Name)
- return c.updateUpstream(ctx, upsName, &au.Spec.ApisixUpstreamConfig, ev.Type.IsSyncEvent())
-
+ errRecord = c.updateUpstream(ctx, upsName, &au.Spec.ApisixUpstreamConfig, ev.Type.IsSyncEvent())
+ goto updateStatus
}
var portLevelSettings map[int32]configv2.ApisixUpstreamConfig
@@ -315,9 +314,8 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
svc, err := c.SvcLister.Services(namespace).Get(name)
if err != nil {
log.Errorf("failed to get service %s: %s", key, err)
- c.RecordEvent(au, corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
- c.recordStatus(au, utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration())
- return err
+ errRecord = err
+ goto updateStatus
}
var subsets []configv2.ApisixUpstreamSubset
@@ -338,25 +336,76 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
err := c.updateUpstream(ctx, apisixv1.ComposeUpstreamName(namespace, name, subset.Name, port.Port, types.ResolveGranularity.Endpoint), &cfg, ev.Type.IsSyncEvent())
if err != nil {
- c.RecordEvent(au, corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
- c.recordStatus(au, utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration())
- return err
+ errRecord = err
+ goto updateStatus
}
err = c.updateUpstream(ctx, apisixv1.ComposeUpstreamName(namespace, name, subset.Name, port.Port, types.ResolveGranularity.Service), &cfg, ev.Type.IsSyncEvent())
if err != nil {
- c.RecordEvent(au, corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
- c.recordStatus(au, utils.ResourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration())
- return err
+ errRecord = err
+ goto updateStatus
}
}
}
- if ev.Type != types.EventDelete {
- c.RecordEvent(au, corev1.EventTypeNormal, utils.ResourceSynced, nil)
- c.recordStatus(au, utils.ResourceSynced, nil, metav1.ConditionTrue, au.GetGeneration())
+ }
+updateStatus:
+ c.pool.Queue(func(wu pool.WorkUnit) (interface{}, error) {
+ if wu.IsCancelled() {
+ return nil, nil
}
+ c.updateStatus(multiVersioned, errRecord)
+ return true, nil
+ })
+ return errRecord
+}
+
+func (c *apisixUpstreamController) updateStatus(obj kube.ApisixUpstream, statusErr error) {
+ if obj == nil {
+ return
}
+ var (
+ au kube.ApisixUpstream
+ err error
+ namespace = obj.GetNamespace()
+ name = obj.GetName()
+ )
- return err
+ switch obj.GroupVersion() {
+ case config.ApisixV2beta3:
+ au, err = c.ApisixUpstreamLister.V2beta3(namespace, name)
+ case config.ApisixV2:
+ au, err = c.ApisixUpstreamLister.V2(namespace, name)
+ }
+ if err != nil {
+ if !k8serrors.IsNotFound(err) {
+ log.Warnw("Failed to update status, unable to get ApisixUpstream",
+ zap.Error(err),
+ zap.String("name", name),
+ zap.String("namespace", namespace),
+ )
+ }
+ return
+ }
+ if au.ResourceVersion() != obj.ResourceVersion() {
+ return
+ }
+ var (
+ reason = utils.ResourceSynced
+ condition = metav1.ConditionTrue
+ eventType = corev1.EventTypeNormal
+ )
+ if statusErr != nil {
+ reason = utils.ResourceSyncAborted
+ condition = metav1.ConditionFalse
+ eventType = corev1.EventTypeWarning
+ }
+ switch obj.GroupVersion() {
+ case config.ApisixV2beta3:
+ c.RecordEvent(obj.V2beta3(), eventType, reason, statusErr)
+ c.recordStatus(obj.V2beta3(), reason, statusErr, condition, au.GetGeneration())
+ case config.ApisixV2:
+ c.RecordEvent(obj.V2(), eventType, reason, statusErr)
+ c.recordStatus(obj.V2(), reason, statusErr, condition, au.GetGeneration())
+ }
}
func (c *apisixUpstreamController) updateUpstream(ctx context.Context, upsName string, cfg *configv2.ApisixUpstreamConfig, shouldCompare bool) error {
@@ -581,6 +630,15 @@ func (c *apisixUpstreamController) onUpdate(oldObj, newObj interface{}) {
if prev.ResourceVersion() >= curr.ResourceVersion() {
return
}
+ // Updates triggered by status are ignored.
+ if prev.GetGeneration() == curr.GetGeneration() && prev.GetUID() == curr.GetUID() {
+ switch curr.GroupVersion() {
+ case config.ApisixV2:
+ if reflect.DeepEqual(prev.V2().Spec, curr.V2().Spec) && !reflect.DeepEqual(prev.V2().Status, curr.V2().Status) {
+ return
+ }
+ }
+ }
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err != nil {
log.Errorf("found ApisixUpstream resource with bad meta namespace key: %s", err)
diff --git a/pkg/providers/ingress/ingress.go b/pkg/providers/ingress/ingress.go
index 74649774..35d9da1d 100644
--- a/pkg/providers/ingress/ingress.go
+++ b/pkg/providers/ingress/ingress.go
@@ -17,11 +17,13 @@ package ingress
import (
"context"
"fmt"
+ "reflect"
"strings"
"sync"
"time"
"go.uber.org/zap"
+ "gopkg.in/go-playground/pool.v3"
corev1 "k8s.io/api/core/v1"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
networkingv1 "k8s.io/api/networking/v1"
@@ -51,6 +53,8 @@ type ingressController struct {
workqueue workqueue.RateLimitingInterface
workers int
+ pool pool.Pool
+
// secretSSLMap stores reference from K8s secret to Ingress
// type: Map<SecretKey, Map<IngressVersionKey, SSL in APISIX>>
// SecretKey -> IngressVersionKey -> []string
@@ -65,6 +69,7 @@ func newIngressController(common *ingressCommon) *ingressController {
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ingress"),
workers: 1,
+ pool: pool.NewLimited(2),
secretSSLMap: new(sync.Map),
}
@@ -81,6 +86,7 @@ func (c *ingressController) run(ctx context.Context) {
log.Info("ingress controller started")
defer log.Infof("ingress controller exited")
defer c.workqueue.ShutDown()
+ defer c.pool.Close()
for i := 0; i < c.workers; i++ {
go c.runWorker(ctx)
@@ -108,7 +114,10 @@ func (c *ingressController) sync(ctx context.Context, ev *types.Event) error {
return err
}
- var ing kube.Ingress
+ var (
+ ing kube.Ingress
+ tctx *translation.TranslateContext
+ )
switch ingEv.GroupVersion {
case kube.IngressV1:
ing, err = c.IngressLister.V1(namespace, name)
@@ -143,84 +152,90 @@ func (c *ingressController) sync(ctx context.Context, ev *types.Event) error {
}
ing = ev.Tombstone.(kube.Ingress)
}
- var tctx *translation.TranslateContext
- if ev.Type == types.EventDelete {
- tctx, err = c.translator.TranslateIngressDeleteEvent(ing)
- } else {
- tctx, err = c.translator.TranslateIngress(ing)
- }
+ {
+ if ev.Type == types.EventDelete {
+ tctx, err = c.translator.TranslateIngressDeleteEvent(ing)
+ } else {
+ tctx, err = c.translator.TranslateIngress(ing)
+ }
+ if err != nil {
+ log.Errorw("failed to translate ingress",
+ zap.Error(err),
+ zap.Any("ingress", ing),
+ )
+ goto updateStatus
+ }
- if err != nil {
- log.Errorw("failed to translate ingress",
- zap.Error(err),
+ for _, ssl := range tctx.SSL {
+ ns, ok1 := ssl.Labels[translation.MetaSecretNamespace]
+ sec, ok2 := ssl.Labels[translation.MetaSecretName]
+ if ok1 && ok2 {
+ // We don't support annotation in Ingress
+ // _caAnnotation = "nginx.ingress.kubernetes.io/auth-tls-secret"
+ c.storeSecretReference(ns+"/"+sec, ingEv.Key, ev.Type, ssl)
+ }
+ }
+
+ log.Debugw("translated ingress resource to a couple of routes, upstreams and pluginConfigs",
zap.Any("ingress", ing),
+ zap.Any("routes", tctx.Routes),
+ zap.Any("upstreams", tctx.Upstreams),
+ zap.Any("ssl", tctx.SSL),
+ zap.Any("pluginConfigs", tctx.PluginConfigs),
)
- return err
}
-
- for _, ssl := range tctx.SSL {
- ns, ok1 := ssl.Labels[translation.MetaSecretNamespace]
- sec, ok2 := ssl.Labels[translation.MetaSecretName]
- if ok1 && ok2 {
- // We don't support annotation in Ingress
- // _caAnnotation = "nginx.ingress.kubernetes.io/auth-tls-secret"
- c.storeSecretReference(ns+"/"+sec, ingEv.Key, ev.Type, ssl)
+ {
+ m := &utils.Manifest{
+ SSLs: tctx.SSL,
+ Routes: tctx.Routes,
+ Upstreams: tctx.Upstreams,
+ PluginConfigs: tctx.PluginConfigs,
}
- }
-
- log.Debugw("translated ingress resource to a couple of routes, upstreams and pluginConfigs",
- zap.Any("ingress", ing),
- zap.Any("routes", tctx.Routes),
- zap.Any("upstreams", tctx.Upstreams),
- zap.Any("ssl", tctx.SSL),
- zap.Any("pluginConfigs", tctx.PluginConfigs),
- )
-
- m := &utils.Manifest{
- SSLs: tctx.SSL,
- Routes: tctx.Routes,
- Upstreams: tctx.Upstreams,
- PluginConfigs: tctx.PluginConfigs,
- }
- var (
- added *utils.Manifest
- updated *utils.Manifest
- deleted *utils.Manifest
- )
+ var (
+ added *utils.Manifest
+ updated *utils.Manifest
+ deleted *utils.Manifest
+ )
- if ev.Type == types.EventDelete {
- deleted = m
- } else if ev.Type.IsAddEvent() {
- added = m
- } else {
- oldCtx, err := c.translator.TranslateOldIngress(ingEv.OldObject)
- if err != nil {
- log.Errorw("failed to translate ingress",
- zap.String("event", "update"),
+ if ev.Type == types.EventDelete {
+ deleted = m
+ } else if ev.Type.IsAddEvent() {
+ added = m
+ } else {
+ oldCtx, _ := c.translator.TranslateOldIngress(ingEv.OldObject)
+ om := &utils.Manifest{
+ Routes: oldCtx.Routes,
+ Upstreams: oldCtx.Upstreams,
+ SSLs: oldCtx.SSL,
+ PluginConfigs: oldCtx.PluginConfigs,
+ }
+ added, updated, deleted = m.Diff(om)
+ }
+ if err = c.SyncManifests(ctx, added, updated, deleted, ev.Type.IsSyncEvent()); err != nil {
+ log.Errorw("failed to sync Ingress to apisix",
zap.Error(err),
- zap.Any("ingress", ingEv.OldObject),
)
- return err
- }
- om := &utils.Manifest{
- Routes: oldCtx.Routes,
- Upstreams: oldCtx.Upstreams,
- SSLs: oldCtx.SSL,
- PluginConfigs: oldCtx.PluginConfigs,
+ goto updateStatus
}
- added, updated, deleted = m.Diff(om)
}
- if err := c.SyncManifests(ctx, added, updated, deleted, ev.Type.IsSyncEvent()); err != nil {
- log.Errorw("failed to sync ingress artifacts",
- zap.Error(err),
- )
- return err
- }
- return nil
+updateStatus:
+ c.pool.Queue(func(wu pool.WorkUnit) (interface{}, error) {
+ if wu.IsCancelled() {
+ return nil, nil
+ }
+ c.UpdateStatus(ing)
+ return true, nil
+ })
+ return err
}
func (c *ingressController) handleSyncErr(obj interface{}, err error) {
+ if err == nil {
+ c.workqueue.Forget(obj)
+ c.MetricsCollector.IncrSyncOperation("ingress", "success")
+ return
+ }
ev := obj.(*types.Event)
event := ev.Object.(kube.IngressEvent)
if k8serrors.IsNotFound(err) && ev.Type != types.EventDelete {
@@ -228,70 +243,57 @@ func (c *ingressController) handleSyncErr(obj interface{}, err error) {
zap.String("event_type", ev.Type.String()),
zap.String("ingress", event.Key),
)
- c.workqueue.Forget(event)
+ c.workqueue.Forget(obj)
return
}
- namespace, name, errLocal := cache.SplitMetaNamespaceKey(event.Key)
- if errLocal != nil {
- log.Errorw("invalid resource key",
- zap.Error(errLocal),
- )
+ log.Warnw("sync ingress failed, will retry",
+ zap.Any("object", obj),
+ zap.Error(err),
+ )
+ c.workqueue.AddRateLimited(obj)
+ c.MetricsCollector.IncrSyncOperation("ingress", "failure")
+}
+
+func (c *ingressController) UpdateStatus(obj kube.Ingress) {
+ if obj == nil {
return
}
+ var (
+ namespace = obj.GetNamespace()
+ name = obj.GetName()
+ ing kube.Ingress
+ err error
+ )
- var ing kube.Ingress
- switch event.GroupVersion {
+ switch obj.GroupVersion() {
case kube.IngressV1:
- ing, errLocal = c.IngressLister.V1(namespace, name)
+ ing, err = c.IngressLister.V1(namespace, name)
case kube.IngressV1beta1:
- ing, errLocal = c.IngressLister.V1beta1(namespace, name)
+ ing, err = c.IngressLister.V1beta1(namespace, name)
case kube.IngressExtensionsV1beta1:
- ing, errLocal = c.IngressLister.ExtensionsV1beta1(namespace, name)
+ ing, err = c.IngressLister.ExtensionsV1beta1(namespace, name)
}
-
- if err == nil {
- // add status
- if ev.Type != types.EventDelete {
- if errLocal == nil {
- switch ing.GroupVersion() {
- case kube.IngressV1:
- c.recordStatus(ing.V1(), utils.ResourceSynced, nil, metav1.ConditionTrue, ing.V1().GetGeneration())
- case kube.IngressV1beta1:
- c.recordStatus(ing.V1beta1(), utils.ResourceSynced, nil, metav1.ConditionTrue, ing.V1beta1().GetGeneration())
- case kube.IngressExtensionsV1beta1:
- c.recordStatus(ing.ExtensionsV1beta1(), utils.ResourceSynced, nil, metav1.ConditionTrue, ing.ExtensionsV1beta1().GetGeneration())
- }
- } else {
- log.Errorw("failed to list ingress resource",
- zap.Error(errLocal),
- )
- }
+ if err != nil {
+ if !k8serrors.IsNotFound(err) {
+ log.Warnw("failed to update status, unable to get Ingress",
+ zap.Error(err),
+ zap.String("name", name),
+ zap.String("namespace", namespace),
+ )
}
- c.workqueue.Forget(obj)
- c.MetricsCollector.IncrSyncOperation("ingress", "success")
return
}
- log.Warnw("sync ingress failed, will retry",
- zap.Any("object", obj),
- zap.Error(err),
- )
-
- if errLocal == nil {
- switch ing.GroupVersion() {
- case kube.IngressV1:
- c.recordStatus(ing.V1(), utils.ResourceSyncAborted, err, metav1.ConditionTrue, ing.V1().GetGeneration())
- case kube.IngressV1beta1:
- c.recordStatus(ing.V1beta1(), utils.ResourceSyncAborted, err, metav1.ConditionTrue, ing.V1beta1().GetGeneration())
- case kube.IngressExtensionsV1beta1:
- c.recordStatus(ing.ExtensionsV1beta1(), utils.ResourceSyncAborted, err, metav1.ConditionTrue, ing.ExtensionsV1beta1().GetGeneration())
- }
- } else {
- log.Errorw("failed to list ingress resource",
- zap.Error(errLocal),
- )
+ if ing.ResourceVersion() != obj.ResourceVersion() {
+ return
+ }
+ switch obj.GroupVersion() {
+ case kube.IngressV1:
+ c.recordStatus(obj.V1())
+ case kube.IngressV1beta1:
+ c.recordStatus(obj.V1beta1())
+ case kube.IngressExtensionsV1beta1:
+ c.recordStatus(obj.ExtensionsV1beta1())
}
- c.workqueue.AddRateLimited(obj)
- c.MetricsCollector.IncrSyncOperation("ingress", "failure")
}
func (c *ingressController) onAdd(obj interface{}) {
@@ -334,6 +336,26 @@ func (c *ingressController) onUpdate(oldObj, newObj interface{}) {
if prev.ResourceVersion() >= curr.ResourceVersion() {
return
}
+ // Updates triggered by status are ignored.
+ if prev.GetGeneration() == curr.GetGeneration() && prev.GetUID() == curr.GetUID() {
+ switch curr.GroupVersion() {
+ case kube.IngressV1:
+ if reflect.DeepEqual(prev.V1().Spec, curr.V1().Spec) &&
+ !reflect.DeepEqual(prev.V1().Status, curr.V1().Status) {
+ return
+ }
+ case kube.IngressV1beta1:
+ if reflect.DeepEqual(prev.V1beta1().Spec, curr.V1beta1().Spec) &&
+ !reflect.DeepEqual(prev.V1beta1().Status, curr.V1beta1().Status) {
+ return
+ }
+ case kube.IngressExtensionsV1beta1:
+ if reflect.DeepEqual(prev.ExtensionsV1beta1().Spec, curr.ExtensionsV1beta1().Spec) &&
+ !reflect.DeepEqual(prev.ExtensionsV1beta1().Status, curr.ExtensionsV1beta1().Status) {
+ return
+ }
+ }
+ }
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err != nil {
@@ -469,7 +491,7 @@ func (c *ingressController) ResourceSync() {
}
// recordStatus record resources status
-func (c *ingressController) recordStatus(at runtime.Object, reason string, err error, status metav1.ConditionStatus, generation int64) {
+func (c *ingressController) recordStatus(at runtime.Object) {
if c.Kubernetes.DisableStatusUpdates {
return
}
@@ -636,7 +658,7 @@ func (c *ingressController) syncSSLs(ctx context.Context, evType types.EventType
runtimeObj := obj.(runtime.Object)
c.RecordEventS(runtimeObj, corev1.EventTypeWarning, utils.ResourceSyncAborted,
fmt.Sprintf("sync from secret %s changes failed, error: %s", secretKey, err.Error()))
- c.recordStatus(runtimeObj, utils.ResourceSyncAborted, err, metav1.ConditionFalse, obj.GetGeneration())
+ c.recordStatus(runtimeObj)
}(obj)
return true
}
@@ -657,11 +679,11 @@ func (c *ingressController) syncSSLs(ctx context.Context, evType types.EventType
)
c.RecordEventS(runtimeObj, corev1.EventTypeWarning, utils.ResourceSyncAborted,
fmt.Sprintf("sync from secret %s changes failed, error: %s", secretKey, err.Error()))
- c.recordStatus(runtimeObj, utils.ResourceSyncAborted, err, metav1.ConditionFalse, obj.GetGeneration())
+ c.recordStatus(runtimeObj)
} else {
c.RecordEventS(runtimeObj, corev1.EventTypeNormal, utils.ResourceSynced,
fmt.Sprintf("sync from secret %s changes", secretKey))
- c.recordStatus(runtimeObj, utils.ResourceSynced, nil, metav1.ConditionTrue, obj.GetGeneration())
+ c.recordStatus(runtimeObj)
}
}(ssl, obj)
return true
diff --git a/test/e2e/suite-ingress/suite-ingress-features/status.go b/test/e2e/suite-ingress/suite-ingress-features/status.go
index 5a2f1d71..1858b7b3 100644
--- a/test/e2e/suite-ingress/suite-ingress-features/status.go
+++ b/test/e2e/suite-ingress/suite-ingress-features/status.go
@@ -27,9 +27,9 @@ import (
"github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
)
-var _ = ginkgo.Describe("suite-ingress-features: Status subresource Testing", func() {
- routeSuites := func(s *scaffold.Scaffold) {
- ginkgo.It("check the ApisixRoute status is recorded", func() {
+var _ = ginkgo.Describe("suite-ingress-features: apisix.apache.org/v2beta3 CRDs status subresource Testing", func() {
+ suites := func(s *scaffold.Scaffold) {
+ ginkgo.It("check the status is recorded", func() {
backendSvc, backendSvcPort := s.DefaultHTTPBackend()
apisixRoute := fmt.Sprintf(`
apiVersion: apisix.apache.org/v2beta3
@@ -65,16 +65,74 @@ spec:
}
ginkgo.Describe("suite-ingress-features: ApisixRoute scaffold v2beta3", func() {
- routeSuites(scaffold.NewDefaultV2beta3Scaffold())
+ suites(scaffold.NewDefaultV2beta3Scaffold())
})
- ginkgo.Describe("suite-ingress-features: ApisixRoute scaffold v2", func() {
- routeSuites(scaffold.NewDefaultV2Scaffold())
+})
+
+var _ = ginkgo.Describe("suite-ingress-features: CRDs status subresource Testing", func() {
+ s := scaffold.NewDefaultScaffold()
+ ginkgo.It("check ApisixRoute status is recorded", func() {
+ backendSvc, backendSvcPort := s.DefaultHTTPBackend()
+ apisixRoute := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v2
+kind: ApisixRoute
+metadata:
+ name: httpbin-route
+spec:
+ http:
+ - name: rule1
+ match:
+ hosts:
+ - httpbin.com
+ paths:
+ - /ip
+ backends:
+ - serviceName: %s
+ servicePort: %d
+`, backendSvc, backendSvcPort[0])
+ assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(apisixRoute))
+ time.Sleep(6 * time.Second)
+ // status should be recorded as successful
+ output, err := s.GetOutputFromString("ar", "httpbin-route", "-o", "yaml")
+ assert.Nil(ginkgo.GinkgoT(), err, "Get output of ApisixRoute resource")
+ assert.Contains(ginkgo.GinkgoT(), output, "type: ResourcesAvailable", "status.conditions.type is recorded")
+ assert.Contains(ginkgo.GinkgoT(), output, "reason: ResourcesSynced", "status.conditions.reason is recorded")
+ assert.Contains(ginkgo.GinkgoT(), output, `status: "True"`, "status.conditions.status is recorded")
+ assert.Contains(ginkgo.GinkgoT(), output, "message: Sync Successfully", "status.conditions.message is recorded")
+
+ apisixRoute = fmt.Sprintf(`
+apiVersion: apisix.apache.org/v2
+kind: ApisixRoute
+metadata:
+ name: httpbin-route
+spec:
+ http:
+ - name: rule1
+ match:
+ hosts:
+ - httpbin.com
+ paths:
+ - /ip
+ plugins:
+ - name: non-existent
+ enable: true
+ backends:
+ - serviceName: %s
+ servicePort: %d
+`, backendSvc, backendSvcPort[0])
+ assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(apisixRoute))
+ time.Sleep(6 * time.Second)
+ // status should be recorded as successful
+ output, err = s.GetOutputFromString("ar", "httpbin-route", "-o", "yaml")
+ assert.Nil(ginkgo.GinkgoT(), err, "Get output of ApisixRoute resource")
+ assert.Contains(ginkgo.GinkgoT(), output, "type: ResourcesAvailable", "status.conditions.type is recorded")
+ assert.Contains(ginkgo.GinkgoT(), output, "reason: ResourceSyncAborted", "status.conditions.reason is recorded")
+ assert.Contains(ginkgo.GinkgoT(), output, `status: "False"`, "status.conditions.status is recorded")
})
- upSuite := func(s *scaffold.Scaffold) {
- ginkgo.It("check the ApisixUpstream status is recorded", func() {
- backendSvc, _ := s.DefaultHTTPBackend()
- apisixUpstream := fmt.Sprintf(`
+ ginkgo.It("check the ApisixUpstream status is recorded", func() {
+ backendSvc, _ := s.DefaultHTTPBackend()
+ apisixUpstream := fmt.Sprintf(`
apiVersion: apisix.apache.org/v2beta3
kind: ApisixUpstream
metadata:
@@ -82,24 +140,144 @@ metadata:
spec:
retries: 2
`, backendSvc)
- assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(apisixUpstream))
+ assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(apisixUpstream))
- // status should be recorded as successful
- output, err := s.GetOutputFromString("au", backendSvc, "-o", "yaml")
- assert.Nil(ginkgo.GinkgoT(), err, "Get output of ApisixUpstream resource"+backendSvc)
- hasType := strings.Contains(output, "type: ResourcesAvailable")
- assert.True(ginkgo.GinkgoT(), hasType, "Status is recorded")
- hasMsg := strings.Contains(output, "message: Sync Successfully")
- assert.True(ginkgo.GinkgoT(), hasMsg, "Status is recorded")
- })
- }
+ // status should be recorded as successful
+ output, err := s.GetOutputFromString("au", backendSvc, "-o", "yaml")
+ assert.Nil(ginkgo.GinkgoT(), err, "Get output of ApisixUpstream resource"+backendSvc)
+ hasType := strings.Contains(output, "type: ResourcesAvailable")
+ assert.True(ginkgo.GinkgoT(), hasType, "Status is recorded")
+ hasMsg := strings.Contains(output, "message: Sync Successfully")
+ assert.True(ginkgo.GinkgoT(), hasMsg, "Status is recorded")
+ })
+
+ ginkgo.It("check ApisixPluginConfig status is recorded", func() {
+ apc := `
+apiVersion: apisix.apache.org/v2
+kind: ApisixPluginConfig
+metadata:
+ name: test-apc
+spec:
+ plugins:
+ - name: echo
+ enable: true
+ config:
+ body: "my custom body"
+`
+ assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(apc))
+ time.Sleep(6 * time.Second)
+ // status should be recorded as successfulen
+ output, err := s.GetOutputFromString("apc", "test-apc", "-o", "yaml")
+ assert.Nil(ginkgo.GinkgoT(), err, "Get output of ApisixPluginConfig resource")
+ assert.Contains(ginkgo.GinkgoT(), output, "type: ResourcesAvailable", "status.conditions.type is recorded")
+ assert.Contains(ginkgo.GinkgoT(), output, "reason: ResourcesSynced", "status.conditions.reason is recorded")
+ assert.Contains(ginkgo.GinkgoT(), output, `status: "True"`, "status.conditions.status is recorded")
+ assert.Contains(ginkgo.GinkgoT(), output, "message: Sync Successfully", "status.conditions.message is recorded")
- ginkgo.Describe("suite-ingress-features: ApisixUpstream scaffold v2beta3", func() {
- upSuite(scaffold.NewDefaultV2beta3Scaffold())
+ apc = `
+apiVersion: apisix.apache.org/v2
+kind: ApisixPluginConfig
+metadata:
+ name: test-apc
+spec:
+ plugins:
+ - name: echo
+ enable: true
+ config:
+ body: "my custom body"
+ - name: non-existent
+ enable: true
+`
+
+ assert.Nil(ginkgo.GinkgoT(), s.CreateVersionedApisixResource(apc))
+ time.Sleep(6 * time.Second)
+ // status should be recorded as failed
+ output, err = s.GetOutputFromString("apc", "test-apc", "-o", "yaml")
+ assert.Nil(ginkgo.GinkgoT(), err, "Get output of ApisixPluginConfig resource")
+ assert.Contains(ginkgo.GinkgoT(), output, "type: ResourcesAvailable", "status.conditions.type is recorded")
+ assert.Contains(ginkgo.GinkgoT(), output, "reason: ResourceSyncAborted", "status.conditions.reason is recorded")
+ assert.Contains(ginkgo.GinkgoT(), output, `status: "False"`, "status.conditions.status is recorded")
})
- ginkgo.Describe("suite-ingress-features: ApisixUpstream scaffold v2", func() {
- upSuite(scaffold.NewDefaultV2Scaffold())
+
+ ginkgo.It("check ApisixTls status is recorded", func() {
+ secretName := "test-apisix-tls"
+ cert := `-----BEGIN CERTIFICATE-----
+MIIDSjCCAjICCQC/34ZwGz7ZXjANBgkqhkiG9w0BAQsFADBmMQswCQYDVQQGEwJD
+TjEQMA4GA1UECAwHSmlhbmdzdTEPMA0GA1UEBwwGU3V6aG91MQ8wDQYDVQQKDAZ6
+aGlsaXUxEDAOBgNVBAsMB3NlY3Rpb24xETAPBgNVBAMMCHRlc3QuY29tMCAXDTIx
+MDIwMzE0MjkwOVoYDzIwNTEwMTI3MTQyOTA5WjBmMQswCQYDVQQGEwJDTjEQMA4G
+A1UECAwHSmlhbmdzdTEPMA0GA1UEBwwGU3V6aG91MQ8wDQYDVQQKDAZ6aGlsaXUx
+EDAOBgNVBAsMB3NlY3Rpb24xETAPBgNVBAMMCHRlc3QuY29tMIIBIjANBgkqhkiG
+9w0BAQEFAAOCAQ8AMIIBCgKCAQEA3DEQ5K9PVYicINTHt3arqrsrftrhotyBuGqM
+xxqGMVO/E2SAa/81fC1UCcjYV4Wila0kl8i5fa8HjtVm5UWlrqxeFLOS3E0Wv2QY
+w46BGZJY4InE9zKwYyC2DkBxE6p14JRjmtW/MQPNaOFjJ4bmCuRHsEzmQIGRM0b7
+oKHjfFwv6l7BahgGf9ShHOMdHSkgWj6+2RU3282lrO9bY1JBTKu2Znv9M79nu1Px
+Tn1wCfcuCwA7WQT/QSrE2R43I2vmbIbuSmeg9ivjMazRYQQ+qxQn/6zhiHvP3QZG
+dKmp8imdYi+r84PKOLDEe/yxlgIdr2Au5WCPWwyYMYPWHzeD1wIDAQABMA0GCSqG
+SIb3DQEBCwUAA4IBAQBYzNe83mPVuz96TZ3fmxtOIuz9b6q5JWiJiOzjAD9902Se
+TNYzMM6T/5e0dBpj8Z2qQlhkfNxJJgTwGEE8SdrZIr8DhswR9a0bXDCZjLatCdeU
+iYpt+TDAuySnLhAcd3GfE5ml6am2dOsOKpxHU/8clUSaz+21fckRopWo+xL6rSVC
+4vvKqiU+LWLTZPQNoOqowl7bxoQO2jMWfN/5zvQOFxAbEufIPa9ti3qonDCXbkYn
+PpET/mPDrcb4bGsZkW/cu0LrPSUVp12br5TAYaXqYS0Ex+jAVTXML9SeEQuvU3dH
+5Uw2wVHxQXHglsdCYUXXFd3HZffb4rSQH+Mk0CBI
+-----END CERTIFICATE-----`
+ key := `-----BEGIN RSA PRIVATE KEY-----
+MIIEpQIBAAKCAQEA3DEQ5K9PVYicINTHt3arqrsrftrhotyBuGqMxxqGMVO/E2SA
+a/81fC1UCcjYV4Wila0kl8i5fa8HjtVm5UWlrqxeFLOS3E0Wv2QYw46BGZJY4InE
+9zKwYyC2DkBxE6p14JRjmtW/MQPNaOFjJ4bmCuRHsEzmQIGRM0b7oKHjfFwv6l7B
+ahgGf9ShHOMdHSkgWj6+2RU3282lrO9bY1JBTKu2Znv9M79nu1PxTn1wCfcuCwA7
+WQT/QSrE2R43I2vmbIbuSmeg9ivjMazRYQQ+qxQn/6zhiHvP3QZGdKmp8imdYi+r
+84PKOLDEe/yxlgIdr2Au5WCPWwyYMYPWHzeD1wIDAQABAoIBAQDGmSKpgE1H0k0v
+d3siyFART3vtkLHOWKBPmxqaQhwixWwjq5QA1FCDTcbshFBMsGVyJpZIqGxVJdbl
+RyjlRaooH6NDfKvLM2R+/2Mujot2av7qlpgmdXuODOTnecwDds2W33/vGTa2mL1e
+CVuLPSqjTD40j0dlivdRjoZJ3Xn2oOrpZ812XU8KeZAjuSEAwcyl2nSbyLGDchBB
+kfYZold3FaaLAf2LoVJ2fs+FwEPzDKoNYEvij9OyC0kwI94T5jQ+Z6XGtHXhb2Hy
+Ek3EZeIhV3YcDIid5AjSvcrNtDI24hwszSmhYVc53EKYkpXHf581a3U/SEEhXDlw
+Y0x6j9QRAoGBAPEP0LDgP7DGXxno4h+nf0AMru0pxlrNVQhLcNQB+dYI0pFTwsg+
+AKenoaaE/EGR1KfiY0uf3rVWNrA5kyX1/i18iJx9LSf9NvNgMo84JVvXINgyE6sd
+hvdqxFlV5FBnh8b7ldvYQy3YI0EQNx+/rmeUYPjInbkdiksAtAey4ADNAoGBAOnW
+K0FoX1ljq3rc9uVasiRi6Ix50NHdZ17RcEpMgwWPenbP1aiWkvA8yFhU708lBaZC
+WIUZ6XbfiG0Y9cMtxhtikoouDs5Ifia8juZ2bhkmSGP2FvZCBJJ/sHhnhpzSZNhW
+SyLBUjnynoXwHoQvkoGnVTHAk1VsY7jLNJdr2MczAoGAMYvMmu4caRr8pPimsVbd
+4q44reouKK+XUJMg55JYZVN+4/vRRxLnU44yvWUL6/YrPS5ctkhvn9nOd739rom2
+6mZ0NaXMyDFVQAR/n8wscYnv6D+ypzL0cJnzLWFoAdalo5JGJN94P03zQQYyLkZZ
+dFSc8cVaFZgqumu0lPiA7ekCgYEAiMeVL8Jcm84YXVrpNMmzkGMmwhzzT/8hWy5J
+b7yHm3YM3Xi+8sl5E/uJ+VldTj9KqbD/VIQOs1EX3TEPeObKjfQ/4YIFeRagbAo5
+0IcP6bgh+g7V6aA+Sm9Ui2mLLSpIgN8hPig0796CabhGMW4eVabKx7pstDgdsNd0
+YOpduE8CgYEAu9k9WOQuRX4f6i5LBIxyaYn6Hw6oJn8e/w+p2+HNBXdyVQNqUHBG
+V5rgnBwhc5LeIFbehKvQOvYSWbwbA1VunMpdYgV6+EBLayumJNqV6jGei4okx2of
+wrw7im4TNSAdwVX4Y1F4svJ2as5SJn5QYGAzXDixNuwzXYrpP9rzA2s=
+-----END RSA PRIVATE KEY-----`
+ // create secret
+ err := s.NewSecret(secretName, cert, key)
+ assert.Nil(ginkgo.GinkgoT(), err, "create secret error")
+ // create ApisixTls resource
+ tlsName := "tls-name"
+ host := "api6.com"
+ assert.Nil(ginkgo.GinkgoT(), s.NewApisixTls(tlsName, host, secretName), "create tls error")
+ time.Sleep(6 * time.Second)
+ // status should be recorded as successfulen
+ output, err := s.GetOutputFromString("atls", tlsName, "-o", "yaml")
+ assert.Nil(ginkgo.GinkgoT(), err, "Get output of ApisixPluginConfig resource")
+ assert.Contains(ginkgo.GinkgoT(), output, "type: ResourcesAvailable", "status.conditions.type is recorded")
+ assert.Contains(ginkgo.GinkgoT(), output, "reason: ResourcesSynced", "status.conditions.reason is recorded")
+ assert.Contains(ginkgo.GinkgoT(), output, `status: "True"`, "status.conditions.status is recorded")
+ assert.Contains(ginkgo.GinkgoT(), output, "message: Sync Successfully", "status.conditions.message is recorded")
+
+ // No secret exists for use
+ assert.Nil(ginkgo.GinkgoT(), s.NewApisixTls(tlsName, host, "non-existent.com"), "create tls error")
+ time.Sleep(6 * time.Second)
+ // status should be recorded as failed
+ output, err = s.GetOutputFromString("atls", tlsName, "-o", "yaml")
+ assert.Nil(ginkgo.GinkgoT(), err, "Get output of ApisixPluginConfig resource")
+ assert.Contains(ginkgo.GinkgoT(), output, "type: ResourcesAvailable", "status.conditions.type is recorded")
+ assert.Contains(ginkgo.GinkgoT(), output, "reason: ResourceSyncAborted", "status.conditions.reason is recorded")
+ assert.Contains(ginkgo.GinkgoT(), output, `status: "False"`, "status.conditions.status is recorded")
})
+
+ //TODO: ApisixGlobal
+ //TODO: ApisixConsumer CRD missing status definition
+ //TODO: ApisixClusterConfig CRD missing status definition
})
var _ = ginkgo.Describe("suite-ingress-features: Ingress LB Status Testing", func() {