You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by to...@apache.org on 2021/05/14 01:55:54 UTC
[apisix-ingress-controller] branch master updated: chore: refactor the structures of kube clients, shared index informer factories (#431)
This is an automated email from the ASF dual-hosted git repository.
tokers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new fb11efc chore: refactor the structures of kube clients, shared index informer factories (#431)
fb11efc is described below
commit fb11efc00a914e1992a8a730cf5443a3ea38e8be
Author: Alex Zhang <zc...@gmail.com>
AuthorDate: Fri May 14 09:55:47 2021 +0800
chore: refactor the structures of kube clients, shared index informer factories (#431)
---
pkg/ingress/apisix_route.go | 4 +-
pkg/ingress/apisix_tls.go | 6 +--
pkg/ingress/apisix_upstream.go | 10 ++--
pkg/ingress/controller.go | 104 +++++++++++++++++++----------------------
pkg/ingress/status.go | 11 ++---
pkg/kube/init.go | 53 +++++++++++----------
6 files changed, 90 insertions(+), 98 deletions(-)
diff --git a/pkg/ingress/apisix_route.go b/pkg/ingress/apisix_route.go
index f6fe291..a374626 100644
--- a/pkg/ingress/apisix_route.go
+++ b/pkg/ingress/apisix_route.go
@@ -219,7 +219,7 @@ func (c *apisixRouteController) handleSyncErr(obj interface{}, errOrigin error)
c.controller.recorderEvent(ar.V1(), v1.EventTypeNormal, _resourceSynced, nil)
} else if ar.GroupVersion() == kube.ApisixRouteV2alpha1 {
c.controller.recorderEvent(ar.V2alpha1(), v1.EventTypeNormal, _resourceSynced, nil)
- recordStatus(ar.V2alpha1(), _resourceSynced, nil, metav1.ConditionTrue)
+ c.controller.recordStatus(ar.V2alpha1(), _resourceSynced, nil, metav1.ConditionTrue)
}
} else {
log.Errorw("failed list ApisixRoute",
@@ -241,7 +241,7 @@ func (c *apisixRouteController) handleSyncErr(obj interface{}, errOrigin error)
c.controller.recorderEvent(ar.V1(), v1.EventTypeWarning, _resourceSyncAborted, errOrigin)
} else if ar.GroupVersion() == kube.ApisixRouteV2alpha1 {
c.controller.recorderEvent(ar.V2alpha1(), v1.EventTypeWarning, _resourceSyncAborted, errOrigin)
- recordStatus(ar.V2alpha1(), _resourceSyncAborted, errOrigin, metav1.ConditionFalse)
+ c.controller.recordStatus(ar.V2alpha1(), _resourceSyncAborted, errOrigin, metav1.ConditionFalse)
}
} else {
log.Errorw("failed list ApisixRoute",
diff --git a/pkg/ingress/apisix_tls.go b/pkg/ingress/apisix_tls.go
index 0b8a4fa..169eb1d 100644
--- a/pkg/ingress/apisix_tls.go
+++ b/pkg/ingress/apisix_tls.go
@@ -119,7 +119,7 @@ func (c *apisixTlsController) sync(ctx context.Context, ev *types.Event) error {
zap.Any("ApisixTls", tls),
)
c.controller.recorderEvent(tls, corev1.EventTypeWarning, _resourceSyncAborted, err)
- recordStatus(tls, _resourceSyncAborted, err, metav1.ConditionFalse)
+ c.controller.recordStatus(tls, _resourceSyncAborted, err, metav1.ConditionFalse)
return err
}
log.Debug("got SSL object from ApisixTls",
@@ -136,12 +136,12 @@ func (c *apisixTlsController) sync(ctx context.Context, ev *types.Event) error {
zap.Any("ssl", ssl),
)
c.controller.recorderEvent(tls, corev1.EventTypeWarning, _resourceSyncAborted, err)
- recordStatus(tls, _resourceSyncAborted, err, metav1.ConditionFalse)
+ c.controller.recordStatus(tls, _resourceSyncAborted, err, metav1.ConditionFalse)
return err
}
c.controller.recorderEvent(tls, corev1.EventTypeNormal, _resourceSynced, nil)
- recordStatus(tls, _resourceSynced, nil, metav1.ConditionTrue)
+ c.controller.recordStatus(tls, _resourceSynced, nil, metav1.ConditionTrue)
return err
}
diff --git a/pkg/ingress/apisix_upstream.go b/pkg/ingress/apisix_upstream.go
index 9c9d8eb..a05d9c1 100644
--- a/pkg/ingress/apisix_upstream.go
+++ b/pkg/ingress/apisix_upstream.go
@@ -124,7 +124,7 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
if err != nil {
log.Errorf("failed to get service %s: %s", key, err)
c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err)
- recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
+ c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
return err
}
@@ -139,7 +139,7 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
}
log.Errorf("failed to get upstream %s: %s", upsName, err)
c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err)
- recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
+ c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
return err
}
var newUps *apisixv1.Upstream
@@ -156,7 +156,7 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
zap.Error(err),
)
c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err)
- recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
+ c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
return err
}
} else {
@@ -178,12 +178,12 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
zap.String("cluster", clusterName),
)
c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err)
- recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
+ c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
return err
}
}
c.controller.recorderEvent(au, corev1.EventTypeNormal, _resourceSynced, nil)
- recordStatus(au, _resourceSynced, nil, metav1.ConditionTrue)
+ c.controller.recordStatus(au, _resourceSynced, nil, metav1.ConditionTrue)
return err
}
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index a388e11..354e75c 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -25,7 +25,6 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
- "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
listerscorev1 "k8s.io/client-go/listers/core/v1"
@@ -38,8 +37,6 @@ import (
"github.com/apache/apisix-ingress-controller/pkg/apisix"
"github.com/apache/apisix-ingress-controller/pkg/config"
"github.com/apache/apisix-ingress-controller/pkg/kube"
- crdclientset "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned"
- "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/informers/externalversions"
listersv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1"
listersv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v2alpha1"
"github.com/apache/apisix-ingress-controller/pkg/kube/translation"
@@ -64,18 +61,16 @@ const (
// Controller is the ingress apisix controller object.
type Controller struct {
- name string
- namespace string
- cfg *config.Config
- wg sync.WaitGroup
- watchingNamespace map[string]struct{}
- apisix apisix.APISIX
- translator translation.Translator
- apiServer *api.Server
- clientset kubernetes.Interface
- crdClientset crdclientset.Interface
- metricsCollector metrics.Collector
- crdInformerFactory externalversions.SharedInformerFactory
+ name string
+ namespace string
+ cfg *config.Config
+ wg sync.WaitGroup
+ watchingNamespace map[string]struct{}
+ apisix apisix.APISIX
+ translator translation.Translator
+ apiServer *api.Server
+ metricsCollector metrics.Collector
+ kubeClient *kube.KubeClient
// recorder event
recorder record.EventRecorder
// this map enrolls which ApisixTls objects refer to a Kubernetes
@@ -123,7 +118,8 @@ func NewController(cfg *config.Config) (*Controller, error) {
return nil, err
}
- if err := kube.InitInformer(cfg); err != nil {
+ kubeClient, err := kube.NewKubeClient(cfg)
+ if err != nil {
return nil, err
}
@@ -132,9 +128,6 @@ func NewController(cfg *config.Config) (*Controller, error) {
return nil, err
}
- crdClientset := kube.GetApisixClient()
- sharedInformerFactory := externalversions.NewSharedInformerFactory(crdClientset, cfg.Kubernetes.ResyncInterval.Duration)
-
var (
watchingNamespace map[string]struct{}
ingressInformer cache.SharedIndexInformer
@@ -146,63 +139,60 @@ func NewController(cfg *config.Config) (*Controller, error) {
watchingNamespace[ns] = struct{}{}
}
}
- kube.EndpointsInformer = kube.CoreSharedInformerFactory.Core().V1().Endpoints()
ingressLister := kube.NewIngressLister(
- kube.CoreSharedInformerFactory.Networking().V1().Ingresses().Lister(),
- kube.CoreSharedInformerFactory.Networking().V1beta1().Ingresses().Lister(),
- kube.CoreSharedInformerFactory.Extensions().V1beta1().Ingresses().Lister(),
+ kubeClient.SharedIndexInformerFactory.Networking().V1().Ingresses().Lister(),
+ kubeClient.SharedIndexInformerFactory.Networking().V1beta1().Ingresses().Lister(),
+ kubeClient.SharedIndexInformerFactory.Extensions().V1beta1().Ingresses().Lister(),
)
- apisixRouteLister := kube.NewApisixRouteLister(sharedInformerFactory.Apisix().V1().ApisixRoutes().Lister(),
- sharedInformerFactory.Apisix().V2alpha1().ApisixRoutes().Lister())
+ apisixRouteLister := kube.NewApisixRouteLister(kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixRoutes().Lister(),
+ kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixRoutes().Lister())
if cfg.Kubernetes.IngressVersion == config.IngressNetworkingV1 {
- ingressInformer = kube.CoreSharedInformerFactory.Networking().V1().Ingresses().Informer()
+ ingressInformer = kubeClient.SharedIndexInformerFactory.Networking().V1().Ingresses().Informer()
} else if cfg.Kubernetes.IngressVersion == config.IngressNetworkingV1beta1 {
- ingressInformer = kube.CoreSharedInformerFactory.Networking().V1beta1().Ingresses().Informer()
+ ingressInformer = kubeClient.SharedIndexInformerFactory.Networking().V1beta1().Ingresses().Informer()
} else {
- ingressInformer = kube.CoreSharedInformerFactory.Extensions().V1beta1().Ingresses().Informer()
+ ingressInformer = kubeClient.SharedIndexInformerFactory.Extensions().V1beta1().Ingresses().Informer()
}
if cfg.Kubernetes.ApisixRouteVersion == config.ApisixRouteV2alpha1 {
- apisixRouteInformer = sharedInformerFactory.Apisix().V2alpha1().ApisixRoutes().Informer()
+ apisixRouteInformer = kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixRoutes().Informer()
} else {
- apisixRouteInformer = sharedInformerFactory.Apisix().V1().ApisixRoutes().Informer()
+ apisixRouteInformer = kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixRoutes().Informer()
}
// recorder
eventBroadcaster := record.NewBroadcaster()
- eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kube.GetKubeClient().CoreV1().Events("")})
+ eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.Client.CoreV1().Events("")})
c := &Controller{
- name: podName,
- namespace: podNamespace,
- cfg: cfg,
- apiServer: apiSrv,
- apisix: client,
- metricsCollector: metrics.NewPrometheusCollector(podName, podNamespace),
- clientset: kube.GetKubeClient(),
- crdClientset: crdClientset,
- crdInformerFactory: sharedInformerFactory,
- watchingNamespace: watchingNamespace,
- secretSSLMap: new(sync.Map),
- recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: _component}),
-
- epInformer: kube.CoreSharedInformerFactory.Core().V1().Endpoints().Informer(),
- epLister: kube.CoreSharedInformerFactory.Core().V1().Endpoints().Lister(),
- svcInformer: kube.CoreSharedInformerFactory.Core().V1().Services().Informer(),
- svcLister: kube.CoreSharedInformerFactory.Core().V1().Services().Lister(),
+ name: podName,
+ namespace: podNamespace,
+ cfg: cfg,
+ apiServer: apiSrv,
+ apisix: client,
+ metricsCollector: metrics.NewPrometheusCollector(podName, podNamespace),
+ kubeClient: kubeClient,
+ watchingNamespace: watchingNamespace,
+ secretSSLMap: new(sync.Map),
+ recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: _component}),
+
+ epInformer: kubeClient.SharedIndexInformerFactory.Core().V1().Endpoints().Informer(),
+ epLister: kubeClient.SharedIndexInformerFactory.Core().V1().Endpoints().Lister(),
+ svcInformer: kubeClient.SharedIndexInformerFactory.Core().V1().Services().Informer(),
+ svcLister: kubeClient.SharedIndexInformerFactory.Core().V1().Services().Lister(),
ingressLister: ingressLister,
ingressInformer: ingressInformer,
- secretInformer: kube.CoreSharedInformerFactory.Core().V1().Secrets().Informer(),
- secretLister: kube.CoreSharedInformerFactory.Core().V1().Secrets().Lister(),
+ secretInformer: kubeClient.SharedIndexInformerFactory.Core().V1().Secrets().Informer(),
+ secretLister: kubeClient.SharedIndexInformerFactory.Core().V1().Secrets().Lister(),
apisixRouteInformer: apisixRouteInformer,
apisixRouteLister: apisixRouteLister,
- apisixUpstreamInformer: sharedInformerFactory.Apisix().V1().ApisixUpstreams().Informer(),
- apisixUpstreamLister: sharedInformerFactory.Apisix().V1().ApisixUpstreams().Lister(),
- apisixTlsInformer: sharedInformerFactory.Apisix().V1().ApisixTlses().Informer(),
- apisixTlsLister: sharedInformerFactory.Apisix().V1().ApisixTlses().Lister(),
- apisixClusterConfigInformer: sharedInformerFactory.Apisix().V2alpha1().ApisixClusterConfigs().Informer(),
- apisixClusterConfigLister: sharedInformerFactory.Apisix().V2alpha1().ApisixClusterConfigs().Lister(),
+ apisixUpstreamInformer: kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixUpstreams().Informer(),
+ apisixUpstreamLister: kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixUpstreams().Lister(),
+ apisixTlsInformer: kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixTlses().Informer(),
+ apisixTlsLister: kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixTlses().Lister(),
+ apisixClusterConfigInformer: kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixClusterConfigs().Informer(),
+ apisixClusterConfigLister: kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixClusterConfigs().Lister(),
}
c.translator = translation.NewTranslator(&translation.TranslatorOptions{
EndpointsLister: c.epLister,
@@ -267,7 +257,7 @@ func (c *Controller) Run(stop chan struct{}) error {
Namespace: c.namespace,
Name: c.cfg.Kubernetes.ElectionID,
},
- Client: c.clientset.CoordinationV1(),
+ Client: c.kubeClient.Client.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: c.name,
EventRecorder: c,
diff --git a/pkg/ingress/status.go b/pkg/ingress/status.go
index 5833288..492fbd6 100644
--- a/pkg/ingress/status.go
+++ b/pkg/ingress/status.go
@@ -23,7 +23,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "github.com/apache/apisix-ingress-controller/pkg/kube"
configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1"
"github.com/apache/apisix-ingress-controller/pkg/log"
@@ -35,7 +34,7 @@ const (
)
// recordStatus record resources status
-func recordStatus(at interface{}, reason string, err error, status v1.ConditionStatus) {
+func (c *Controller) recordStatus(at interface{}, reason string, err error, status v1.ConditionStatus) {
// build condition
message := _commonSuccessMessage
if err != nil {
@@ -47,6 +46,7 @@ func recordStatus(at interface{}, reason string, err error, status v1.ConditionS
Status: status,
Message: message,
}
+ client := c.kubeClient.APISIXClient
switch v := at.(type) {
case *configv1.ApisixTls:
@@ -56,7 +56,7 @@ func recordStatus(at interface{}, reason string, err error, status v1.ConditionS
v.Status.Conditions = &conditions
}
meta.SetStatusCondition(v.Status.Conditions, condition)
- if _, errRecord := kube.GetApisixClient().ApisixV1().ApisixTlses(v.Namespace).
+ if _, errRecord := client.ApisixV1().ApisixTlses(v.Namespace).
UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
log.Errorw("failed to record status change for ApisixTls",
zap.Error(errRecord),
@@ -71,7 +71,7 @@ func recordStatus(at interface{}, reason string, err error, status v1.ConditionS
v.Status.Conditions = &conditions
}
meta.SetStatusCondition(v.Status.Conditions, condition)
- if _, errRecord := kube.GetApisixClient().ApisixV1().ApisixUpstreams(v.Namespace).
+ if _, errRecord := client.ApisixV1().ApisixUpstreams(v.Namespace).
UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
log.Errorw("failed to record status change for ApisixUpstream",
zap.Error(errRecord),
@@ -86,7 +86,7 @@ func recordStatus(at interface{}, reason string, err error, status v1.ConditionS
v.Status.Conditions = &conditions
}
meta.SetStatusCondition(v.Status.Conditions, condition)
- if _, errRecord := kube.GetApisixClient().ApisixV2alpha1().ApisixRoutes(v.Namespace).
+ if _, errRecord := client.ApisixV2alpha1().ApisixRoutes(v.Namespace).
UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
log.Errorw("failed to record status change for ApisixRoute",
zap.Error(errRecord),
@@ -98,5 +98,4 @@ func recordStatus(at interface{}, reason string, err error, status v1.ConditionS
// This should not be executed
log.Errorf("unsupported resource record: %s", v)
}
-
}
diff --git a/pkg/kube/init.go b/pkg/kube/init.go
index 0e36922..d7b21e9 100644
--- a/pkg/kube/init.go
+++ b/pkg/kube/init.go
@@ -16,46 +16,49 @@ package kube
import (
"k8s.io/client-go/informers"
- coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"github.com/apache/apisix-ingress-controller/pkg/config"
clientset "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned"
+ "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/informers/externalversions"
)
-var (
- EndpointsInformer coreinformers.EndpointsInformer
- kubeClient kubernetes.Interface
- apisixKubeClient *clientset.Clientset
- CoreSharedInformerFactory informers.SharedInformerFactory
-)
-
-func GetKubeClient() kubernetes.Interface {
- return kubeClient
-}
-
-func GetApisixClient() clientset.Interface {
- return apisixKubeClient
+// KubeClient contains some objects used to communicate with Kubernetes API Server.
+type KubeClient struct {
+ // Client is the object used to operate Kubernetes builtin resources.
+ Client kubernetes.Interface
+ // APISIXClient is the object used to operate resources under apisix.apache.org group.
+ APISIXClient clientset.Interface
+ // SharedIndexInformerFactory is the index informer factory object used to watch and
+ // list Kubernetes builtin resources.
+ SharedIndexInformerFactory informers.SharedInformerFactory
+ // APISIXSharedIndexInformerFactory is the index informer factory object used to watch
+ // and list Kubernetes resources in apisix.apache.org group.
+ APISIXSharedIndexInformerFactory externalversions.SharedInformerFactory
}
-// initInformer initializes all related shared informers.
-// Deprecate: will be refactored in the future without notification.
-func InitInformer(cfg *config.Config) error {
- var err error
+// NewKubeClient creates a high-level Kubernetes client.
+func NewKubeClient(cfg *config.Config) (*KubeClient, error) {
restConfig, err := BuildRestConfig(cfg.Kubernetes.Kubeconfig, "")
if err != nil {
- return err
+ return nil, err
}
- kubeClient, err = kubernetes.NewForConfig(restConfig)
+ kubeClient, err := kubernetes.NewForConfig(restConfig)
if err != nil {
- return err
+ return nil, err
}
- apisixKubeClient, err = clientset.NewForConfig(restConfig)
+ apisixKubeClient, err := clientset.NewForConfig(restConfig)
if err != nil {
- return err
+ return nil, err
}
- CoreSharedInformerFactory = informers.NewSharedInformerFactory(kubeClient, cfg.Kubernetes.ResyncInterval.Duration)
+ factory := informers.NewSharedInformerFactory(kubeClient, cfg.Kubernetes.ResyncInterval.Duration)
+ apisixFactory := externalversions.NewSharedInformerFactory(apisixKubeClient, cfg.Kubernetes.ResyncInterval.Duration)
- return nil
+ return &KubeClient{
+ Client: kubeClient,
+ APISIXClient: apisixKubeClient,
+ SharedIndexInformerFactory: factory,
+ APISIXSharedIndexInformerFactory: apisixFactory,
+ }, nil
}