You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by to...@apache.org on 2021/05/26 08:40:04 UTC
[apisix-ingress-controller] branch master updated: chore: add
health check to apisix-admin and make the leader election recyclable (#499)
This is an automated email from the ASF dual-hosted git repository.
tokers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new 582c4b3 chore: add health check to apisix-admin and make the leader election recyclable (#499)
582c4b3 is described below
commit 582c4b362f26ffa8372bf520c3f774170a56c290
Author: Alex Zhang <zc...@gmail.com>
AuthorDate: Wed May 26 16:39:58 2021 +0800
chore: add health check to apisix-admin and make the leader election recyclable (#499)
Co-authored-by: Liu Peng <vs...@gmail.com>
---
pkg/apisix/apisix.go | 6 +-
pkg/apisix/cluster.go | 122 ++++++++++++++++-----
pkg/apisix/nonexistentclient.go | 4 +
pkg/ingress/apisix_cluster_config.go | 3 +-
pkg/ingress/apisix_route.go | 3 +-
pkg/ingress/apisix_tls.go | 3 +-
pkg/ingress/apisix_upstream.go | 3 +-
pkg/ingress/controller.go | 151 +++++++++++++++++---------
pkg/ingress/endpoint.go | 2 +-
pkg/ingress/ingress.go | 2 +-
pkg/ingress/secret.go | 2 +-
pkg/kube/apisix/apis/config/v2alpha1/types.go | 4 +
pkg/kube/init.go | 29 +++--
13 files changed, 235 insertions(+), 99 deletions(-)
diff --git a/pkg/apisix/apisix.go b/pkg/apisix/apisix.go
index ffcb405..4200e0a 100644
--- a/pkg/apisix/apisix.go
+++ b/pkg/apisix/apisix.go
@@ -50,6 +50,8 @@ type Cluster interface {
String() string
// HasSynced checks whether all resources in APISIX cluster is synced to cache.
HasSynced(context.Context) error
+ // HealthCheck checks apisix cluster health in realtime.
+ HealthCheck(context.Context) error
}
// Route is the specific client interface to take over the create, update,
@@ -122,6 +124,7 @@ type apisix struct {
func NewClient() (APISIX, error) {
cli := &apisix{
nonExistentCluster: newNonExistentCluster(),
+ clusters: make(map[string]Cluster), // allocated up front so AddCluster can store into the map without a nil check
}
return cli, nil
}
@@ -160,9 +163,6 @@ func (c *apisix) AddCluster(co *ClusterOptions) error {
if err != nil {
return err
}
- if c.clusters == nil {
- c.clusters = make(map[string]Cluster)
- }
c.clusters[co.Name] = cluster
return nil
}
diff --git a/pkg/apisix/cluster.go b/pkg/apisix/cluster.go
index d41e5dd..4d42d1e 100644
--- a/pkg/apisix/cluster.go
+++ b/pkg/apisix/cluster.go
@@ -21,7 +21,9 @@ import (
"fmt"
"io"
"io/ioutil"
+ "net"
"net/http"
+ "net/url"
"strings"
"sync/atomic"
"time"
@@ -37,8 +39,7 @@ import (
const (
_defaultTimeout = 5 * time.Second
- _cacheNotSync = iota
- _cacheSyncing
+ _cacheSyncing = iota
_cacheSynced
)
@@ -50,6 +51,19 @@ var (
ErrDuplicatedCluster = errors.New("duplicated cluster")
_errReadOnClosedResBody = errors.New("http: read on closed response body")
+
+ // Default shared transport for apisix client
+ _defaultTransport = &http.Transport{
+ Proxy: http.ProxyFromEnvironment,
+ Dial: (&net.Dialer{
+ Timeout: 3 * time.Second,
+ }).Dial,
+ DialContext: (&net.Dialer{
+ Timeout: 3 * time.Second,
+ }).DialContext,
+ ResponseHeaderTimeout: 30 * time.Second,
+ ExpectContinueTimeout: 1 * time.Second,
+ }
)
// ClusterOptions contains parameters to customize APISIX client.
@@ -63,6 +77,7 @@ type ClusterOptions struct {
type cluster struct {
name string
baseURL string
+ baseURLHost string
adminKey string
cli *http.Client
cacheState int32
@@ -86,19 +101,21 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
}
o.BaseURL = strings.TrimSuffix(o.BaseURL, "/")
+ u, err := url.Parse(o.BaseURL)
+ if err != nil {
+ return nil, err
+ }
+
c := &cluster{
- name: o.Name,
- baseURL: o.BaseURL,
- adminKey: o.AdminKey,
+ name: o.Name,
+ baseURL: o.BaseURL,
+ baseURLHost: u.Host,
+ adminKey: o.AdminKey,
cli: &http.Client{
- Timeout: o.Timeout,
- Transport: &http.Transport{
- ResponseHeaderTimeout: o.Timeout,
- ExpectContinueTimeout: o.Timeout,
- },
+ Timeout: o.Timeout,
+ Transport: _defaultTransport,
},
- cache: nil,
- cacheState: _cacheNotSync,
+ cacheState: _cacheSyncing, // default state
cacheSynced: make(chan struct{}),
}
c.route = newRouteClient(c)
@@ -108,6 +125,11 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
c.globalRules = newGlobalRuleClient(c)
c.consumer = newConsumerClient(c)
+ c.cache, err = cache.NewMemDBCache()
+ if err != nil {
+ return nil, err
+ }
+
go c.syncCache()
return c, nil
@@ -116,9 +138,6 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
func (c *cluster) syncCache() {
log.Infow("syncing cache", zap.String("cluster", c.name))
now := time.Now()
- if !atomic.CompareAndSwapInt32(&c.cacheState, _cacheNotSync, _cacheSyncing) {
- panic("dubious state when sync cache")
- }
defer func() {
if c.cacheSyncErr == nil {
log.Infow("cache synced",
@@ -134,13 +153,20 @@ func (c *cluster) syncCache() {
}()
backoff := wait.Backoff{
- Duration: time.Second,
- Factor: 2,
- Steps: 6,
- }
- err := wait.ExponentialBackoff(backoff, c.syncCacheOnce)
+ Duration: 2 * time.Second,
+ Factor: 1,
+ Steps: 5,
+ }
+ var lastSyncErr error
+ err := wait.ExponentialBackoff(backoff, func() (done bool, _ error) {
+ // syncCacheOnce never returns (false, nil), so whenever the retries are
+ // exhausted lastSyncErr holds the error of the last attempt.
+ done, lastSyncErr = c.syncCacheOnce()
+ return
+ })
if err != nil {
- c.cacheSyncErr = err
+ // if ErrWaitTimeout then set lastSyncErr
+ c.cacheSyncErr = lastSyncErr
}
close(c.cacheSynced)
@@ -150,12 +176,6 @@ func (c *cluster) syncCache() {
}
func (c *cluster) syncCacheOnce() (bool, error) {
- dbcache, err := cache.NewMemDBCache()
- if err != nil {
- return false, err
- }
- c.cache = dbcache
-
routes, err := c.route.List(context.TODO())
if err != nil {
log.Errorf("failed to list route in APISIX: %s", err)
@@ -306,6 +326,54 @@ func (c *cluster) GlobalRule() GlobalRule {
return c.globalRules
}
+// HealthCheck implements Cluster.HealthCheck: it returns the recorded cacheSyncErr if one is set, returns nil while the cache is still syncing, and otherwise probes the cluster with retries.
+func (c *cluster) HealthCheck(ctx context.Context) (err error) {
+ if c.cacheSyncErr != nil {
+ err = c.cacheSyncErr
+ return
+ }
+ if atomic.LoadInt32(&c.cacheState) == _cacheSyncing {
+ return // cache sync still in progress: skip the probe and report no error (err is nil here)
+ }
+
+ // Probe with a constant 5-second interval (Factor 1) for at most three steps, and give up if none succeeds.
+ backoff := wait.Backoff{
+ Duration: 5 * time.Second,
+ Factor: 1,
+ Steps: 3,
+ }
+ var lastCheckErr error
+ err = wait.ExponentialBackoffWithContext(ctx, backoff, func() (done bool, _ error) {
+ if lastCheckErr = c.healthCheck(ctx); lastCheckErr != nil {
+ log.Warnf("failed to check health for cluster %s: %s, will retry", c.name, lastCheckErr)
+ return
+ }
+ done = true
+ return
+ })
+ if err != nil {
+ // The backoff gave up (e.g. ErrWaitTimeout): remember the last probe error in cacheSyncErr so later calls return it immediately.
+ c.cacheSyncErr = lastCheckErr
+ }
+ return err
+}
+
+// healthCheck performs a single liveness probe against the cluster: it opens
+// a TCP connection to c.baseURLHost and closes it again. A dial failure is
+// returned to the caller; a failure to close the probe connection is only
+// logged, because the probe itself has already succeeded at that point.
+//
+// NOTE(review): baseURLHost is taken from url.Parse(BaseURL).Host in
+// newCluster. If the base URL carries no explicit port, the dial fails with
+// a missing-port error — confirm that configured base URLs always include one.
+func (c *cluster) healthCheck(ctx context.Context) (err error) {
+	// tcp socket probe
+	d := net.Dialer{Timeout: 3 * time.Second}
+	conn, err := d.DialContext(ctx, "tcp", c.baseURLHost)
+	if err != nil {
+		return err
+	}
+	if er := conn.Close(); er != nil {
+		// Log the close error itself; err is always nil on this path.
+		log.Warnw("failed to close tcp probe connection",
+			zap.Error(er),
+			zap.String("cluster", c.name),
+		)
+	}
+	return nil
+}
+
func (c *cluster) applyAuth(req *http.Request) {
if c.adminKey != "" {
req.Header.Set("X-API-Key", c.adminKey)
diff --git a/pkg/apisix/nonexistentclient.go b/pkg/apisix/nonexistentclient.go
index 63c12a4..a3c38ee 100644
--- a/pkg/apisix/nonexistentclient.go
+++ b/pkg/apisix/nonexistentclient.go
@@ -180,6 +180,10 @@ func (nc *nonExistentCluster) HasSynced(_ context.Context) error {
return nil
}
+func (nc *nonExistentCluster) HealthCheck(_ context.Context) error {
+ return nil // no probe is performed for the non-existent cluster; it always reports healthy
+}
+
func (nc *nonExistentCluster) String() string {
return "non-existent cluster"
}
diff --git a/pkg/ingress/apisix_cluster_config.go b/pkg/ingress/apisix_cluster_config.go
index c275a3c..5df2e5d 100644
--- a/pkg/ingress/apisix_cluster_config.go
+++ b/pkg/ingress/apisix_cluster_config.go
@@ -54,6 +54,8 @@ func (c *Controller) newApisixClusterConfigController() *apisixClusterConfigCont
func (c *apisixClusterConfigController) run(ctx context.Context) {
log.Info("ApisixClusterConfig controller started")
defer log.Info("ApisixClusterConfig controller exited")
+ defer c.workqueue.ShutDown()
+
if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.apisixClusterConfigInformer.HasSynced); !ok {
log.Error("cache sync failed")
return
@@ -62,7 +64,6 @@ func (c *apisixClusterConfigController) run(ctx context.Context) {
go c.runWorker(ctx)
}
<-ctx.Done()
- c.workqueue.ShutDown()
}
func (c *apisixClusterConfigController) runWorker(ctx context.Context) {
diff --git a/pkg/ingress/apisix_route.go b/pkg/ingress/apisix_route.go
index a374626..1172c73 100644
--- a/pkg/ingress/apisix_route.go
+++ b/pkg/ingress/apisix_route.go
@@ -56,6 +56,8 @@ func (c *Controller) newApisixRouteController() *apisixRouteController {
func (c *apisixRouteController) run(ctx context.Context) {
log.Info("ApisixRoute controller started")
defer log.Info("ApisixRoute controller exited")
+ defer c.workqueue.ShutDown()
+
ok := cache.WaitForCacheSync(ctx.Done(), c.controller.apisixRouteInformer.HasSynced)
if !ok {
log.Error("cache sync failed")
@@ -66,7 +68,6 @@ func (c *apisixRouteController) run(ctx context.Context) {
go c.runWorker(ctx)
}
<-ctx.Done()
- c.workqueue.ShutDown()
}
func (c *apisixRouteController) runWorker(ctx context.Context) {
diff --git a/pkg/ingress/apisix_tls.go b/pkg/ingress/apisix_tls.go
index 169eb1d..643d37b 100644
--- a/pkg/ingress/apisix_tls.go
+++ b/pkg/ingress/apisix_tls.go
@@ -57,6 +57,8 @@ func (c *Controller) newApisixTlsController() *apisixTlsController {
func (c *apisixTlsController) run(ctx context.Context) {
log.Info("ApisixTls controller started")
defer log.Info("ApisixTls controller exited")
+ defer c.workqueue.ShutDown()
+
if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.apisixTlsInformer.HasSynced, c.controller.secretInformer.HasSynced); !ok {
log.Errorf("informers sync failed")
return
@@ -66,7 +68,6 @@ func (c *apisixTlsController) run(ctx context.Context) {
}
<-ctx.Done()
- c.workqueue.ShutDown()
}
func (c *apisixTlsController) runWorker(ctx context.Context) {
diff --git a/pkg/ingress/apisix_upstream.go b/pkg/ingress/apisix_upstream.go
index a05d9c1..4ba0162 100644
--- a/pkg/ingress/apisix_upstream.go
+++ b/pkg/ingress/apisix_upstream.go
@@ -57,6 +57,8 @@ func (c *Controller) newApisixUpstreamController() *apisixUpstreamController {
func (c *apisixUpstreamController) run(ctx context.Context) {
log.Info("ApisixUpstream controller started")
defer log.Info("ApisixUpstream controller exited")
+ defer c.workqueue.ShutDown()
+
if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.apisixUpstreamInformer.HasSynced, c.controller.svcInformer.HasSynced); !ok {
log.Error("cache sync failed")
return
@@ -66,7 +68,6 @@ func (c *apisixUpstreamController) run(ctx context.Context) {
}
<-ctx.Done()
- c.workqueue.ShutDown()
}
func (c *apisixUpstreamController) runWorker(ctx context.Context) {
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index f573564..9914c02 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -135,9 +135,7 @@ func NewController(cfg *config.Config) (*Controller, error) {
}
var (
- watchingNamespace map[string]struct{}
- ingressInformer cache.SharedIndexInformer
- apisixRouteInformer cache.SharedIndexInformer
+ watchingNamespace map[string]struct{}
)
if len(cfg.Kubernetes.AppNamespaces) > 1 || cfg.Kubernetes.AppNamespaces[0] != v1.NamespaceAll {
watchingNamespace = make(map[string]struct{}, len(cfg.Kubernetes.AppNamespaces))
@@ -146,27 +144,6 @@ func NewController(cfg *config.Config) (*Controller, error) {
}
}
- ingressLister := kube.NewIngressLister(
- kubeClient.SharedIndexInformerFactory.Networking().V1().Ingresses().Lister(),
- kubeClient.SharedIndexInformerFactory.Networking().V1beta1().Ingresses().Lister(),
- kubeClient.SharedIndexInformerFactory.Extensions().V1beta1().Ingresses().Lister(),
- )
- apisixRouteLister := kube.NewApisixRouteLister(kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixRoutes().Lister(),
- kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixRoutes().Lister())
-
- if cfg.Kubernetes.IngressVersion == config.IngressNetworkingV1 {
- ingressInformer = kubeClient.SharedIndexInformerFactory.Networking().V1().Ingresses().Informer()
- } else if cfg.Kubernetes.IngressVersion == config.IngressNetworkingV1beta1 {
- ingressInformer = kubeClient.SharedIndexInformerFactory.Networking().V1beta1().Ingresses().Informer()
- } else {
- ingressInformer = kubeClient.SharedIndexInformerFactory.Extensions().V1beta1().Ingresses().Informer()
- }
- if cfg.Kubernetes.ApisixRouteVersion == config.ApisixRouteV2alpha1 {
- apisixRouteInformer = kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixRoutes().Informer()
- } else {
- apisixRouteInformer = kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixRoutes().Informer()
- }
-
// recorder
utilruntime.Must(apisixscheme.AddToScheme(scheme.Scheme))
eventBroadcaster := record.NewBroadcaster()
@@ -183,24 +160,35 @@ func NewController(cfg *config.Config) (*Controller, error) {
watchingNamespace: watchingNamespace,
secretSSLMap: new(sync.Map),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: _component}),
-
- epInformer: kubeClient.SharedIndexInformerFactory.Core().V1().Endpoints().Informer(),
- epLister: kubeClient.SharedIndexInformerFactory.Core().V1().Endpoints().Lister(),
- svcInformer: kubeClient.SharedIndexInformerFactory.Core().V1().Services().Informer(),
- svcLister: kubeClient.SharedIndexInformerFactory.Core().V1().Services().Lister(),
- ingressLister: ingressLister,
- ingressInformer: ingressInformer,
- secretInformer: kubeClient.SharedIndexInformerFactory.Core().V1().Secrets().Informer(),
- secretLister: kubeClient.SharedIndexInformerFactory.Core().V1().Secrets().Lister(),
- apisixRouteInformer: apisixRouteInformer,
- apisixRouteLister: apisixRouteLister,
- apisixUpstreamInformer: kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixUpstreams().Informer(),
- apisixUpstreamLister: kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixUpstreams().Lister(),
- apisixTlsInformer: kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixTlses().Informer(),
- apisixTlsLister: kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixTlses().Lister(),
- apisixClusterConfigInformer: kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixClusterConfigs().Informer(),
- apisixClusterConfigLister: kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixClusterConfigs().Lister(),
}
+ return c, nil
+}
+
+func (c *Controller) initWhenStartLeading() {
+ var (
+ ingressInformer cache.SharedIndexInformer
+ apisixRouteInformer cache.SharedIndexInformer
+ )
+
+ kubeFactory := c.kubeClient.NewSharedIndexInformerFactory()
+ apisixFactory := c.kubeClient.NewAPISIXSharedIndexInformerFactory()
+
+ c.epLister = kubeFactory.Core().V1().Endpoints().Lister()
+ c.svcLister = kubeFactory.Core().V1().Services().Lister()
+ c.ingressLister = kube.NewIngressLister(
+ kubeFactory.Networking().V1().Ingresses().Lister(),
+ kubeFactory.Networking().V1beta1().Ingresses().Lister(),
+ kubeFactory.Extensions().V1beta1().Ingresses().Lister(),
+ )
+ c.secretLister = kubeFactory.Core().V1().Secrets().Lister()
+ c.apisixRouteLister = kube.NewApisixRouteLister(
+ apisixFactory.Apisix().V1().ApisixRoutes().Lister(),
+ apisixFactory.Apisix().V2alpha1().ApisixRoutes().Lister(),
+ )
+ c.apisixUpstreamLister = apisixFactory.Apisix().V1().ApisixUpstreams().Lister()
+ c.apisixTlsLister = apisixFactory.Apisix().V1().ApisixTlses().Lister()
+ c.apisixClusterConfigLister = apisixFactory.Apisix().V2alpha1().ApisixClusterConfigs().Lister()
+
c.translator = translation.NewTranslator(&translation.TranslatorOptions{
EndpointsLister: c.epLister,
ServiceLister: c.svcLister,
@@ -208,15 +196,35 @@ func NewController(cfg *config.Config) (*Controller, error) {
SecretLister: c.secretLister,
})
+ if c.cfg.Kubernetes.IngressVersion == config.IngressNetworkingV1 {
+ ingressInformer = kubeFactory.Networking().V1().Ingresses().Informer()
+ } else if c.cfg.Kubernetes.IngressVersion == config.IngressNetworkingV1beta1 {
+ ingressInformer = kubeFactory.Networking().V1beta1().Ingresses().Informer()
+ } else {
+ ingressInformer = kubeFactory.Extensions().V1beta1().Ingresses().Informer()
+ }
+ if c.cfg.Kubernetes.ApisixRouteVersion == config.ApisixRouteV2alpha1 {
+ apisixRouteInformer = apisixFactory.Apisix().V2alpha1().ApisixRoutes().Informer()
+ } else {
+ apisixRouteInformer = apisixFactory.Apisix().V1().ApisixRoutes().Informer()
+ }
+
+ c.epInformer = kubeFactory.Core().V1().Endpoints().Informer()
+ c.svcInformer = kubeFactory.Core().V1().Services().Informer()
+ c.ingressInformer = ingressInformer
+ c.apisixRouteInformer = apisixRouteInformer
+ c.apisixUpstreamInformer = apisixFactory.Apisix().V1().ApisixUpstreams().Informer()
+ c.apisixClusterConfigInformer = apisixFactory.Apisix().V2alpha1().ApisixClusterConfigs().Informer()
+ c.secretInformer = kubeFactory.Core().V1().Secrets().Informer()
+ c.apisixTlsInformer = apisixFactory.Apisix().V1().ApisixTlses().Informer()
+
c.endpointsController = c.newEndpointsController()
c.apisixUpstreamController = c.newApisixUpstreamController()
+ c.ingressController = c.newIngressController()
c.apisixRouteController = c.newApisixRouteController()
c.apisixClusterConfigController = c.newApisixClusterConfigController()
c.apisixTlsController = c.newApisixTlsController()
- c.ingressController = c.newIngressController()
c.secretController = c.newSecretController()
-
- return c, nil
}
// recorderEvent recorder events for resources
@@ -320,30 +328,47 @@ election:
}
func (c *Controller) run(ctx context.Context) {
- log.Infow("controller now is running as leader",
+ log.Infow("controller tries to leading ...",
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)
+
+ var cancelFunc context.CancelFunc
+ ctx, cancelFunc = context.WithCancel(ctx)
+ defer cancelFunc()
+
+ // give up leader
defer c.leaderContextCancelFunc()
- c.metricsCollector.ResetLeader(true)
- err := c.apisix.AddCluster(&apisix.ClusterOptions{
+ clusterOpts := &apisix.ClusterOptions{
Name: c.cfg.APISIX.DefaultClusterName,
AdminKey: c.cfg.APISIX.DefaultClusterAdminKey,
BaseURL: c.cfg.APISIX.DefaultClusterBaseURL,
- })
+ }
+ err := c.apisix.AddCluster(clusterOpts)
if err != nil && err != apisix.ErrDuplicatedCluster {
- // TODO give up the leader role.
+ // TODO give up the leader role
log.Errorf("failed to add default cluster: %s", err)
return
}
if err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HasSynced(ctx); err != nil {
- // TODO give up the leader role.
+ // TODO give up the leader role
log.Errorf("failed to wait the default cluster to be ready: %s", err)
+
+ // re-create apisix cluster, used in next c.run
+ if err = c.apisix.UpdateCluster(clusterOpts); err != nil {
+ log.Errorf("failed to update default cluster: %s", err)
+ return
+ }
return
}
+ c.initWhenStartLeading()
+
+ c.goAttach(func() {
+ c.checkClusterHealth(ctx, cancelFunc)
+ })
c.goAttach(func() {
c.epInformer.Run(ctx.Done())
})
@@ -390,6 +415,13 @@ func (c *Controller) run(ctx context.Context) {
c.secretController.run(ctx)
})
+ c.metricsCollector.ResetLeader(true)
+
+ log.Infow("controller now is running as leader",
+ zap.String("namespace", c.namespace),
+ zap.String("pod", c.name),
+ )
+
<-ctx.Done()
c.wg.Wait()
}
@@ -426,3 +458,22 @@ func (c *Controller) syncSSL(ctx context.Context, ssl *apisixv1.Ssl, event types
}
return err
}
+
+func (c *Controller) checkClusterHealth(ctx context.Context, cancelFunc context.CancelFunc) { // polls the default cluster's HealthCheck until ctx ends or a check fails
+ defer cancelFunc() // however the loop ends, cancel the context handed in by the caller
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-time.After(5 * time.Second): // wait 5 seconds between consecutive checks
+ }
+
+ err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HealthCheck(ctx)
+ if err != nil {
+ // The health check failed for good: stop polling; the deferred cancelFunc then cancels the context so leadership is given up.
+ log.Warnf("failed to check health for default cluster: %s, give up leader", err)
+ return
+ }
+ log.Debugf("success check health for default cluster")
+ }
+}
diff --git a/pkg/ingress/endpoint.go b/pkg/ingress/endpoint.go
index fbf64f3..cfcea7f 100644
--- a/pkg/ingress/endpoint.go
+++ b/pkg/ingress/endpoint.go
@@ -58,6 +58,7 @@ func (c *Controller) newEndpointsController() *endpointsController {
func (c *endpointsController) run(ctx context.Context) {
log.Info("endpoints controller started")
defer log.Info("endpoints controller exited")
+ defer c.workqueue.ShutDown()
if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.epInformer.HasSynced); !ok {
log.Error("informers sync failed")
@@ -82,7 +83,6 @@ func (c *endpointsController) run(ctx context.Context) {
}
<-ctx.Done()
- c.workqueue.ShutDown()
}
func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error {
diff --git a/pkg/ingress/ingress.go b/pkg/ingress/ingress.go
index 73077b7..6167e7d 100644
--- a/pkg/ingress/ingress.go
+++ b/pkg/ingress/ingress.go
@@ -57,6 +57,7 @@ func (c *Controller) newIngressController() *ingressController {
func (c *ingressController) run(ctx context.Context) {
log.Info("ingress controller started")
defer log.Infof("ingress controller exited")
+ defer c.workqueue.ShutDown()
if !cache.WaitForCacheSync(ctx.Done(), c.controller.ingressInformer.HasSynced) {
log.Errorf("cache sync failed")
@@ -66,7 +67,6 @@ func (c *ingressController) run(ctx context.Context) {
go c.runWorker(ctx)
}
<-ctx.Done()
- c.workqueue.ShutDown()
}
func (c *ingressController) runWorker(ctx context.Context) {
diff --git a/pkg/ingress/secret.go b/pkg/ingress/secret.go
index 143eddc..8b3aec2 100644
--- a/pkg/ingress/secret.go
+++ b/pkg/ingress/secret.go
@@ -59,6 +59,7 @@ func (c *Controller) newSecretController() *secretController {
func (c *secretController) run(ctx context.Context) {
log.Info("secret controller started")
defer log.Info("secret controller exited")
+ defer c.workqueue.ShutDown()
if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.secretInformer.HasSynced); !ok {
log.Error("informers sync failed")
@@ -70,7 +71,6 @@ func (c *secretController) run(ctx context.Context) {
}
<-ctx.Done()
- c.workqueue.ShutDown()
}
func (c *secretController) runWorker(ctx context.Context) {
diff --git a/pkg/kube/apisix/apis/config/v2alpha1/types.go b/pkg/kube/apisix/apis/config/v2alpha1/types.go
index 08b4a8b..1fdf482 100644
--- a/pkg/kube/apisix/apis/config/v2alpha1/types.go
+++ b/pkg/kube/apisix/apis/config/v2alpha1/types.go
@@ -20,6 +20,8 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
+
+ "github.com/apache/apisix-ingress-controller/pkg/types"
)
const (
@@ -299,6 +301,8 @@ type ApisixClusterAdminConfig struct {
BaseURL string `json:"baseURL" yaml:"baseURL"`
// AdminKey is used to verify the admin API user.
AdminKey string `json:"adminKey" yaml:"adminKey"`
+ // ClientTimeout is request timeout for the APISIX Admin API client
+ ClientTimeout types.TimeDuration `json:"clientTimeout" yaml:"clientTimeout"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
diff --git a/pkg/kube/init.go b/pkg/kube/init.go
index d7b21e9..55af64e 100644
--- a/pkg/kube/init.go
+++ b/pkg/kube/init.go
@@ -25,16 +25,12 @@ import (
// KubeClient contains some objects used to communicate with Kubernetes API Server.
type KubeClient struct {
+ cfg *config.Config
+
// Client is the object used to operate Kubernetes builtin resources.
Client kubernetes.Interface
// APISIXClient is the object used to operate resources under apisix.apache.org group.
APISIXClient clientset.Interface
- // SharedIndexInformerFactory is the index informer factory object used to watch and
- // list Kubernetes builtin resources.
- SharedIndexInformerFactory informers.SharedInformerFactory
- // APISIXSharedIndexInformerFactory is the index informer factory object used to watch
- // and list Kubernetes resources in apisix.apache.org group.
- APISIXSharedIndexInformerFactory externalversions.SharedInformerFactory
}
// NewKubeClient creates a high-level Kubernetes client.
@@ -52,13 +48,22 @@ func NewKubeClient(cfg *config.Config) (*KubeClient, error) {
if err != nil {
return nil, err
}
- factory := informers.NewSharedInformerFactory(kubeClient, cfg.Kubernetes.ResyncInterval.Duration)
- apisixFactory := externalversions.NewSharedInformerFactory(apisixKubeClient, cfg.Kubernetes.ResyncInterval.Duration)
return &KubeClient{
- Client: kubeClient,
- APISIXClient: apisixKubeClient,
- SharedIndexInformerFactory: factory,
- APISIXSharedIndexInformerFactory: apisixFactory,
+ cfg: cfg,
+ Client: kubeClient,
+ APISIXClient: apisixKubeClient,
}, nil
}
+
+// NewSharedIndexInformerFactory creates a new index informer factory used to watch and
+// list Kubernetes builtin resources, using the configured resync interval.
+func (k *KubeClient) NewSharedIndexInformerFactory() informers.SharedInformerFactory {
+ return informers.NewSharedInformerFactory(k.Client, k.cfg.Kubernetes.ResyncInterval.Duration)
+}
+
+// NewAPISIXSharedIndexInformerFactory creates a new index informer factory used to watch
+// and list Kubernetes resources in apisix.apache.org group, using the configured resync interval.
+func (k *KubeClient) NewAPISIXSharedIndexInformerFactory() externalversions.SharedInformerFactory {
+ return externalversions.NewSharedInformerFactory(k.APISIXClient, k.cfg.Kubernetes.ResyncInterval.Duration)
+}