Posted to notifications@apisix.apache.org by to...@apache.org on 2021/05/26 08:40:04 UTC

[apisix-ingress-controller] branch master updated: chore: add health check to apisix-admin and make the leader election recyclable (#499)

This is an automated email from the ASF dual-hosted git repository.

tokers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new 582c4b3  chore: add health check to apisix-admin and make the leader election recyclable (#499)
582c4b3 is described below

commit 582c4b362f26ffa8372bf520c3f774170a56c290
Author: Alex Zhang <zc...@gmail.com>
AuthorDate: Wed May 26 16:39:58 2021 +0800

    chore: add health check to apisix-admin and make the leader election recyclable (#499)
    
    Co-authored-by: Liu Peng <vs...@gmail.com>
---
 pkg/apisix/apisix.go                          |   6 +-
 pkg/apisix/cluster.go                         | 122 ++++++++++++++++-----
 pkg/apisix/nonexistentclient.go               |   4 +
 pkg/ingress/apisix_cluster_config.go          |   3 +-
 pkg/ingress/apisix_route.go                   |   3 +-
 pkg/ingress/apisix_tls.go                     |   3 +-
 pkg/ingress/apisix_upstream.go                |   3 +-
 pkg/ingress/controller.go                     | 151 +++++++++++++++++---------
 pkg/ingress/endpoint.go                       |   2 +-
 pkg/ingress/ingress.go                        |   2 +-
 pkg/ingress/secret.go                         |   2 +-
 pkg/kube/apisix/apis/config/v2alpha1/types.go |   4 +
 pkg/kube/init.go                              |  29 +++--
 13 files changed, 235 insertions(+), 99 deletions(-)
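
In outline, the patch does two things: cluster.healthCheck probes the Admin API with a plain TCP dial, and Controller.checkClusterHealth runs that probe periodically and cancels the leader context when it fails, so the election loop can start over ("recyclable"). Below is a minimal, self-contained sketch of that shape, not code from the patch: the names probeAdmin/watchAdmin and the example URL are illustrative assumptions, and the retry/backoff the patch wraps around the probe is omitted.

    package main

    import (
        "context"
        "fmt"
        "net"
        "net/url"
        "time"
    )

    // probeAdmin dials the host:port of the Admin API base URL and reports
    // whether a TCP connection can be opened within three seconds, mirroring
    // the cluster.healthCheck added in pkg/apisix/cluster.go.
    func probeAdmin(ctx context.Context, baseURL string) error {
        u, err := url.Parse(baseURL)
        if err != nil {
            return err
        }
        d := net.Dialer{Timeout: 3 * time.Second}
        conn, err := d.DialContext(ctx, "tcp", u.Host)
        if err != nil {
            return err
        }
        return conn.Close()
    }

    // watchAdmin probes every five seconds and cancels ctx when the probe
    // fails, the same shape as Controller.checkClusterHealth: cancelling the
    // leader context makes the controller drop leadership and re-elect.
    func watchAdmin(ctx context.Context, cancel context.CancelFunc, baseURL string) {
        defer cancel()
        for {
            select {
            case <-ctx.Done():
                return
            case <-time.After(5 * time.Second):
            }
            if err := probeAdmin(ctx, baseURL); err != nil {
                fmt.Println("admin API unreachable, giving up leadership:", err)
                return
            }
        }
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        // Hypothetical Admin API address; substitute your own.
        go watchAdmin(ctx, cancel, "http://127.0.0.1:9180/apisix/admin")
        <-ctx.Done() // in the controller, the leader's work runs here instead
    }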

diff --git a/pkg/apisix/apisix.go b/pkg/apisix/apisix.go
index ffcb405..4200e0a 100644
--- a/pkg/apisix/apisix.go
+++ b/pkg/apisix/apisix.go
@@ -50,6 +50,8 @@ type Cluster interface {
 	String() string
 	// HasSynced checks whether all resources in the APISIX cluster are synced to the cache.
 	HasSynced(context.Context) error
+	// HealthCheck checks the health of the APISIX cluster in real time.
+	HealthCheck(context.Context) error
 }
 
 // Route is the specific client interface to take over the create, update,
@@ -122,6 +124,7 @@ type apisix struct {
 func NewClient() (APISIX, error) {
 	cli := &apisix{
 		nonExistentCluster: newNonExistentCluster(),
+		clusters:           make(map[string]Cluster),
 	}
 	return cli, nil
 }
@@ -160,9 +163,6 @@ func (c *apisix) AddCluster(co *ClusterOptions) error {
 	if err != nil {
 		return err
 	}
-	if c.clusters == nil {
-		c.clusters = make(map[string]Cluster)
-	}
 	c.clusters[co.Name] = cluster
 	return nil
 }
diff --git a/pkg/apisix/cluster.go b/pkg/apisix/cluster.go
index d41e5dd..4d42d1e 100644
--- a/pkg/apisix/cluster.go
+++ b/pkg/apisix/cluster.go
@@ -21,7 +21,9 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
+	"net"
 	"net/http"
+	"net/url"
 	"strings"
 	"sync/atomic"
 	"time"
@@ -37,8 +39,7 @@ import (
 const (
 	_defaultTimeout = 5 * time.Second
 
-	_cacheNotSync = iota
-	_cacheSyncing
+	_cacheSyncing = iota
 	_cacheSynced
 )
 
@@ -50,6 +51,19 @@ var (
 	ErrDuplicatedCluster = errors.New("duplicated cluster")
 
 	_errReadOnClosedResBody = errors.New("http: read on closed response body")
+
+	// Default shared transport for apisix client
+	_defaultTransport = &http.Transport{
+		Proxy: http.ProxyFromEnvironment,
+		Dial: (&net.Dialer{
+			Timeout: 3 * time.Second,
+		}).Dial,
+		DialContext: (&net.Dialer{
+			Timeout: 3 * time.Second,
+		}).DialContext,
+		ResponseHeaderTimeout: 30 * time.Second,
+		ExpectContinueTimeout: 1 * time.Second,
+	}
 )
 
 // ClusterOptions contains parameters to customize APISIX client.
@@ -63,6 +77,7 @@ type ClusterOptions struct {
 type cluster struct {
 	name         string
 	baseURL      string
+	baseURLHost  string
 	adminKey     string
 	cli          *http.Client
 	cacheState   int32
@@ -86,19 +101,21 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
 	}
 	o.BaseURL = strings.TrimSuffix(o.BaseURL, "/")
 
+	u, err := url.Parse(o.BaseURL)
+	if err != nil {
+		return nil, err
+	}
+
 	c := &cluster{
-		name:     o.Name,
-		baseURL:  o.BaseURL,
-		adminKey: o.AdminKey,
+		name:        o.Name,
+		baseURL:     o.BaseURL,
+		baseURLHost: u.Host,
+		adminKey:    o.AdminKey,
 		cli: &http.Client{
-			Timeout: o.Timeout,
-			Transport: &http.Transport{
-				ResponseHeaderTimeout: o.Timeout,
-				ExpectContinueTimeout: o.Timeout,
-			},
+			Timeout:   o.Timeout,
+			Transport: _defaultTransport,
 		},
-		cache:       nil,
-		cacheState:  _cacheNotSync,
+		cacheState:  _cacheSyncing, // default state
 		cacheSynced: make(chan struct{}),
 	}
 	c.route = newRouteClient(c)
@@ -108,6 +125,11 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
 	c.globalRules = newGlobalRuleClient(c)
 	c.consumer = newConsumerClient(c)
 
+	c.cache, err = cache.NewMemDBCache()
+	if err != nil {
+		return nil, err
+	}
+
 	go c.syncCache()
 
 	return c, nil
@@ -116,9 +138,6 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
 func (c *cluster) syncCache() {
 	log.Infow("syncing cache", zap.String("cluster", c.name))
 	now := time.Now()
-	if !atomic.CompareAndSwapInt32(&c.cacheState, _cacheNotSync, _cacheSyncing) {
-		panic("dubious state when sync cache")
-	}
 	defer func() {
 		if c.cacheSyncErr == nil {
 			log.Infow("cache synced",
@@ -134,13 +153,20 @@ func (c *cluster) syncCache() {
 	}()
 
 	backoff := wait.Backoff{
-		Duration: time.Second,
-		Factor:   2,
-		Steps:    6,
-	}
-	err := wait.ExponentialBackoff(backoff, c.syncCacheOnce)
+		Duration: 2 * time.Second,
+		Factor:   1,
+		Steps:    5,
+	}
+	var lastSyncErr error
+	err := wait.ExponentialBackoff(backoff, func() (done bool, _ error) {
+		// syncCacheOnce never returns (false, nil), so whenever done is
+		// false, lastSyncErr is non-nil and safe to use after the backoff.
+		done, lastSyncErr = c.syncCacheOnce()
+		return
+	})
 	if err != nil {
-		c.cacheSyncErr = err
+		// err is wait.ErrWaitTimeout here; record the real sync error instead.
+		c.cacheSyncErr = lastSyncErr
 	}
 	close(c.cacheSynced)
 
@@ -150,12 +176,6 @@ func (c *cluster) syncCache() {
 }
 
 func (c *cluster) syncCacheOnce() (bool, error) {
-	dbcache, err := cache.NewMemDBCache()
-	if err != nil {
-		return false, err
-	}
-	c.cache = dbcache
-
 	routes, err := c.route.List(context.TODO())
 	if err != nil {
 		log.Errorf("failed to list route in APISIX: %s", err)
@@ -306,6 +326,54 @@ func (c *cluster) GlobalRule() GlobalRule {
 	return c.globalRules
 }
 
+// HealthCheck implements Cluster.HealthCheck method.
+func (c *cluster) HealthCheck(ctx context.Context) (err error) {
+	if c.cacheSyncErr != nil {
+		err = c.cacheSyncErr
+		return
+	}
+	if atomic.LoadInt32(&c.cacheState) == _cacheSyncing {
+		return
+	}
+
+	// Retry three times in a row, and exit if all of them fail.
+	backoff := wait.Backoff{
+		Duration: 5 * time.Second,
+		Factor:   1,
+		Steps:    3,
+	}
+	var lastCheckErr error
+	err = wait.ExponentialBackoffWithContext(ctx, backoff, func() (done bool, _ error) {
+		if lastCheckErr = c.healthCheck(ctx); lastCheckErr != nil {
+			log.Warnf("failed to check health for cluster %s: %s, will retry", c.name, lastCheckErr)
+			return
+		}
+		done = true
+		return
+	})
+	if err != nil {
+		// err is wait.ErrWaitTimeout here; record the real health check error instead.
+		c.cacheSyncErr = lastCheckErr
+	}
+	return err
+}
+
+func (c *cluster) healthCheck(ctx context.Context) (err error) {
+	// tcp socket probe
+	d := net.Dialer{Timeout: 3 * time.Second}
+	conn, err := d.DialContext(ctx, "tcp", c.baseURLHost)
+	if err != nil {
+		return err
+	}
+	if er := conn.Close(); er != nil {
+		log.Warnw("failed to close tcp probe connection",
+			zap.Error(er),
+			zap.String("cluster", c.name),
+		)
+	}
+	return
+}
+
 func (c *cluster) applyAuth(req *http.Request) {
 	if c.adminKey != "" {
 		req.Header.Set("X-API-Key", c.adminKey)
diff --git a/pkg/apisix/nonexistentclient.go b/pkg/apisix/nonexistentclient.go
index 63c12a4..a3c38ee 100644
--- a/pkg/apisix/nonexistentclient.go
+++ b/pkg/apisix/nonexistentclient.go
@@ -180,6 +180,10 @@ func (nc *nonExistentCluster) HasSynced(_ context.Context) error {
 	return nil
 }
 
+func (nc *nonExistentCluster) HealthCheck(_ context.Context) error {
+	return nil
+}
+
 func (nc *nonExistentCluster) String() string {
 	return "non-existent cluster"
 }
diff --git a/pkg/ingress/apisix_cluster_config.go b/pkg/ingress/apisix_cluster_config.go
index c275a3c..5df2e5d 100644
--- a/pkg/ingress/apisix_cluster_config.go
+++ b/pkg/ingress/apisix_cluster_config.go
@@ -54,6 +54,8 @@ func (c *Controller) newApisixClusterConfigController() *apisixClusterConfigCont
 func (c *apisixClusterConfigController) run(ctx context.Context) {
 	log.Info("ApisixClusterConfig controller started")
 	defer log.Info("ApisixClusterConfig controller exited")
+	defer c.workqueue.ShutDown()
+
 	if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.apisixClusterConfigInformer.HasSynced); !ok {
 		log.Error("cache sync failed")
 		return
@@ -62,7 +64,6 @@ func (c *apisixClusterConfigController) run(ctx context.Context) {
 		go c.runWorker(ctx)
 	}
 	<-ctx.Done()
-	c.workqueue.ShutDown()
 }
 
 func (c *apisixClusterConfigController) runWorker(ctx context.Context) {
diff --git a/pkg/ingress/apisix_route.go b/pkg/ingress/apisix_route.go
index a374626..1172c73 100644
--- a/pkg/ingress/apisix_route.go
+++ b/pkg/ingress/apisix_route.go
@@ -56,6 +56,8 @@ func (c *Controller) newApisixRouteController() *apisixRouteController {
 func (c *apisixRouteController) run(ctx context.Context) {
 	log.Info("ApisixRoute controller started")
 	defer log.Info("ApisixRoute controller exited")
+	defer c.workqueue.ShutDown()
+
 	ok := cache.WaitForCacheSync(ctx.Done(), c.controller.apisixRouteInformer.HasSynced)
 	if !ok {
 		log.Error("cache sync failed")
@@ -66,7 +68,6 @@ func (c *apisixRouteController) run(ctx context.Context) {
 		go c.runWorker(ctx)
 	}
 	<-ctx.Done()
-	c.workqueue.ShutDown()
 }
 
 func (c *apisixRouteController) runWorker(ctx context.Context) {
diff --git a/pkg/ingress/apisix_tls.go b/pkg/ingress/apisix_tls.go
index 169eb1d..643d37b 100644
--- a/pkg/ingress/apisix_tls.go
+++ b/pkg/ingress/apisix_tls.go
@@ -57,6 +57,8 @@ func (c *Controller) newApisixTlsController() *apisixTlsController {
 func (c *apisixTlsController) run(ctx context.Context) {
 	log.Info("ApisixTls controller started")
 	defer log.Info("ApisixTls controller exited")
+	defer c.workqueue.ShutDown()
+
 	if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.apisixTlsInformer.HasSynced, c.controller.secretInformer.HasSynced); !ok {
 		log.Errorf("informers sync failed")
 		return
@@ -66,7 +68,6 @@ func (c *apisixTlsController) run(ctx context.Context) {
 	}
 
 	<-ctx.Done()
-	c.workqueue.ShutDown()
 }
 
 func (c *apisixTlsController) runWorker(ctx context.Context) {
diff --git a/pkg/ingress/apisix_upstream.go b/pkg/ingress/apisix_upstream.go
index a05d9c1..4ba0162 100644
--- a/pkg/ingress/apisix_upstream.go
+++ b/pkg/ingress/apisix_upstream.go
@@ -57,6 +57,8 @@ func (c *Controller) newApisixUpstreamController() *apisixUpstreamController {
 func (c *apisixUpstreamController) run(ctx context.Context) {
 	log.Info("ApisixUpstream controller started")
 	defer log.Info("ApisixUpstream controller exited")
+	defer c.workqueue.ShutDown()
+
 	if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.apisixUpstreamInformer.HasSynced, c.controller.svcInformer.HasSynced); !ok {
 		log.Error("cache sync failed")
 		return
@@ -66,7 +68,6 @@ func (c *apisixUpstreamController) run(ctx context.Context) {
 	}
 
 	<-ctx.Done()
-	c.workqueue.ShutDown()
 }
 
 func (c *apisixUpstreamController) runWorker(ctx context.Context) {
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index f573564..9914c02 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -135,9 +135,7 @@ func NewController(cfg *config.Config) (*Controller, error) {
 	}
 
 	var (
-		watchingNamespace   map[string]struct{}
-		ingressInformer     cache.SharedIndexInformer
-		apisixRouteInformer cache.SharedIndexInformer
+		watchingNamespace map[string]struct{}
 	)
 	if len(cfg.Kubernetes.AppNamespaces) > 1 || cfg.Kubernetes.AppNamespaces[0] != v1.NamespaceAll {
 		watchingNamespace = make(map[string]struct{}, len(cfg.Kubernetes.AppNamespaces))
@@ -146,27 +144,6 @@ func NewController(cfg *config.Config) (*Controller, error) {
 		}
 	}
 
-	ingressLister := kube.NewIngressLister(
-		kubeClient.SharedIndexInformerFactory.Networking().V1().Ingresses().Lister(),
-		kubeClient.SharedIndexInformerFactory.Networking().V1beta1().Ingresses().Lister(),
-		kubeClient.SharedIndexInformerFactory.Extensions().V1beta1().Ingresses().Lister(),
-	)
-	apisixRouteLister := kube.NewApisixRouteLister(kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixRoutes().Lister(),
-		kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixRoutes().Lister())
-
-	if cfg.Kubernetes.IngressVersion == config.IngressNetworkingV1 {
-		ingressInformer = kubeClient.SharedIndexInformerFactory.Networking().V1().Ingresses().Informer()
-	} else if cfg.Kubernetes.IngressVersion == config.IngressNetworkingV1beta1 {
-		ingressInformer = kubeClient.SharedIndexInformerFactory.Networking().V1beta1().Ingresses().Informer()
-	} else {
-		ingressInformer = kubeClient.SharedIndexInformerFactory.Extensions().V1beta1().Ingresses().Informer()
-	}
-	if cfg.Kubernetes.ApisixRouteVersion == config.ApisixRouteV2alpha1 {
-		apisixRouteInformer = kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixRoutes().Informer()
-	} else {
-		apisixRouteInformer = kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixRoutes().Informer()
-	}
-
 	// recorder
 	utilruntime.Must(apisixscheme.AddToScheme(scheme.Scheme))
 	eventBroadcaster := record.NewBroadcaster()
@@ -183,24 +160,35 @@ func NewController(cfg *config.Config) (*Controller, error) {
 		watchingNamespace: watchingNamespace,
 		secretSSLMap:      new(sync.Map),
 		recorder:          eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: _component}),
-
-		epInformer:                  kubeClient.SharedIndexInformerFactory.Core().V1().Endpoints().Informer(),
-		epLister:                    kubeClient.SharedIndexInformerFactory.Core().V1().Endpoints().Lister(),
-		svcInformer:                 kubeClient.SharedIndexInformerFactory.Core().V1().Services().Informer(),
-		svcLister:                   kubeClient.SharedIndexInformerFactory.Core().V1().Services().Lister(),
-		ingressLister:               ingressLister,
-		ingressInformer:             ingressInformer,
-		secretInformer:              kubeClient.SharedIndexInformerFactory.Core().V1().Secrets().Informer(),
-		secretLister:                kubeClient.SharedIndexInformerFactory.Core().V1().Secrets().Lister(),
-		apisixRouteInformer:         apisixRouteInformer,
-		apisixRouteLister:           apisixRouteLister,
-		apisixUpstreamInformer:      kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixUpstreams().Informer(),
-		apisixUpstreamLister:        kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixUpstreams().Lister(),
-		apisixTlsInformer:           kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixTlses().Informer(),
-		apisixTlsLister:             kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixTlses().Lister(),
-		apisixClusterConfigInformer: kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixClusterConfigs().Informer(),
-		apisixClusterConfigLister:   kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixClusterConfigs().Lister(),
 	}
+	return c, nil
+}
+
+func (c *Controller) initWhenStartLeading() {
+	var (
+		ingressInformer     cache.SharedIndexInformer
+		apisixRouteInformer cache.SharedIndexInformer
+	)
+
+	kubeFactory := c.kubeClient.NewSharedIndexInformerFactory()
+	apisixFactory := c.kubeClient.NewAPISIXSharedIndexInformerFactory()
+
+	c.epLister = kubeFactory.Core().V1().Endpoints().Lister()
+	c.svcLister = kubeFactory.Core().V1().Services().Lister()
+	c.ingressLister = kube.NewIngressLister(
+		kubeFactory.Networking().V1().Ingresses().Lister(),
+		kubeFactory.Networking().V1beta1().Ingresses().Lister(),
+		kubeFactory.Extensions().V1beta1().Ingresses().Lister(),
+	)
+	c.secretLister = kubeFactory.Core().V1().Secrets().Lister()
+	c.apisixRouteLister = kube.NewApisixRouteLister(
+		apisixFactory.Apisix().V1().ApisixRoutes().Lister(),
+		apisixFactory.Apisix().V2alpha1().ApisixRoutes().Lister(),
+	)
+	c.apisixUpstreamLister = apisixFactory.Apisix().V1().ApisixUpstreams().Lister()
+	c.apisixTlsLister = apisixFactory.Apisix().V1().ApisixTlses().Lister()
+	c.apisixClusterConfigLister = apisixFactory.Apisix().V2alpha1().ApisixClusterConfigs().Lister()
+
 	c.translator = translation.NewTranslator(&translation.TranslatorOptions{
 		EndpointsLister:      c.epLister,
 		ServiceLister:        c.svcLister,
@@ -208,15 +196,35 @@ func NewController(cfg *config.Config) (*Controller, error) {
 		SecretLister:         c.secretLister,
 	})
 
+	if c.cfg.Kubernetes.IngressVersion == config.IngressNetworkingV1 {
+		ingressInformer = kubeFactory.Networking().V1().Ingresses().Informer()
+	} else if c.cfg.Kubernetes.IngressVersion == config.IngressNetworkingV1beta1 {
+		ingressInformer = kubeFactory.Networking().V1beta1().Ingresses().Informer()
+	} else {
+		ingressInformer = kubeFactory.Extensions().V1beta1().Ingresses().Informer()
+	}
+	if c.cfg.Kubernetes.ApisixRouteVersion == config.ApisixRouteV2alpha1 {
+		apisixRouteInformer = apisixFactory.Apisix().V2alpha1().ApisixRoutes().Informer()
+	} else {
+		apisixRouteInformer = apisixFactory.Apisix().V1().ApisixRoutes().Informer()
+	}
+
+	c.epInformer = kubeFactory.Core().V1().Endpoints().Informer()
+	c.svcInformer = kubeFactory.Core().V1().Services().Informer()
+	c.ingressInformer = ingressInformer
+	c.apisixRouteInformer = apisixRouteInformer
+	c.apisixUpstreamInformer = apisixFactory.Apisix().V1().ApisixUpstreams().Informer()
+	c.apisixClusterConfigInformer = apisixFactory.Apisix().V2alpha1().ApisixClusterConfigs().Informer()
+	c.secretInformer = kubeFactory.Core().V1().Secrets().Informer()
+	c.apisixTlsInformer = apisixFactory.Apisix().V1().ApisixTlses().Informer()
+
 	c.endpointsController = c.newEndpointsController()
 	c.apisixUpstreamController = c.newApisixUpstreamController()
+	c.ingressController = c.newIngressController()
 	c.apisixRouteController = c.newApisixRouteController()
 	c.apisixClusterConfigController = c.newApisixClusterConfigController()
 	c.apisixTlsController = c.newApisixTlsController()
-	c.ingressController = c.newIngressController()
 	c.secretController = c.newSecretController()
-
-	return c, nil
 }
 
 // recorderEvent recorder events for resources
@@ -320,30 +328,47 @@ election:
 }
 
 func (c *Controller) run(ctx context.Context) {
-	log.Infow("controller now is running as leader",
+	log.Infow("controller is trying to become the leader",
 		zap.String("namespace", c.namespace),
 		zap.String("pod", c.name),
 	)
+
+	var cancelFunc context.CancelFunc
+	ctx, cancelFunc = context.WithCancel(ctx)
+	defer cancelFunc()
+
+	// give up leader
 	defer c.leaderContextCancelFunc()
-	c.metricsCollector.ResetLeader(true)
 
-	err := c.apisix.AddCluster(&apisix.ClusterOptions{
+	clusterOpts := &apisix.ClusterOptions{
 		Name:     c.cfg.APISIX.DefaultClusterName,
 		AdminKey: c.cfg.APISIX.DefaultClusterAdminKey,
 		BaseURL:  c.cfg.APISIX.DefaultClusterBaseURL,
-	})
+	}
+	err := c.apisix.AddCluster(clusterOpts)
 	if err != nil && err != apisix.ErrDuplicatedCluster {
-		// TODO give up the leader role.
+		// TODO give up the leader role
 		log.Errorf("failed to add default cluster: %s", err)
 		return
 	}
 
 	if err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HasSynced(ctx); err != nil {
-		// TODO give up the leader role.
+		// TODO give up the leader role
 		log.Errorf("failed to wait the default cluster to be ready: %s", err)
+
+		// re-create the default cluster so the next call to c.run starts from a clean state
+		if err = c.apisix.UpdateCluster(clusterOpts); err != nil {
+			log.Errorf("failed to update default cluster: %s", err)
+			return
+		}
 		return
 	}
 
+	c.initWhenStartLeading()
+
+	c.goAttach(func() {
+		c.checkClusterHealth(ctx, cancelFunc)
+	})
 	c.goAttach(func() {
 		c.epInformer.Run(ctx.Done())
 	})
@@ -390,6 +415,13 @@ func (c *Controller) run(ctx context.Context) {
 		c.secretController.run(ctx)
 	})
 
+	c.metricsCollector.ResetLeader(true)
+
+	log.Infow("controller now is running as leader",
+		zap.String("namespace", c.namespace),
+		zap.String("pod", c.name),
+	)
+
 	<-ctx.Done()
 	c.wg.Wait()
 }
@@ -426,3 +458,22 @@ func (c *Controller) syncSSL(ctx context.Context, ssl *apisixv1.Ssl, event types
 	}
 	return err
 }
+
+func (c *Controller) checkClusterHealth(ctx context.Context, cancelFunc context.CancelFunc) {
+	defer cancelFunc()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(5 * time.Second):
+		}
+
+		err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HealthCheck(ctx)
+		if err != nil {
+			// The health check failed after all retries; give up leadership.
+			log.Warnf("failed to check health for default cluster: %s, giving up leadership", err)
+			return
+		}
+		log.Debugf("successfully checked health for default cluster")
+	}
+}
diff --git a/pkg/ingress/endpoint.go b/pkg/ingress/endpoint.go
index fbf64f3..cfcea7f 100644
--- a/pkg/ingress/endpoint.go
+++ b/pkg/ingress/endpoint.go
@@ -58,6 +58,7 @@ func (c *Controller) newEndpointsController() *endpointsController {
 func (c *endpointsController) run(ctx context.Context) {
 	log.Info("endpoints controller started")
 	defer log.Info("endpoints controller exited")
+	defer c.workqueue.ShutDown()
 
 	if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.epInformer.HasSynced); !ok {
 		log.Error("informers sync failed")
@@ -82,7 +83,6 @@ func (c *endpointsController) run(ctx context.Context) {
 	}
 
 	<-ctx.Done()
-	c.workqueue.ShutDown()
 }
 
 func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error {
diff --git a/pkg/ingress/ingress.go b/pkg/ingress/ingress.go
index 73077b7..6167e7d 100644
--- a/pkg/ingress/ingress.go
+++ b/pkg/ingress/ingress.go
@@ -57,6 +57,7 @@ func (c *Controller) newIngressController() *ingressController {
 func (c *ingressController) run(ctx context.Context) {
 	log.Info("ingress controller started")
 	defer log.Infof("ingress controller exited")
+	defer c.workqueue.ShutDown()
 
 	if !cache.WaitForCacheSync(ctx.Done(), c.controller.ingressInformer.HasSynced) {
 		log.Errorf("cache sync failed")
@@ -66,7 +67,6 @@ func (c *ingressController) run(ctx context.Context) {
 		go c.runWorker(ctx)
 	}
 	<-ctx.Done()
-	c.workqueue.ShutDown()
 }
 
 func (c *ingressController) runWorker(ctx context.Context) {
diff --git a/pkg/ingress/secret.go b/pkg/ingress/secret.go
index 143eddc..8b3aec2 100644
--- a/pkg/ingress/secret.go
+++ b/pkg/ingress/secret.go
@@ -59,6 +59,7 @@ func (c *Controller) newSecretController() *secretController {
 func (c *secretController) run(ctx context.Context) {
 	log.Info("secret controller started")
 	defer log.Info("secret controller exited")
+	defer c.workqueue.ShutDown()
 
 	if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.secretInformer.HasSynced); !ok {
 		log.Error("informers sync failed")
@@ -70,7 +71,6 @@ func (c *secretController) run(ctx context.Context) {
 	}
 
 	<-ctx.Done()
-	c.workqueue.ShutDown()
 }
 
 func (c *secretController) runWorker(ctx context.Context) {
diff --git a/pkg/kube/apisix/apis/config/v2alpha1/types.go b/pkg/kube/apisix/apis/config/v2alpha1/types.go
index 08b4a8b..1fdf482 100644
--- a/pkg/kube/apisix/apis/config/v2alpha1/types.go
+++ b/pkg/kube/apisix/apis/config/v2alpha1/types.go
@@ -20,6 +20,8 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
+
+	"github.com/apache/apisix-ingress-controller/pkg/types"
 )
 
 const (
@@ -299,6 +301,8 @@ type ApisixClusterAdminConfig struct {
 	BaseURL string `json:"baseURL" yaml:"baseURL"`
 	// AdminKey is used to verify the admin API user.
 	AdminKey string `json:"adminKey" yaml:"adminKey"`
+	// ClientTimeout is the request timeout for the APISIX Admin API client.
+	ClientTimeout types.TimeDuration `json:"clientTimeout" yaml:"clientTimeout"`
 }
 
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
diff --git a/pkg/kube/init.go b/pkg/kube/init.go
index d7b21e9..55af64e 100644
--- a/pkg/kube/init.go
+++ b/pkg/kube/init.go
@@ -25,16 +25,12 @@ import (
 
 // KubeClient contains some objects used to communicate with Kubernetes API Server.
 type KubeClient struct {
+	cfg *config.Config
+
 	// Client is the object used to operate Kubernetes builtin resources.
 	Client kubernetes.Interface
 	// APISIXClient is the object used to operate resources under apisix.apache.org group.
 	APISIXClient clientset.Interface
-	// SharedIndexInformerFactory is the index informer factory object used to watch and
-	// list Kubernetes builtin resources.
-	SharedIndexInformerFactory informers.SharedInformerFactory
-	// APISIXSharedIndexInformerFactory is the index informer factory object used to watch
-	// and list Kubernetes resources in apisix.apache.org group.
-	APISIXSharedIndexInformerFactory externalversions.SharedInformerFactory
 }
 
 // NewKubeClient creates a high-level Kubernetes client.
@@ -52,13 +48,22 @@ func NewKubeClient(cfg *config.Config) (*KubeClient, error) {
 	if err != nil {
 		return nil, err
 	}
-	factory := informers.NewSharedInformerFactory(kubeClient, cfg.Kubernetes.ResyncInterval.Duration)
-	apisixFactory := externalversions.NewSharedInformerFactory(apisixKubeClient, cfg.Kubernetes.ResyncInterval.Duration)
 
 	return &KubeClient{
-		Client:                           kubeClient,
-		APISIXClient:                     apisixKubeClient,
-		SharedIndexInformerFactory:       factory,
-		APISIXSharedIndexInformerFactory: apisixFactory,
+		cfg:          cfg,
+		Client:       kubeClient,
+		APISIXClient: apisixKubeClient,
 	}, nil
 }
+
+// NewSharedIndexInformerFactory returns an index informer factory used to
+// watch and list Kubernetes builtin resources.
+func (k *KubeClient) NewSharedIndexInformerFactory() informers.SharedInformerFactory {
+	return informers.NewSharedInformerFactory(k.Client, k.cfg.Kubernetes.ResyncInterval.Duration)
+}
+
+// NewAPISIXSharedIndexInformerFactory returns an index informer factory used to
+// watch and list Kubernetes resources in the apisix.apache.org group.
+func (k *KubeClient) NewAPISIXSharedIndexInformerFactory() externalversions.SharedInformerFactory {
+	return externalversions.NewSharedInformerFactory(k.APISIXClient, k.cfg.Kubernetes.ResyncInterval.Duration)
+}