You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by zh...@apache.org on 2022/12/16 14:17:51 UTC
[apisix-ingress-controller] branch master updated: refactor: unified factory and informer (#1530)
This is an automated email from the ASF dual-hosted git repository.
zhangjintao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new 4208ca7c refactor: unified factory and informer (#1530)
4208ca7c is described below
commit 4208ca7cef4e54e22544050deed45bd768ad5ffa
Author: Xin Rong <al...@gmail.com>
AuthorDate: Fri Dec 16 22:17:44 2022 +0800
refactor: unified factory and informer (#1530)
Co-authored-by: Jintao Zhang <zh...@apache.org>
---
pkg/providers/apisix/apisix_cluster_config.go | 22 +---
pkg/providers/apisix/apisix_consumer.go | 24 ++--
pkg/providers/apisix/apisix_plugin_config.go | 27 ++--
pkg/providers/apisix/apisix_route.go | 32 ++---
pkg/providers/apisix/apisix_tls.go | 26 ++--
pkg/providers/apisix/apisix_upstream.go | 31 ++---
pkg/providers/apisix/provider.go | 88 ++------------
pkg/providers/apisix/provider_init.go | 23 +---
pkg/providers/controller.go | 142 ++++++++++++++++++----
pkg/providers/ingress/ingress.go | 46 ++-----
pkg/providers/ingress/provider.go | 34 +-----
pkg/providers/k8s/configmap/configmap.go | 14 +--
pkg/providers/k8s/configmap/provider.go | 8 +-
pkg/providers/k8s/namespace/namespace_provider.go | 2 +-
pkg/providers/k8s/pod/provider.go | 4 -
pkg/providers/types/types.go | 66 +++++++---
16 files changed, 239 insertions(+), 350 deletions(-)
diff --git a/pkg/providers/apisix/apisix_cluster_config.go b/pkg/providers/apisix/apisix_cluster_config.go
index 43a1ba69..b9ebcb62 100644
--- a/pkg/providers/apisix/apisix_cluster_config.go
+++ b/pkg/providers/apisix/apisix_cluster_config.go
@@ -43,23 +43,15 @@ type apisixClusterConfigController struct {
workqueue workqueue.RateLimitingInterface
workers int
-
- apisixClusterConfigLister kube.ApisixClusterConfigLister
- apisixClusterConfigInformer cache.SharedIndexInformer
}
-func newApisixClusterConfigController(common *apisixCommon,
- apisixClusterConfigInformer cache.SharedIndexInformer, apisixClusterConfigLister kube.ApisixClusterConfigLister) *apisixClusterConfigController {
+func newApisixClusterConfigController(common *apisixCommon) *apisixClusterConfigController {
c := &apisixClusterConfigController{
apisixCommon: common,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(time.Second, 60*time.Second, 5), "ApisixClusterConfig"),
workers: 1,
-
- apisixClusterConfigLister: apisixClusterConfigLister,
- apisixClusterConfigInformer: apisixClusterConfigInformer,
}
-
- c.apisixClusterConfigInformer.AddEventHandler(
+ c.ApisixClusterConfigInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onAdd,
UpdateFunc: c.onUpdate,
@@ -74,10 +66,6 @@ func (c *apisixClusterConfigController) run(ctx context.Context) {
defer log.Info("ApisixClusterConfig controller exited")
defer c.workqueue.ShutDown()
- if ok := cache.WaitForCacheSync(ctx.Done(), c.apisixClusterConfigInformer.HasSynced); !ok {
- log.Error("cache sync failed")
- return
- }
for i := 0; i < c.workers; i++ {
go c.runWorker(ctx)
}
@@ -108,9 +96,9 @@ func (c *apisixClusterConfigController) sync(ctx context.Context, ev *types.Even
var multiVersioned kube.ApisixClusterConfig
switch event.GroupVersion {
case config.ApisixV2beta3:
- multiVersioned, err = c.apisixClusterConfigLister.V2beta3(name)
+ multiVersioned, err = c.ApisixClusterConfigLister.V2beta3(name)
case config.ApisixV2:
- multiVersioned, err = c.apisixClusterConfigLister.V2(name)
+ multiVersioned, err = c.ApisixClusterConfigLister.V2(name)
default:
return fmt.Errorf("unsupported ApisixClusterConfig group version %s", event.GroupVersion)
}
@@ -419,7 +407,7 @@ func (c *apisixClusterConfigController) onDelete(obj interface{}) {
}
func (c *apisixClusterConfigController) ResourceSync() {
- objs := c.apisixClusterConfigInformer.GetIndexer().List()
+ objs := c.ApisixClusterConfigInformer.GetIndexer().List()
for _, obj := range objs {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
diff --git a/pkg/providers/apisix/apisix_consumer.go b/pkg/providers/apisix/apisix_consumer.go
index 944befe7..02645379 100644
--- a/pkg/providers/apisix/apisix_consumer.go
+++ b/pkg/providers/apisix/apisix_consumer.go
@@ -42,23 +42,16 @@ type apisixConsumerController struct {
workqueue workqueue.RateLimitingInterface
workers int
-
- apisixConsumerLister kube.ApisixConsumerLister
- apisixConsumerInformer cache.SharedIndexInformer
}
-func newApisixConsumerController(common *apisixCommon,
- apisixConsumerInformer cache.SharedIndexInformer, apisixConsumerLister kube.ApisixConsumerLister) *apisixConsumerController {
+func newApisixConsumerController(common *apisixCommon) *apisixConsumerController {
c := &apisixConsumerController{
apisixCommon: common,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixConsumer"),
workers: 1,
-
- apisixConsumerLister: apisixConsumerLister,
- apisixConsumerInformer: apisixConsumerInformer,
}
- c.apisixConsumerInformer.AddEventHandler(
+ c.ApisixConsumerInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onAdd,
UpdateFunc: c.onUpdate,
@@ -71,15 +64,12 @@ func newApisixConsumerController(common *apisixCommon,
func (c *apisixConsumerController) run(ctx context.Context) {
log.Info("ApisixConsumer controller started")
defer log.Info("ApisixConsumer controller exited")
- if ok := cache.WaitForCacheSync(ctx.Done(), c.apisixConsumerInformer.HasSynced); !ok {
- log.Error("cache sync failed")
- return
- }
+ defer c.workqueue.ShutDown()
+
for i := 0; i < c.workers; i++ {
go c.runWorker(ctx)
}
<-ctx.Done()
- c.workqueue.ShutDown()
}
func (c *apisixConsumerController) runWorker(ctx context.Context) {
@@ -106,9 +96,9 @@ func (c *apisixConsumerController) sync(ctx context.Context, ev *types.Event) er
var multiVersioned kube.ApisixConsumer
switch event.GroupVersion {
case config.ApisixV2beta3:
- multiVersioned, err = c.apisixConsumerLister.V2beta3(namespace, name)
+ multiVersioned, err = c.ApisixConsumerLister.V2beta3(namespace, name)
case config.ApisixV2:
- multiVersioned, err = c.apisixConsumerLister.V2(namespace, name)
+ multiVersioned, err = c.ApisixConsumerLister.V2(namespace, name)
default:
return fmt.Errorf("unsupported ApisixConsumer group version %s", event.GroupVersion)
}
@@ -334,7 +324,7 @@ func (c *apisixConsumerController) onDelete(obj interface{}) {
}
func (c *apisixConsumerController) ResourceSync() {
- objs := c.apisixConsumerInformer.GetIndexer().List()
+ objs := c.ApisixConsumerInformer.GetIndexer().List()
for _, obj := range objs {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
diff --git a/pkg/providers/apisix/apisix_plugin_config.go b/pkg/providers/apisix/apisix_plugin_config.go
index 9d9ea31e..065ca3e3 100644
--- a/pkg/providers/apisix/apisix_plugin_config.go
+++ b/pkg/providers/apisix/apisix_plugin_config.go
@@ -43,23 +43,16 @@ type apisixPluginConfigController struct {
workqueue workqueue.RateLimitingInterface
workers int
-
- apisixPluginConfigLister kube.ApisixPluginConfigLister
- apisixPluginConfigInformer cache.SharedIndexInformer
}
-func newApisixPluginConfigController(common *apisixCommon,
- apisixPluginConfigInformer cache.SharedIndexInformer, apisixPluginConfigLister kube.ApisixPluginConfigLister) *apisixPluginConfigController {
+func newApisixPluginConfigController(common *apisixCommon) *apisixPluginConfigController {
c := &apisixPluginConfigController{
apisixCommon: common,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixPluginConfig"),
workers: 1,
-
- apisixPluginConfigLister: apisixPluginConfigLister,
- apisixPluginConfigInformer: apisixPluginConfigInformer,
}
- c.apisixPluginConfigInformer.AddEventHandler(
+ c.ApisixPluginConfigInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onAdd,
UpdateFunc: c.onUpdate,
@@ -74,12 +67,6 @@ func (c *apisixPluginConfigController) run(ctx context.Context) {
defer log.Info("ApisixPluginConfig controller exited")
defer c.workqueue.ShutDown()
- ok := cache.WaitForCacheSync(ctx.Done(), c.apisixPluginConfigInformer.HasSynced)
- if !ok {
- log.Error("cache sync failed")
- return
- }
-
for i := 0; i < c.workers; i++ {
go c.runWorker(ctx)
}
@@ -111,9 +98,9 @@ func (c *apisixPluginConfigController) sync(ctx context.Context, ev *types.Event
)
switch obj.GroupVersion {
case config.ApisixV2beta3:
- apc, err = c.apisixPluginConfigLister.V2beta3(namespace, name)
+ apc, err = c.ApisixPluginConfigLister.V2beta3(namespace, name)
case config.ApisixV2:
- apc, err = c.apisixPluginConfigLister.V2(namespace, name)
+ apc, err = c.ApisixPluginConfigLister.V2(namespace, name)
default:
return fmt.Errorf("unsupported ApisixPluginConfig group version %s", obj.GroupVersion)
}
@@ -242,9 +229,9 @@ func (c *apisixPluginConfigController) handleSyncErr(obj interface{}, errOrigin
var apc kube.ApisixPluginConfig
switch event.GroupVersion {
case config.ApisixV2beta3:
- apc, errLocal = c.apisixPluginConfigLister.V2beta3(namespace, name)
+ apc, errLocal = c.ApisixPluginConfigLister.V2beta3(namespace, name)
case config.ApisixV2:
- apc, errLocal = c.apisixPluginConfigLister.V2(namespace, name)
+ apc, errLocal = c.ApisixPluginConfigLister.V2(namespace, name)
default:
errLocal = fmt.Errorf("unsupported ApisixPluginConfig group version %s", event.GroupVersion)
}
@@ -382,7 +369,7 @@ func (c *apisixPluginConfigController) onDelete(obj interface{}) {
}
func (c *apisixPluginConfigController) ResourceSync() {
- objs := c.apisixPluginConfigInformer.GetIndexer().List()
+ objs := c.ApisixPluginConfigInformer.GetIndexer().List()
for _, obj := range objs {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
diff --git a/pkg/providers/apisix/apisix_route.go b/pkg/providers/apisix/apisix_route.go
index d5d95ac2..2786b8fe 100644
--- a/pkg/providers/apisix/apisix_route.go
+++ b/pkg/providers/apisix/apisix_route.go
@@ -47,11 +47,6 @@ type apisixRouteController struct {
relatedWorkqueue workqueue.RateLimitingInterface
workers int
- svcInformer cache.SharedIndexInformer
- apisixRouteLister kube.ApisixRouteLister
- apisixRouteInformer cache.SharedIndexInformer
- apisixUpstreamInformer cache.SharedIndexInformer
-
svcLock sync.RWMutex
// service key -> apisix route key
svcMap map[string]map[string]struct{}
@@ -66,30 +61,25 @@ type routeEvent struct {
Type string
}
-func newApisixRouteController(common *apisixCommon, apisixRouteInformer cache.SharedIndexInformer, apisixRouteLister kube.ApisixRouteLister) *apisixRouteController {
+func newApisixRouteController(common *apisixCommon) *apisixRouteController {
c := &apisixRouteController{
apisixCommon: common,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixRoute"),
relatedWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixRouteRelated"),
workers: 1,
- svcInformer: common.SvcInformer,
- apisixRouteLister: apisixRouteLister,
- apisixRouteInformer: apisixRouteInformer,
- apisixUpstreamInformer: common.ApisixUpstreamInformer,
-
svcMap: make(map[string]map[string]struct{}),
apisixUpstreamMap: make(map[string]map[string]struct{}),
}
- c.apisixRouteInformer.AddEventHandler(
+ c.ApisixRouteInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onAdd,
UpdateFunc: c.onUpdate,
DeleteFunc: c.onDelete,
},
)
- c.svcInformer.AddEventHandler(
+ c.SvcInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onSvcAdd,
},
@@ -110,12 +100,6 @@ func (c *apisixRouteController) run(ctx context.Context) {
defer c.workqueue.ShutDown()
defer c.relatedWorkqueue.ShutDown()
- ok := cache.WaitForCacheSync(ctx.Done(), c.apisixRouteInformer.HasSynced, c.svcInformer.HasSynced)
- if !ok {
- log.Error("cache sync failed")
- return
- }
-
for i := 0; i < c.workers; i++ {
go c.runWorker(ctx)
go c.runRelatedWorker(ctx)
@@ -306,9 +290,9 @@ func (c *apisixRouteController) sync(ctx context.Context, ev *types.Event) error
)
switch obj.GroupVersion {
case config.ApisixV2beta3:
- ar, err = c.apisixRouteLister.V2beta3(namespace, name)
+ ar, err = c.ApisixRouteLister.V2beta3(namespace, name)
case config.ApisixV2:
- ar, err = c.apisixRouteLister.V2(namespace, name)
+ ar, err = c.ApisixRouteLister.V2(namespace, name)
default:
log.Errorw("unknown ApisixRoute version",
zap.String("version", obj.GroupVersion),
@@ -494,9 +478,9 @@ func (c *apisixRouteController) handleSyncErr(obj interface{}, errOrigin error)
var ar kube.ApisixRoute
switch event.GroupVersion {
case config.ApisixV2beta3:
- ar, errLocal = c.apisixRouteLister.V2beta3(namespace, name)
+ ar, errLocal = c.ApisixRouteLister.V2beta3(namespace, name)
case config.ApisixV2:
- ar, errLocal = c.apisixRouteLister.V2(namespace, name)
+ ar, errLocal = c.ApisixRouteLister.V2(namespace, name)
default:
log.Errorw("unknown ApisixRoute version",
zap.String("version", event.GroupVersion),
@@ -641,7 +625,7 @@ func (c *apisixRouteController) onDelete(obj interface{}) {
}
func (c *apisixRouteController) ResourceSync() {
- objs := c.apisixRouteInformer.GetIndexer().List()
+ objs := c.ApisixRouteInformer.GetIndexer().List()
c.svcLock.Lock()
c.apisixUpstreamLock.Lock()
diff --git a/pkg/providers/apisix/apisix_tls.go b/pkg/providers/apisix/apisix_tls.go
index d648fbbb..c5714cfc 100644
--- a/pkg/providers/apisix/apisix_tls.go
+++ b/pkg/providers/apisix/apisix_tls.go
@@ -46,10 +46,6 @@ type apisixTlsController struct {
workqueue workqueue.RateLimitingInterface
workers int
- secretInformer cache.SharedIndexInformer
- apisixTlsLister kube.ApisixTlsLister
- apisixTlsInformer cache.SharedIndexInformer
-
// secretSSLMap stores reference from K8s secret to ApisixTls
// type: Map<SecretKey, Map<ApisixTlsKey, SSL object in APISIX>>
// SecretKey -> ApisixTlsKey -> SSL object in APISIX
@@ -57,20 +53,16 @@ type apisixTlsController struct {
secretSSLMap *sync.Map
}
-func newApisixTlsController(common *apisixCommon, apisixTlsInformer cache.SharedIndexInformer, apisixTlsLister kube.ApisixTlsLister) *apisixTlsController {
+func newApisixTlsController(common *apisixCommon) *apisixTlsController {
c := &apisixTlsController{
apisixCommon: common,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixTls"),
workers: 1,
- secretInformer: common.SecretInformer,
- apisixTlsLister: apisixTlsLister,
- apisixTlsInformer: apisixTlsInformer,
-
secretSSLMap: new(sync.Map),
}
- c.apisixTlsInformer.AddEventHandler(
+ c.ApisixTlsInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onAdd,
UpdateFunc: c.onUpdate,
@@ -85,10 +77,6 @@ func (c *apisixTlsController) run(ctx context.Context) {
defer log.Info("ApisixTls controller exited")
defer c.workqueue.ShutDown()
- if ok := cache.WaitForCacheSync(ctx.Done(), c.apisixTlsInformer.HasSynced, c.secretInformer.HasSynced); !ok {
- log.Errorf("informers sync failed")
- return
- }
for i := 0; i < c.workers; i++ {
go c.runWorker(ctx)
}
@@ -120,9 +108,9 @@ func (c *apisixTlsController) sync(ctx context.Context, ev *types.Event) error {
var multiVersionedTls kube.ApisixTls
switch event.GroupVersion {
case config.ApisixV2beta3:
- multiVersionedTls, err = c.apisixTlsLister.V2beta3(namespace, name)
+ multiVersionedTls, err = c.ApisixTlsLister.V2beta3(namespace, name)
case config.ApisixV2:
- multiVersionedTls, err = c.apisixTlsLister.V2(namespace, name)
+ multiVersionedTls, err = c.ApisixTlsLister.V2(namespace, name)
default:
return fmt.Errorf("unsupported ApisixTls group version %s", event.GroupVersion)
}
@@ -385,7 +373,7 @@ func (c *apisixTlsController) onDelete(obj interface{}) {
}
func (c *apisixTlsController) ResourceSync() {
- objs := c.apisixTlsInformer.GetIndexer().List()
+ objs := c.ApisixTlsInformer.GetIndexer().List()
for _, obj := range objs {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
@@ -501,7 +489,7 @@ func (c *apisixTlsController) syncSSLsAndUpdateStatusV2beta3(ctx context.Context
return true
}
- multiVersioned, err := c.apisixTlsLister.V2beta3(tlsNamespace, tlsName)
+ multiVersioned, err := c.ApisixTlsLister.V2beta3(tlsNamespace, tlsName)
if err != nil {
log.Warnw("secret related ApisixTls resource not found, skip",
zap.String("ApisixTls", tlsMetaKey),
@@ -588,7 +576,7 @@ func (c *apisixTlsController) syncSSLsAndUpdateStatusV2(ctx context.Context, ev
return true
}
- multiVersioned, err := c.apisixTlsLister.V2(tlsNamespace, tlsName)
+ multiVersioned, err := c.ApisixTlsLister.V2(tlsNamespace, tlsName)
if err != nil {
log.Warnw("secret related ApisixTls resource not found, skip",
zap.String("ApisixTls", tlsMetaKey),
diff --git a/pkg/providers/apisix/apisix_upstream.go b/pkg/providers/apisix/apisix_upstream.go
index 0db5ae44..39803238 100644
--- a/pkg/providers/apisix/apisix_upstream.go
+++ b/pkg/providers/apisix/apisix_upstream.go
@@ -26,7 +26,6 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
- listerscorev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
@@ -48,11 +47,6 @@ type apisixUpstreamController struct {
svcWorkqueue workqueue.RateLimitingInterface
workers int
- svcInformer cache.SharedIndexInformer
- svcLister listerscorev1.ServiceLister
- apisixUpstreamInformer cache.SharedIndexInformer
- apisixUpstreamLister kube.ApisixUpstreamLister
-
externalSvcLock sync.RWMutex
// external name service name -> apisix upstream name
externalServiceMap map[string]map[string]struct{}
@@ -69,23 +63,18 @@ func newApisixUpstreamController(common *apisixCommon, notifyApisixUpstreamChang
svcWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixUpstreamService"),
workers: 1,
- svcInformer: common.SvcInformer,
- svcLister: common.SvcLister,
- apisixUpstreamLister: common.ApisixUpstreamLister,
- apisixUpstreamInformer: common.ApisixUpstreamInformer,
-
externalServiceMap: make(map[string]map[string]struct{}),
notifyApisixUpstreamChange: notifyApisixUpstreamChange,
}
- c.apisixUpstreamInformer.AddEventHandler(
+ c.ApisixUpstreamInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onAdd,
UpdateFunc: c.onUpdate,
DeleteFunc: c.onDelete,
},
)
- c.svcInformer.AddEventHandler(
+ c.SvcInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onSvcAdd,
UpdateFunc: c.onSvcUpdate,
@@ -101,10 +90,6 @@ func (c *apisixUpstreamController) run(ctx context.Context) {
defer c.workqueue.ShutDown()
defer c.svcWorkqueue.ShutDown()
- if ok := cache.WaitForCacheSync(ctx.Done(), c.apisixUpstreamInformer.HasSynced, c.svcInformer.HasSynced); !ok {
- log.Error("cache sync failed")
- return
- }
for i := 0; i < c.workers; i++ {
go c.runWorker(ctx)
go c.runSvcWorker(ctx)
@@ -154,9 +139,9 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
var multiVersioned kube.ApisixUpstream
switch event.GroupVersion {
case config.ApisixV2beta3:
- multiVersioned, err = c.apisixUpstreamLister.V2beta3(namespace, name)
+ multiVersioned, err = c.ApisixUpstreamLister.V2beta3(namespace, name)
case config.ApisixV2:
- multiVersioned, err = c.apisixUpstreamLister.V2(namespace, name)
+ multiVersioned, err = c.ApisixUpstreamLister.V2(namespace, name)
default:
return fmt.Errorf("unsupported ApisixUpstream group version %s", event.GroupVersion)
}
@@ -204,7 +189,7 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
}
}
- svc, err := c.svcLister.Services(namespace).Get(name)
+ svc, err := c.SvcLister.Services(namespace).Get(name)
if err != nil {
log.Errorf("failed to get service %s: %s", key, err)
c.RecordEvent(au, corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
@@ -323,7 +308,7 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
}
}
- svc, err := c.svcLister.Services(namespace).Get(name)
+ svc, err := c.SvcLister.Services(namespace).Get(name)
if err != nil {
log.Errorf("failed to get service %s: %s", key, err)
c.RecordEvent(au, corev1.EventTypeWarning, utils.ResourceSyncAborted, err)
@@ -652,7 +637,7 @@ func (c *apisixUpstreamController) onDelete(obj interface{}) {
}
func (c *apisixUpstreamController) ResourceSync() {
- objs := c.apisixUpstreamInformer.GetIndexer().List()
+ objs := c.ApisixUpstreamInformer.GetIndexer().List()
for _, obj := range objs {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
@@ -791,7 +776,7 @@ func (c *apisixUpstreamController) handleSvcChange(ctx context.Context, key stri
if err != nil {
return err
}
- au, err := c.apisixUpstreamLister.V2(ns, name)
+ au, err := c.ApisixUpstreamLister.V2(ns, name)
if err != nil {
return err
}
diff --git a/pkg/providers/apisix/provider.go b/pkg/providers/apisix/provider.go
index 9a8d2a2a..150da89c 100644
--- a/pkg/providers/apisix/provider.go
+++ b/pkg/providers/apisix/provider.go
@@ -18,14 +18,9 @@ package apisix
import (
"context"
- "fmt"
corev1 "k8s.io/api/core/v1"
- "k8s.io/client-go/tools/cache"
- "github.com/apache/apisix-ingress-controller/pkg/config"
- "github.com/apache/apisix-ingress-controller/pkg/kube"
- "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/informers/externalversions"
apisixtranslation "github.com/apache/apisix-ingress-controller/pkg/providers/apisix/translation"
"github.com/apache/apisix-ingress-controller/pkg/providers/k8s/namespace"
"github.com/apache/apisix-ingress-controller/pkg/providers/translation"
@@ -70,14 +65,6 @@ type apisixProvider struct {
apisixClusterConfigController *apisixClusterConfigController
apisixConsumerController *apisixConsumerController
apisixPluginConfigController *apisixPluginConfigController
-
- apisixRouteInformer cache.SharedIndexInformer
- apisixClusterConfigInformer cache.SharedIndexInformer
- apisixConsumerInformer cache.SharedIndexInformer
- apisixPluginConfigInformer cache.SharedIndexInformer
- apisixTlsInformer cache.SharedIndexInformer
-
- apisixSharedInformerFactory externalversions.SharedInformerFactory
}
func NewProvider(common *providertypes.Common, namespaceProvider namespace.WatchingNamespaceProvider,
@@ -88,15 +75,11 @@ func NewProvider(common *providertypes.Common, namespaceProvider namespace.Watch
namespaceProvider: namespaceProvider,
}
- apisixFactory := common.KubeClient.NewAPISIXSharedIndexInformerFactory()
- p.apisixSharedInformerFactory = apisixFactory
-
p.apisixTranslator = apisixtranslation.NewApisixTranslator(&apisixtranslation.TranslatorOptions{
- Apisix: common.APISIX,
- ClusterName: common.Config.APISIX.DefaultClusterName,
-
- ApisixUpstreamLister: common.ApisixUpstreamLister,
+ Apisix: common.APISIX,
+ ClusterName: common.Config.APISIX.DefaultClusterName,
ServiceLister: common.SvcLister,
+ ApisixUpstreamLister: common.ApisixUpstreamLister,
SecretLister: common.SecretLister,
}, translator)
c := &apisixCommon{
@@ -105,51 +88,12 @@ func NewProvider(common *providertypes.Common, namespaceProvider namespace.Watch
translator: p.apisixTranslator,
}
- switch c.Config.Kubernetes.APIVersion {
- case config.ApisixV2beta3:
- p.apisixRouteInformer = apisixFactory.Apisix().V2beta3().ApisixRoutes().Informer()
- p.apisixTlsInformer = apisixFactory.Apisix().V2beta3().ApisixTlses().Informer()
- p.apisixClusterConfigInformer = apisixFactory.Apisix().V2beta3().ApisixClusterConfigs().Informer()
- p.apisixConsumerInformer = apisixFactory.Apisix().V2beta3().ApisixConsumers().Informer()
- p.apisixPluginConfigInformer = apisixFactory.Apisix().V2beta3().ApisixPluginConfigs().Informer()
-
- case config.ApisixV2:
- p.apisixRouteInformer = apisixFactory.Apisix().V2().ApisixRoutes().Informer()
- p.apisixTlsInformer = apisixFactory.Apisix().V2().ApisixTlses().Informer()
- p.apisixClusterConfigInformer = apisixFactory.Apisix().V2().ApisixClusterConfigs().Informer()
- p.apisixConsumerInformer = apisixFactory.Apisix().V2().ApisixConsumers().Informer()
- p.apisixPluginConfigInformer = apisixFactory.Apisix().V2().ApisixPluginConfigs().Informer()
- default:
- panic(fmt.Errorf("unsupported API version %v", c.Config.Kubernetes.APIVersion))
- }
-
- apisixRouteLister := kube.NewApisixRouteLister(
- apisixFactory.Apisix().V2beta3().ApisixRoutes().Lister(),
- apisixFactory.Apisix().V2().ApisixRoutes().Lister(),
- )
- apisixTlsLister := kube.NewApisixTlsLister(
- apisixFactory.Apisix().V2beta3().ApisixTlses().Lister(),
- apisixFactory.Apisix().V2().ApisixTlses().Lister(),
- )
- apisixClusterConfigLister := kube.NewApisixClusterConfigLister(
- apisixFactory.Apisix().V2beta3().ApisixClusterConfigs().Lister(),
- apisixFactory.Apisix().V2().ApisixClusterConfigs().Lister(),
- )
- apisixConsumerLister := kube.NewApisixConsumerLister(
- apisixFactory.Apisix().V2beta3().ApisixConsumers().Lister(),
- apisixFactory.Apisix().V2().ApisixConsumers().Lister(),
- )
- apisixPluginConfigLister := kube.NewApisixPluginConfigLister(
- apisixFactory.Apisix().V2beta3().ApisixPluginConfigs().Lister(),
- apisixFactory.Apisix().V2().ApisixPluginConfigs().Lister(),
- )
-
p.apisixUpstreamController = newApisixUpstreamController(c, p.NotifyApisixUpstreamChange)
- p.apisixRouteController = newApisixRouteController(c, p.apisixRouteInformer, apisixRouteLister)
- p.apisixTlsController = newApisixTlsController(c, p.apisixTlsInformer, apisixTlsLister)
- p.apisixClusterConfigController = newApisixClusterConfigController(c, p.apisixClusterConfigInformer, apisixClusterConfigLister)
- p.apisixConsumerController = newApisixConsumerController(c, p.apisixConsumerInformer, apisixConsumerLister)
- p.apisixPluginConfigController = newApisixPluginConfigController(c, p.apisixPluginConfigInformer, apisixPluginConfigLister)
+ p.apisixRouteController = newApisixRouteController(c)
+ p.apisixTlsController = newApisixTlsController(c)
+ p.apisixClusterConfigController = newApisixClusterConfigController(c)
+ p.apisixConsumerController = newApisixConsumerController(c)
+ p.apisixPluginConfigController = newApisixPluginConfigController(c)
return p, p.apisixTranslator, nil
}
@@ -157,22 +101,6 @@ func NewProvider(common *providertypes.Common, namespaceProvider namespace.Watch
func (p *apisixProvider) Run(ctx context.Context) {
e := utils.ParallelExecutor{}
- e.Add(func() {
- p.apisixRouteInformer.Run(ctx.Done())
- })
- e.Add(func() {
- p.apisixTlsInformer.Run(ctx.Done())
- })
- e.Add(func() {
- p.apisixClusterConfigInformer.Run(ctx.Done())
- })
- e.Add(func() {
- p.apisixConsumerInformer.Run(ctx.Done())
- })
- e.Add(func() {
- p.apisixPluginConfigInformer.Run(ctx.Done())
- })
-
e.Add(func() {
p.apisixUpstreamController.run(ctx)
})
diff --git a/pkg/providers/apisix/provider_init.go b/pkg/providers/apisix/provider_init.go
index bbcf9989..3eef900f 100644
--- a/pkg/providers/apisix/provider_init.go
+++ b/pkg/providers/apisix/provider_init.go
@@ -16,7 +16,6 @@ package apisix
import (
"context"
- "fmt"
"sync"
"go.uber.org/zap"
@@ -49,16 +48,6 @@ func (p *apisixProvider) Init(ctx context.Context) error {
pluginConfigMapA6 = make(map[string]string)
)
- p.apisixSharedInformerFactory.Start(ctx.Done())
- synced := p.apisixSharedInformerFactory.WaitForCacheSync(ctx.Done())
- for v, ok := range synced {
- if !ok {
- err := fmt.Errorf("%s cache failed to sync", v.Name())
- log.Error(err.Error())
- return err
- }
- }
-
namespaces := p.namespaceProvider.WatchingNamespaces()
for _, key := range namespaces {
@@ -69,7 +58,7 @@ func (p *apisixProvider) Init(ctx context.Context) error {
// ApisixRoute
switch p.common.Config.Kubernetes.APIVersion {
case config.ApisixV2beta3:
- retRoutes, err := p.apisixSharedInformerFactory.Apisix().V2beta3().ApisixRoutes().Lister().ApisixRoutes(ns).List(labels.Everything())
+ retRoutes, err := p.common.ApisixRouteLister.V2beta3Lister().List(labels.Everything())
if err != nil {
log.Error(err.Error())
ctx.Done()
@@ -104,7 +93,7 @@ func (p *apisixProvider) Init(ctx context.Context) error {
}
}
case config.ApisixV2:
- retRoutes, err := p.apisixSharedInformerFactory.Apisix().V2().ApisixRoutes().Lister().ApisixRoutes(ns).List(labels.Everything())
+ retRoutes, err := p.common.ApisixRouteLister.V2Lister().List(labels.Everything())
if err != nil {
log.Error(err.Error())
ctx.Done()
@@ -149,7 +138,7 @@ func (p *apisixProvider) Init(ctx context.Context) error {
switch p.common.Config.Kubernetes.APIVersion {
case config.ApisixV2beta3:
// ApisixConsumer
- retConsumer, err := p.apisixSharedInformerFactory.Apisix().V2beta3().ApisixConsumers().Lister().ApisixConsumers(ns).List(labels.Everything())
+ retConsumer, err := p.common.ApisixFactory.Apisix().V2beta3().ApisixConsumers().Lister().List(labels.Everything())
if err != nil {
log.Error(err.Error())
ctx.Done()
@@ -165,7 +154,7 @@ func (p *apisixProvider) Init(ctx context.Context) error {
}
}
// ApisixTls
- retSSL, err := p.apisixSharedInformerFactory.Apisix().V2beta3().ApisixTlses().Lister().ApisixTlses(ns).List(labels.Everything())
+ retSSL, err := p.common.ApisixFactory.Apisix().V2beta3().ApisixTlses().Lister().ApisixTlses(ns).List(labels.Everything())
if err != nil {
log.Error(err.Error())
ctx.Done()
@@ -182,7 +171,7 @@ func (p *apisixProvider) Init(ctx context.Context) error {
}
case config.ApisixV2:
// ApisixConsumer
- retConsumer, err := p.apisixSharedInformerFactory.Apisix().V2().ApisixConsumers().Lister().ApisixConsumers(ns).List(labels.Everything())
+ retConsumer, err := p.common.ApisixFactory.Apisix().V2().ApisixConsumers().Lister().ApisixConsumers(ns).List(labels.Everything())
if err != nil {
log.Error(err.Error())
ctx.Done()
@@ -198,7 +187,7 @@ func (p *apisixProvider) Init(ctx context.Context) error {
}
}
// ApisixTls
- retSSL, err := p.apisixSharedInformerFactory.Apisix().V2().ApisixTlses().Lister().ApisixTlses(ns).List(labels.Everything())
+ retSSL, err := p.common.ApisixFactory.Apisix().V2().ApisixTlses().Lister().ApisixTlses(ns).List(labels.Everything())
if err != nil {
log.Error(err.Error())
ctx.Done()
diff --git a/pkg/providers/controller.go b/pkg/providers/controller.go
index c3f563ff..592d49cb 100644
--- a/pkg/providers/controller.go
+++ b/pkg/providers/controller.go
@@ -27,6 +27,9 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+ extensionsv1beta1 "k8s.io/client-go/listers/extensions/v1beta1"
+ networkingv1 "k8s.io/client-go/listers/networking/v1"
+ networkingv1beta1 "k8s.io/client-go/listers/networking/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
@@ -37,6 +40,8 @@ import (
"github.com/apache/apisix-ingress-controller/pkg/config"
"github.com/apache/apisix-ingress-controller/pkg/kube"
apisixscheme "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned/scheme"
+ v2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v2"
+ "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v2beta3"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/metrics"
apisixprovider "github.com/apache/apisix-ingress-controller/pkg/providers/apisix"
@@ -217,26 +222,80 @@ func (c *Controller) initSharedInformers() *providertypes.ListerInformer {
kubeFactory := c.kubeClient.NewSharedIndexInformerFactory()
apisixFactory := c.kubeClient.NewAPISIXSharedIndexInformerFactory()
- epLister, epInformer := kube.NewEndpointListerAndInformer(kubeFactory, c.cfg.Kubernetes.WatchEndpointSlices)
- svcInformer := kubeFactory.Core().V1().Services().Informer()
- svcLister := kubeFactory.Core().V1().Services().Lister()
+ var (
+ ingressInformer cache.SharedIndexInformer
+
+ ingressListerV1 networkingv1.IngressLister
+ ingressListerV1beta1 networkingv1beta1.IngressLister
+ ingressListerExtensionsV1beta1 extensionsv1beta1.IngressLister
+ )
var (
- apisixUpstreamInformer cache.SharedIndexInformer
+ apisixUpstreamInformer cache.SharedIndexInformer
+ apisixRouteInformer cache.SharedIndexInformer
+ apisixPluginConfigInformer cache.SharedIndexInformer
+ apisixConsumerInformer cache.SharedIndexInformer
+ apisixTlsInformer cache.SharedIndexInformer
+ apisixClusterConfigInformer cache.SharedIndexInformer
+
+ apisixRouteListerV2beta3 v2beta3.ApisixRouteLister
+ apisixUpstreamListerV2beta3 v2beta3.ApisixUpstreamLister
+ apisixTlsListerV2beta3 v2beta3.ApisixTlsLister
+ apisixClusterConfigListerV2beta3 v2beta3.ApisixClusterConfigLister
+ apisixConsumerListerV2beta3 v2beta3.ApisixConsumerLister
+ apisixPluginConfigListerV2beta3 v2beta3.ApisixPluginConfigLister
+
+ apisixRouteListerV2 v2.ApisixRouteLister
+ apisixUpstreamListerV2 v2.ApisixUpstreamLister
+ apisixTlsListerV2 v2.ApisixTlsLister
+ apisixClusterConfigListerV2 v2.ApisixClusterConfigLister
+ apisixConsumerListerV2 v2.ApisixConsumerLister
+ apisixPluginConfigListerV2 v2.ApisixPluginConfigLister
)
+
switch c.cfg.Kubernetes.APIVersion {
case config.ApisixV2beta3:
+ apisixRouteInformer = apisixFactory.Apisix().V2beta3().ApisixRoutes().Informer()
+ apisixTlsInformer = apisixFactory.Apisix().V2beta3().ApisixTlses().Informer()
+ apisixClusterConfigInformer = apisixFactory.Apisix().V2beta3().ApisixClusterConfigs().Informer()
+ apisixConsumerInformer = apisixFactory.Apisix().V2beta3().ApisixConsumers().Informer()
+ apisixPluginConfigInformer = apisixFactory.Apisix().V2beta3().ApisixPluginConfigs().Informer()
apisixUpstreamInformer = apisixFactory.Apisix().V2beta3().ApisixUpstreams().Informer()
+
+ apisixRouteListerV2beta3 = apisixFactory.Apisix().V2beta3().ApisixRoutes().Lister()
+ apisixUpstreamListerV2beta3 = apisixFactory.Apisix().V2beta3().ApisixUpstreams().Lister()
+ apisixTlsListerV2beta3 = apisixFactory.Apisix().V2beta3().ApisixTlses().Lister()
+ apisixClusterConfigListerV2beta3 = apisixFactory.Apisix().V2beta3().ApisixClusterConfigs().Lister()
+ apisixConsumerListerV2beta3 = apisixFactory.Apisix().V2beta3().ApisixConsumers().Lister()
+ apisixPluginConfigListerV2beta3 = apisixFactory.Apisix().V2beta3().ApisixPluginConfigs().Lister()
case config.ApisixV2:
+ apisixRouteInformer = apisixFactory.Apisix().V2().ApisixRoutes().Informer()
+ apisixTlsInformer = apisixFactory.Apisix().V2().ApisixTlses().Informer()
+ apisixClusterConfigInformer = apisixFactory.Apisix().V2().ApisixClusterConfigs().Informer()
+ apisixConsumerInformer = apisixFactory.Apisix().V2().ApisixConsumers().Informer()
+ apisixPluginConfigInformer = apisixFactory.Apisix().V2().ApisixPluginConfigs().Informer()
apisixUpstreamInformer = apisixFactory.Apisix().V2().ApisixUpstreams().Informer()
+
+ apisixRouteListerV2 = apisixFactory.Apisix().V2().ApisixRoutes().Lister()
+ apisixUpstreamListerV2 = apisixFactory.Apisix().V2().ApisixUpstreams().Lister()
+ apisixTlsListerV2 = apisixFactory.Apisix().V2().ApisixTlses().Lister()
+ apisixClusterConfigListerV2 = apisixFactory.Apisix().V2().ApisixClusterConfigs().Lister()
+ apisixConsumerListerV2 = apisixFactory.Apisix().V2().ApisixConsumers().Lister()
+ apisixPluginConfigListerV2 = apisixFactory.Apisix().V2().ApisixPluginConfigs().Lister()
default:
panic(fmt.Errorf("unsupported API version %v", c.cfg.Kubernetes.APIVersion))
}
- apisixUpstreamLister := kube.NewApisixUpstreamLister(
- apisixFactory.Apisix().V2beta3().ApisixUpstreams().Lister(),
- apisixFactory.Apisix().V2().ApisixUpstreams().Lister(),
- )
+ apisixUpstreamLister := kube.NewApisixUpstreamLister(apisixUpstreamListerV2beta3, apisixUpstreamListerV2)
+ apisixRouteLister := kube.NewApisixRouteLister(apisixRouteListerV2beta3, apisixRouteListerV2)
+ apisixTlsLister := kube.NewApisixTlsLister(apisixTlsListerV2beta3, apisixTlsListerV2)
+ apisixClusterConfigLister := kube.NewApisixClusterConfigLister(apisixClusterConfigListerV2beta3, apisixClusterConfigListerV2)
+ apisixConsumerLister := kube.NewApisixConsumerLister(apisixConsumerListerV2beta3, apisixConsumerListerV2)
+ apisixPluginConfigLister := kube.NewApisixPluginConfigLister(apisixPluginConfigListerV2beta3, apisixPluginConfigListerV2)
+
+ epLister, epInformer := kube.NewEndpointListerAndInformer(kubeFactory, c.cfg.Kubernetes.WatchEndpointSlices)
+ svcInformer := kubeFactory.Core().V1().Services().Informer()
+ svcLister := kubeFactory.Core().V1().Services().Lister()
podInformer := kubeFactory.Core().V1().Pods().Informer()
podLister := kubeFactory.Core().V1().Pods().Lister()
@@ -247,19 +306,50 @@ func (c *Controller) initSharedInformers() *providertypes.ListerInformer {
configmapInformer := kubeFactory.Core().V1().ConfigMaps().Informer()
configmapLister := kubeFactory.Core().V1().ConfigMaps().Lister()
+ switch c.cfg.Kubernetes.IngressVersion {
+ case config.IngressNetworkingV1:
+ ingressInformer = kubeFactory.Networking().V1().Ingresses().Informer()
+ ingressListerV1 = kubeFactory.Networking().V1().Ingresses().Lister()
+ case config.IngressNetworkingV1beta1:
+ ingressInformer = kubeFactory.Networking().V1beta1().Ingresses().Informer()
+ ingressListerV1beta1 = kubeFactory.Networking().V1beta1().Ingresses().Lister()
+ default:
+ ingressInformer = kubeFactory.Extensions().V1beta1().Ingresses().Informer()
+ ingressListerExtensionsV1beta1 = kubeFactory.Extensions().V1beta1().Ingresses().Lister()
+ }
+
+ ingressLister := kube.NewIngressLister(ingressListerV1, ingressListerV1beta1, ingressListerExtensionsV1beta1)
+
listerInformer := &providertypes.ListerInformer{
- EpLister: epLister,
- EpInformer: epInformer,
- SvcLister: svcLister,
- SvcInformer: svcInformer,
- SecretLister: secretLister,
- SecretInformer: secretInformer,
- PodLister: podLister,
- PodInformer: podInformer,
- ApisixUpstreamLister: apisixUpstreamLister,
- ApisixUpstreamInformer: apisixUpstreamInformer,
- ConfigMapInformer: configmapInformer,
- ConfigMapLister: configmapLister,
+ ApisixFactory: apisixFactory,
+ KubeFactory: kubeFactory,
+
+ EpLister: epLister,
+ EpInformer: epInformer,
+ SvcLister: svcLister,
+ SvcInformer: svcInformer,
+ SecretLister: secretLister,
+ SecretInformer: secretInformer,
+ PodLister: podLister,
+ PodInformer: podInformer,
+ ConfigMapInformer: configmapInformer,
+ ConfigMapLister: configmapLister,
+ IngressInformer: ingressInformer,
+ IngressLister: ingressLister,
+
+ ApisixUpstreamLister: apisixUpstreamLister,
+ ApisixRouteLister: apisixRouteLister,
+ ApisixConsumerLister: apisixConsumerLister,
+ ApisixTlsLister: apisixTlsLister,
+ ApisixPluginConfigLister: apisixPluginConfigLister,
+ ApisixClusterConfigLister: apisixClusterConfigLister,
+
+ ApisixUpstreamInformer: apisixUpstreamInformer,
+ ApisixPluginConfigInformer: apisixPluginConfigInformer,
+ ApisixRouteInformer: apisixRouteInformer,
+ ApisixClusterConfigInformer: apisixClusterConfigInformer,
+ ApisixConsumerInformer: apisixConsumerInformer,
+ ApisixTlsInformer: apisixTlsInformer,
}
return listerInformer
@@ -380,6 +470,14 @@ func (c *Controller) run(ctx context.Context) {
ctx.Done()
return
}
+
+ // Wait for resources to sync
+ if ok := c.informers.StartAndWaitForCacheSync(ctx); !ok {
+ ctx.Done()
+ return
+ }
+
+ // Compare resource
if err = c.apisixProvider.Init(ctx); err != nil {
ctx.Done()
return
@@ -393,10 +491,6 @@ func (c *Controller) run(ctx context.Context) {
c.checkClusterHealth(ctx, cancelFunc)
})
- e.Add(func() {
- c.informers.Run(ctx)
- })
-
e.Add(func() {
c.namespaceProvider.Run(ctx)
})
diff --git a/pkg/providers/ingress/ingress.go b/pkg/providers/ingress/ingress.go
index 5dedaef0..e82aa41a 100644
--- a/pkg/providers/ingress/ingress.go
+++ b/pkg/providers/ingress/ingress.go
@@ -50,13 +50,6 @@ type ingressController struct {
workqueue workqueue.RateLimitingInterface
workers int
- ingressLister kube.IngressLister
- ingressInformer cache.SharedIndexInformer
-
- secretInformer cache.SharedIndexInformer
- endpointInformer cache.SharedIndexInformer
- serviceInformer cache.SharedIndexInformer
-
// secretSSLMap stores reference from K8s secret to Ingress
// type: Map<SecretKey, Map<IngressVersionKey, SSL in APISIX>>
// SecretKey -> IngressVersionKey -> []string
@@ -65,24 +58,17 @@ type ingressController struct {
secretSSLMap *sync.Map
}
-func newIngressController(common *ingressCommon, ingressLister kube.IngressLister, ingressInformer, secretInformer, endpointInformer, serviceInformer cache.SharedIndexInformer) *ingressController {
+func newIngressController(common *ingressCommon) *ingressController {
c := &ingressController{
ingressCommon: common,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ingress"),
workers: 1,
- ingressLister: ingressLister,
- ingressInformer: ingressInformer,
-
- secretInformer: secretInformer,
- endpointInformer: endpointInformer,
- serviceInformer: serviceInformer,
-
secretSSLMap: new(sync.Map),
}
- c.ingressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+ c.IngressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.onAdd,
UpdateFunc: c.onUpdate,
DeleteFunc: c.OnDelete,
@@ -95,14 +81,6 @@ func (c *ingressController) run(ctx context.Context) {
defer log.Infof("ingress controller exited")
defer c.workqueue.ShutDown()
- if !cache.WaitForCacheSync(ctx.Done(),
- c.ingressInformer.HasSynced,
- c.secretInformer.HasSynced,
- c.serviceInformer.HasSynced,
- c.endpointInformer.HasSynced) {
- log.Errorf("cache sync failed")
- return
- }
for i := 0; i < c.workers; i++ {
go c.runWorker(ctx)
}
@@ -132,11 +110,11 @@ func (c *ingressController) sync(ctx context.Context, ev *types.Event) error {
var ing kube.Ingress
switch ingEv.GroupVersion {
case kube.IngressV1:
- ing, err = c.ingressLister.V1(namespace, name)
+ ing, err = c.IngressLister.V1(namespace, name)
case kube.IngressV1beta1:
- ing, err = c.ingressLister.V1beta1(namespace, name)
+ ing, err = c.IngressLister.V1beta1(namespace, name)
case kube.IngressExtensionsV1beta1:
- ing, err = c.ingressLister.ExtensionsV1beta1(namespace, name)
+ ing, err = c.IngressLister.ExtensionsV1beta1(namespace, name)
default:
err = fmt.Errorf("unsupported group version %s, one of (%s/%s/%s) is expected", ingEv.GroupVersion,
kube.IngressV1, kube.IngressV1beta1, kube.IngressExtensionsV1beta1)
@@ -258,11 +236,11 @@ func (c *ingressController) handleSyncErr(obj interface{}, err error) {
var ing kube.Ingress
switch event.GroupVersion {
case kube.IngressV1:
- ing, errLocal = c.ingressLister.V1(namespace, name)
+ ing, errLocal = c.IngressLister.V1(namespace, name)
case kube.IngressV1beta1:
- ing, errLocal = c.ingressLister.V1beta1(namespace, name)
+ ing, errLocal = c.IngressLister.V1beta1(namespace, name)
case kube.IngressExtensionsV1beta1:
- ing, errLocal = c.ingressLister.ExtensionsV1beta1(namespace, name)
+ ing, errLocal = c.IngressLister.ExtensionsV1beta1(namespace, name)
}
if err == nil {
@@ -453,7 +431,7 @@ func (c *ingressController) isIngressEffective(ing kube.Ingress) bool {
}
func (c *ingressController) ResourceSync() {
- objs := c.ingressInformer.GetIndexer().List()
+ objs := c.IngressInformer.GetIndexer().List()
for _, obj := range objs {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
@@ -614,13 +592,13 @@ func (c *ingressController) syncSSLs(ctx context.Context, evType types.EventType
)
switch ingressVersion {
case kube.IngressV1:
- ing, err = c.ingressLister.V1(ingressNamespace, ingressName)
+ ing, err = c.IngressLister.V1(ingressNamespace, ingressName)
obj = ing.V1()
case kube.IngressV1beta1:
- ing, err = c.ingressLister.V1(ingressNamespace, ingressName)
+ ing, err = c.IngressLister.V1(ingressNamespace, ingressName)
obj = ing.V1beta1()
case kube.IngressExtensionsV1beta1:
- ing, err = c.ingressLister.V1(ingressNamespace, ingressName)
+ ing, err = c.IngressLister.V1(ingressNamespace, ingressName)
obj = ing.ExtensionsV1beta1()
}
if err != nil {
diff --git a/pkg/providers/ingress/provider.go b/pkg/providers/ingress/provider.go
index c5e75b56..15e19db3 100644
--- a/pkg/providers/ingress/provider.go
+++ b/pkg/providers/ingress/provider.go
@@ -20,10 +20,7 @@ import (
"context"
corev1 "k8s.io/api/core/v1"
- "k8s.io/client-go/tools/cache"
- "github.com/apache/apisix-ingress-controller/pkg/config"
- "github.com/apache/apisix-ingress-controller/pkg/kube"
apisixtranslation "github.com/apache/apisix-ingress-controller/pkg/providers/apisix/translation"
ingresstranslation "github.com/apache/apisix-ingress-controller/pkg/providers/ingress/translation"
"github.com/apache/apisix-ingress-controller/pkg/providers/k8s/namespace"
@@ -58,11 +55,6 @@ type ingressProvider struct {
name string
ingressController *ingressController
-
- ingressInformer cache.SharedIndexInformer
- serviceInformer cache.SharedIndexInformer
- endpointInformer cache.SharedIndexInformer
- secretInformer cache.SharedIndexInformer
}
func NewProvider(common *providertypes.Common, namespaceProvider namespace.WatchingNamespaceProvider,
@@ -71,26 +63,6 @@ func NewProvider(common *providertypes.Common, namespaceProvider namespace.Watch
name: ProviderName,
}
- kubeFactory := common.KubeClient.NewSharedIndexInformerFactory()
- switch common.Config.Kubernetes.IngressVersion {
- case config.IngressNetworkingV1:
- p.ingressInformer = kubeFactory.Networking().V1().Ingresses().Informer()
- case config.IngressNetworkingV1beta1:
- p.ingressInformer = kubeFactory.Networking().V1beta1().Ingresses().Informer()
- default:
- p.ingressInformer = kubeFactory.Extensions().V1beta1().Ingresses().Informer()
- }
-
- p.endpointInformer = common.EpInformer
- p.serviceInformer = common.SvcInformer
- p.secretInformer = common.SecretInformer
-
- ingressLister := kube.NewIngressLister(
- kubeFactory.Networking().V1().Ingresses().Lister(),
- kubeFactory.Networking().V1beta1().Ingresses().Lister(),
- kubeFactory.Extensions().V1beta1().Ingresses().Lister(),
- )
-
c := &ingressCommon{
Common: common,
namespaceProvider: namespaceProvider,
@@ -101,7 +73,7 @@ func NewProvider(common *providertypes.Common, namespaceProvider namespace.Watch
}, translator, apisixTranslator),
}
- p.ingressController = newIngressController(c, ingressLister, p.ingressInformer, p.secretInformer, p.endpointInformer, p.serviceInformer)
+ p.ingressController = newIngressController(c)
return p, nil
}
@@ -109,10 +81,6 @@ func NewProvider(common *providertypes.Common, namespaceProvider namespace.Watch
func (p *ingressProvider) Run(ctx context.Context) {
e := utils.ParallelExecutor{}
- e.Add(func() {
- p.ingressInformer.Run(ctx.Done())
- })
-
e.Add(func() {
p.ingressController.run(ctx)
})
diff --git a/pkg/providers/k8s/configmap/configmap.go b/pkg/providers/k8s/configmap/configmap.go
index 46bff207..6ce1bfdd 100644
--- a/pkg/providers/k8s/configmap/configmap.go
+++ b/pkg/providers/k8s/configmap/configmap.go
@@ -21,7 +21,6 @@ import (
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
- v1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
@@ -42,16 +41,11 @@ type configmapController struct {
workqueue workqueue.RateLimitingInterface
workers int
- configmapInformer cache.SharedIndexInformer
- configmapLister v1.ConfigMapLister
-
subscriptionList map[subscripKey]struct{}
}
func newConfigMapController(common *providertypes.Common) *configmapController {
ctl := &configmapController{
- configmapInformer: common.ConfigMapInformer,
- configmapLister: common.ConfigMapLister,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ConfigMap"),
workers: 1,
@@ -60,7 +54,7 @@ func newConfigMapController(common *providertypes.Common) *configmapController {
Common: common,
}
- ctl.configmapInformer.AddEventHandler(
+ ctl.ConfigMapInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: ctl.onAdd,
UpdateFunc: ctl.onUpdate,
@@ -90,10 +84,6 @@ func (c *configmapController) IsSubscribing(key string) bool {
}
func (c *configmapController) run(ctx context.Context) {
- if ok := cache.WaitForCacheSync(ctx.Done(), c.configmapInformer.HasSynced); !ok {
- log.Error("namespace informers sync failed")
- return
- }
log.Info("configmap controller started")
defer log.Info("configmap controller exited")
for i := 0; i < c.workers; i++ {
@@ -126,7 +116,7 @@ func (c *configmapController) sync(ctx context.Context, ev *types.Event) error {
log.Errorf("invalid resource key: %s", key)
return err
}
- cm, err := c.configmapLister.ConfigMaps(namespace).Get(name)
+ cm, err := c.ConfigMapLister.ConfigMaps(namespace).Get(name)
if err != nil {
if !k8serrors.IsNotFound(err) {
log.Errorw("sync failed, unable to get ConfigMap",
diff --git a/pkg/providers/k8s/configmap/provider.go b/pkg/providers/k8s/configmap/provider.go
index 9b771186..0b226d5d 100644
--- a/pkg/providers/k8s/configmap/provider.go
+++ b/pkg/providers/k8s/configmap/provider.go
@@ -19,8 +19,6 @@ package configmap
import (
"context"
- "k8s.io/client-go/tools/cache"
-
"github.com/apache/apisix-ingress-controller/pkg/config"
providertypes "github.com/apache/apisix-ingress-controller/pkg/providers/types"
"github.com/apache/apisix-ingress-controller/pkg/providers/utils"
@@ -33,17 +31,13 @@ type Provider interface {
}
type configmapProvider struct {
- cfg *config.Config
-
- configmapInformer cache.SharedIndexInformer
+ cfg *config.Config
configmapController *configmapController
}
func NewProvider(common *providertypes.Common) (Provider, error) {
p := &configmapProvider{
cfg: common.Config,
-
- configmapInformer: common.ConfigMapInformer,
}
p.configmapController = newConfigMapController(common)
diff --git a/pkg/providers/k8s/namespace/namespace_provider.go b/pkg/providers/k8s/namespace/namespace_provider.go
index 98a26ecb..6e020edd 100644
--- a/pkg/providers/k8s/namespace/namespace_provider.go
+++ b/pkg/providers/k8s/namespace/namespace_provider.go
@@ -65,7 +65,7 @@ func NewWatchingNamespaceProvider(ctx context.Context, kube *kube.KubeClient, cf
for _, selector := range cfg.Kubernetes.NamespaceSelector {
labelSlice := strings.Split(selector, "=")
if len(labelSlice) != 2 {
- return nil, fmt.Errorf("Bad namespace-selector format: %s, expected namespace-selector format: xxx=xxx", selector)
+ return nil, fmt.Errorf("bad namespace-selector format: %s, expected namespace-selector format: xxx=xxx", selector)
}
c.watchingLabels[labelSlice[0]] = labelSlice[1]
}
diff --git a/pkg/providers/k8s/pod/provider.go b/pkg/providers/k8s/pod/provider.go
index 2c177ac9..8ed810da 100644
--- a/pkg/providers/k8s/pod/provider.go
+++ b/pkg/providers/k8s/pod/provider.go
@@ -58,10 +58,6 @@ func NewProvider(common *providertypes.Common, namespaceProvider namespace.Watch
func (p *podProvider) Run(ctx context.Context) {
e := utils.ParallelExecutor{}
- e.Add(func() {
- p.podInformer.Run(ctx.Done())
- })
-
e.Add(func() {
p.podController.run(ctx)
})
diff --git a/pkg/providers/types/types.go b/pkg/providers/types/types.go
index 5ed99c0d..8dddd791 100644
--- a/pkg/providers/types/types.go
+++ b/pkg/providers/types/types.go
@@ -22,6 +22,7 @@ import (
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/client-go/informers"
listerscorev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
@@ -30,6 +31,7 @@ import (
apisixcache "github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
"github.com/apache/apisix-ingress-controller/pkg/config"
"github.com/apache/apisix-ingress-controller/pkg/kube"
+ "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/informers/externalversions"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/metrics"
"github.com/apache/apisix-ingress-controller/pkg/providers/utils"
@@ -43,6 +45,12 @@ type Provider interface {
}
type ListerInformer struct {
+ KubeFactory informers.SharedInformerFactory
+ ApisixFactory externalversions.SharedInformerFactory
+
+ NamespaceInformer cache.SharedIndexInformer
+ NamespaceLister listerscorev1.NamespaceLister
+
EpLister kube.EndpointLister
EpInformer cache.SharedIndexInformer
@@ -55,36 +63,58 @@ type ListerInformer struct {
PodLister listerscorev1.PodLister
PodInformer cache.SharedIndexInformer
- ApisixUpstreamLister kube.ApisixUpstreamLister
- ApisixUpstreamInformer cache.SharedIndexInformer
-
ConfigMapLister listerscorev1.ConfigMapLister
ConfigMapInformer cache.SharedIndexInformer
+
+ IngressLister kube.IngressLister
+ IngressInformer cache.SharedIndexInformer
+
+ ApisixUpstreamInformer cache.SharedIndexInformer
+ ApisixRouteInformer cache.SharedIndexInformer
+ ApisixPluginConfigInformer cache.SharedIndexInformer
+ ApisixConsumerInformer cache.SharedIndexInformer
+ ApisixTlsInformer cache.SharedIndexInformer
+ ApisixClusterConfigInformer cache.SharedIndexInformer
+
+ ApisixRouteLister kube.ApisixRouteLister
+ ApisixUpstreamLister kube.ApisixUpstreamLister
+ ApisixPluginConfigLister kube.ApisixPluginConfigLister
+ ApisixConsumerLister kube.ApisixConsumerLister
+ ApisixTlsLister kube.ApisixTlsLister
+ ApisixClusterConfigLister kube.ApisixClusterConfigLister
}
-func (c *ListerInformer) Run(ctx context.Context) {
+func (c *ListerInformer) StartAndWaitForCacheSync(ctx context.Context) bool {
+ succ := true
e := utils.ParallelExecutor{}
e.Add(func() {
- c.EpInformer.Run(ctx.Done())
- })
- e.Add(func() {
- c.SvcInformer.Run(ctx.Done())
- })
- e.Add(func() {
- c.SecretInformer.Run(ctx.Done())
- })
- e.Add(func() {
- c.ConfigMapInformer.Run(ctx.Done())
- })
- e.Add(func() {
- c.PodInformer.Run(ctx.Done())
+ c.KubeFactory.Start(ctx.Done())
+ kube := c.KubeFactory.WaitForCacheSync(ctx.Done())
+
+ for resource, ok := range kube {
+ if !ok {
+ succ = false
+ log.Error(fmt.Sprintf("%s cache failed to sync", resource.Name()))
+ return
+ }
+ }
})
+
e.Add(func() {
- c.ApisixUpstreamInformer.Run(ctx.Done())
+ c.ApisixFactory.Start(ctx.Done())
+ crds := c.ApisixFactory.WaitForCacheSync(ctx.Done())
+ for crd, ok := range crds {
+ if !ok {
+ succ = false
+ log.Error(fmt.Sprintf("%s cache failed to sync", crd.Name()))
+ return
+ }
+ }
})
e.Wait()
+ return succ
}
type Common struct {