You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by GitBox <gi...@apache.org> on 2021/05/16 01:25:19 UTC

[GitHub] [apisix-ingress-controller] tokers commented on a change in pull request #453: fix: panic of start leading. sync ingress failed when apisix not start.

tokers commented on a change in pull request #453:
URL: https://github.com/apache/apisix-ingress-controller/pull/453#discussion_r633025408



##########
File path: pkg/apisix/cluster.go
##########
@@ -290,6 +303,39 @@ func (c *cluster) GlobalRule() GlobalRule {
 	return c.globalRules
 }
 
+// HealthCheck implements Cluster.HealthCheck method.
+func (c *cluster) HealthCheck(ctx context.Context, backoff wait.Backoff) (err error) {
+	if c.cacheSyncErr != nil {
+		err = c.cacheSyncErr
+		return
+	}
+	if atomic.LoadInt32(&c.cacheState) == _cacheSyncing {
+		return
+	}
+	var lastCheckErr error
+	err = wait.ExponentialBackoffWithContext(ctx, backoff, func() (done bool, _ error) {
+		if lastCheckErr = c.healthCheck(ctx); lastCheckErr != nil {
+			log.Warnf("failed to HealthCheck for cluster %s: %s, will retry", c.name, lastCheckErr)
+			return
+		}
+		done = true
+		return
+	})
+	if err != nil {
+		// if ErrWaitTimeout then set lastSyncErr
+		c.cacheSyncErr = lastCheckErr
+	}
+	return err
+}
+
+func (c *cluster) healthCheck(ctx context.Context) (err error) {
+	// TODO

Review comment:
       Just using a TCP socket probe is OK.

##########
File path: pkg/ingress/controller.go
##########
@@ -423,3 +449,26 @@ func (c *Controller) syncSSL(ctx context.Context, ssl *apisixv1.Ssl, event types
 	}
 	return err
 }
+
+func (c *Controller) checkClusterHealthy(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+		case <-time.After(5 * time.Second):
+		}
+
+		// Retry three times in a row, and exit if all of them fail.
+		backoff := wait.Backoff{
+			Duration: 5 * time.Second,
+			Factor:   1,
+			Steps:    3,
+		}
+		err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HealthCheck(ctx, backoff)
+		if err != nil {
+			// Finally failed health check, then give up leader.
+			c.leaderContextCancelFunc()
+			log.Warnf("failed to HealthCheck for default cluster: %s, give up leader", err)

Review comment:
       ```suggestion
   			log.Warnf("failed to check health for default cluster: %s, give up leader", err)
   ```

##########
File path: pkg/kube/init.go
##########
@@ -25,16 +25,12 @@ import (
 
 // KubeClient contains some objects used to communicate with Kubernetes API Server.
 type KubeClient struct {
+	cfg *config.Config
+
 	// Client is the object used to operate Kubernetes builtin resources.
 	Client kubernetes.Interface
 	// APISIXClient is the object used to operate resources under apisix.apache.org group.
 	APISIXClient clientset.Interface
-	// SharedIndexInformerFactory is the index informer factory object used to watch and

Review comment:
       Why change this?

##########
File path: pkg/config/config.go
##########
@@ -85,6 +85,8 @@ type APISIXConfig struct {
 	// DefaultClusterAdminKey is the admin key for the default cluster.
 	// TODO: Obsolete the plain way to specify admin_key, which is insecure.
 	DefaultClusterAdminKey string `json:"default_cluster_admin_key" yaml:"default_cluster_admin_key"`
+	// DefaultClusterClientTimeout is the request timeout for default cluster client.
+	DefaultClusterClientTimeout types.TimeDuration `json:"default_cluster_client_timeout" yaml:"default_cluster_client_timeout"`

Review comment:
       We will introduce `ApisixClusterConfig` to set these options, so don't add it here.

##########
File path: pkg/apisix/cluster.go
##########
@@ -50,6 +50,19 @@ var (
 	ErrDuplicatedCluster = errors.New("duplicated cluster")
 
 	_errReadOnClosedResBody = errors.New("http: read on closed response body")
+
+	// Default shared transport if apisix client
+	defaultTransport = &http.Transport{

Review comment:
       ```suggestion
   	_defaultTransport = &http.Transport{
   ```

##########
File path: pkg/apisix/apisix.go
##########
@@ -19,6 +19,7 @@ import (
 	"sync"
 
 	v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
+	"k8s.io/apimachinery/pkg/util/wait"

Review comment:
       By convention, we put third-party packages in the middle of the `import` block.
   
   ```go
   import (
   	std
   	......
   
   	"k8s.io/apimachinery/pkg/util/wait"
   
   	v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
   )
   ```

##########
File path: pkg/ingress/apisix_cluster_config.go
##########
@@ -62,7 +64,6 @@ func (c *apisixClusterConfigController) run(ctx context.Context) {
 		go c.runWorker(ctx)
 	}
 	<-ctx.Done()
-	c.workqueue.ShutDown()

Review comment:
       This is not related to the change in this PR. If you want to change them to use `defer`, please do that in another PR.

##########
File path: pkg/ingress/controller.go
##########
@@ -317,30 +326,42 @@ election:
 }
 
 func (c *Controller) run(ctx context.Context) {
-	log.Infow("controller now is running as leader",
+	log.Infow("controller is start leading ...",

Review comment:
       ```suggestion
   	log.Infow("controller tries to leading ...",
   ```

##########
File path: pkg/ingress/controller.go
##########
@@ -317,30 +326,42 @@ election:
 }
 
 func (c *Controller) run(ctx context.Context) {
-	log.Infow("controller now is running as leader",
+	log.Infow("controller is start leading ...",
 		zap.String("namespace", c.namespace),
 		zap.String("pod", c.name),
 	)
+
 	defer c.leaderContextCancelFunc()
 	c.metricsCollector.ResetLeader(true)
 
-	err := c.apisix.AddCluster(&apisix.ClusterOptions{
+	clusterOpts := &apisix.ClusterOptions{
 		Name:     c.cfg.APISIX.DefaultClusterName,
 		AdminKey: c.cfg.APISIX.DefaultClusterAdminKey,
 		BaseURL:  c.cfg.APISIX.DefaultClusterBaseURL,
-	})
+		Timeout:  c.cfg.APISIX.DefaultClusterClientTimeout.Duration,
+	}
+	err := c.apisix.AddCluster(clusterOpts)
 	if err != nil && err != apisix.ErrDuplicatedCluster {
-		// TODO give up the leader role.

Review comment:
       Don't remove the TODO unless you have actually implemented it.

##########
File path: pkg/apisix/nonexistentclient.go
##########
@@ -20,6 +20,7 @@ import (
 
 	"github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
 	v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
+	"k8s.io/apimachinery/pkg/util/wait"

Review comment:
       Ditto with the `import` convention.

##########
File path: pkg/ingress/controller.go
##########
@@ -317,30 +326,42 @@ election:
 }
 
 func (c *Controller) run(ctx context.Context) {
-	log.Infow("controller now is running as leader",
+	log.Infow("controller is start leading ...",
 		zap.String("namespace", c.namespace),
 		zap.String("pod", c.name),
 	)
+
 	defer c.leaderContextCancelFunc()
 	c.metricsCollector.ResetLeader(true)
 
-	err := c.apisix.AddCluster(&apisix.ClusterOptions{
+	clusterOpts := &apisix.ClusterOptions{
 		Name:     c.cfg.APISIX.DefaultClusterName,
 		AdminKey: c.cfg.APISIX.DefaultClusterAdminKey,
 		BaseURL:  c.cfg.APISIX.DefaultClusterBaseURL,
-	})
+		Timeout:  c.cfg.APISIX.DefaultClusterClientTimeout.Duration,
+	}
+	err := c.apisix.AddCluster(clusterOpts)
 	if err != nil && err != apisix.ErrDuplicatedCluster {
-		// TODO give up the leader role.
 		log.Errorf("failed to add default cluster: %s", err)
 		return
 	}
 
 	if err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HasSynced(ctx); err != nil {
-		// TODO give up the leader role.

Review comment:
       Ditto.

##########
File path: pkg/ingress/controller.go
##########
@@ -180,40 +158,71 @@ func NewController(cfg *config.Config) (*Controller, error) {
 		watchingNamespace: watchingNamespace,
 		secretSSLMap:      new(sync.Map),
 		recorder:          eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: _component}),
-
-		epInformer:                  kubeClient.SharedIndexInformerFactory.Core().V1().Endpoints().Informer(),
-		epLister:                    kubeClient.SharedIndexInformerFactory.Core().V1().Endpoints().Lister(),
-		svcInformer:                 kubeClient.SharedIndexInformerFactory.Core().V1().Services().Informer(),
-		svcLister:                   kubeClient.SharedIndexInformerFactory.Core().V1().Services().Lister(),
-		ingressLister:               ingressLister,
-		ingressInformer:             ingressInformer,
-		secretInformer:              kubeClient.SharedIndexInformerFactory.Core().V1().Secrets().Informer(),
-		secretLister:                kubeClient.SharedIndexInformerFactory.Core().V1().Secrets().Lister(),
-		apisixRouteInformer:         apisixRouteInformer,
-		apisixRouteLister:           apisixRouteLister,
-		apisixUpstreamInformer:      kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixUpstreams().Informer(),
-		apisixUpstreamLister:        kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixUpstreams().Lister(),
-		apisixTlsInformer:           kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixTlses().Informer(),
-		apisixTlsLister:             kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixTlses().Lister(),
-		apisixClusterConfigInformer: kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixClusterConfigs().Informer(),
-		apisixClusterConfigLister:   kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixClusterConfigs().Lister(),
 	}
+	return c, nil
+}
+
+func (c *Controller) initWhenStartLeader() {

Review comment:
       ```suggestion
   func (c *Controller) initWhenStartLeading() {
   ```

##########
File path: pkg/kube/apisix/apis/config/v2alpha1/types.go
##########
@@ -17,6 +17,7 @@ package v2alpha1
 import (
 	"encoding/json"
 
+	"github.com/apache/apisix-ingress-controller/pkg/types"

Review comment:
       Put our own packages at the bottom of `import`.

##########
File path: pkg/ingress/controller.go
##########
@@ -423,3 +449,26 @@ func (c *Controller) syncSSL(ctx context.Context, ssl *apisixv1.Ssl, event types
 	}
 	return err
 }
+
+func (c *Controller) checkClusterHealthy(ctx context.Context) {

Review comment:
       ```suggestion
   func (c *Controller) checkClusterHealth(ctx context.Context) {
   ```

##########
File path: pkg/apisix/apisix.go
##########
@@ -50,6 +51,8 @@ type Cluster interface {
 	String() string
 	// HasSynced checks whether all resources in APISIX cluster is synced to cache.
 	HasSynced(context.Context) error
+	// HealthCheck checks apisix cluster health in realtime.
+	HealthCheck(context.Context, wait.Backoff) error

Review comment:
       IMHO, we should not accept a `wait.Backoff` object from the caller. What we are trying to do is hide such details and just tell the caller whether the cluster is healthy or not.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org