You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by kv...@apache.org on 2021/02/08 06:32:34 UTC

[apisix-ingress-controller] branch master updated: chore: optimized the log for waiting cache sync, and other event details (#240)

This is an automated email from the ASF dual-hosted git repository.

kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new fe0edaf  chore: optimized the log for waiting cache sync, and other event details (#240)
fe0edaf is described below

commit fe0edaf7f3e8f8ffb5808c9f2e0860ee6fae295a
Author: Alex Zhang <zc...@gmail.com>
AuthorDate: Mon Feb 8 14:32:26 2021 +0800

    chore: optimized the log for waiting cache sync, and other event details (#240)
---
 Dockerfile             |  2 +-
 pkg/apisix/cluster.go  | 19 +++++++++++++++++++
 pkg/apisix/resource.go |  8 ++++----
 pkg/apisix/route.go    | 18 +++++++++---------
 pkg/apisix/service.go  | 18 +++++++++---------
 pkg/apisix/ssl.go      | 16 ++++++++--------
 pkg/apisix/upstream.go | 16 ++++++++--------
 7 files changed, 58 insertions(+), 39 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index a5ab138..2aaee3e 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -23,7 +23,7 @@ RUN rm -rf /etc/localtime \
 
 WORKDIR /build
 COPY . .
-RUN GOPROXY=https://goproxy.io,direct make build
+RUN GOPROXY=https://goproxy.cn,direct make build
 
 FROM centos:centos7
 LABEL maintainer="gxthrj@163.com"
diff --git a/pkg/apisix/cluster.go b/pkg/apisix/cluster.go
index 017011e..bd739c9 100644
--- a/pkg/apisix/cluster.go
+++ b/pkg/apisix/cluster.go
@@ -23,6 +23,7 @@ import (
 	"io/ioutil"
 	"net/http"
 	"strings"
+	"sync/atomic"
 	"time"
 
 	"go.uber.org/multierr"
@@ -35,6 +36,10 @@ import (
 
 const (
 	_defaultTimeout = 5 * time.Second
+
+	_cacheNotSync = iota
+	_cacheSyncing
+	_cacheSynced
 )
 
 var (
@@ -60,6 +65,7 @@ type cluster struct {
 	baseURL      string
 	adminKey     string
 	cli          *http.Client
+	cacheState   int32
 	cache        cache.Cache
 	cacheSynced  chan struct{}
 	cacheSyncErr error
@@ -90,6 +96,7 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
 			},
 		},
 		cache:       nil,
+		cacheState:  _cacheNotSync,
 		cacheSynced: make(chan struct{}),
 	}
 	c.route = newRouteClient(c)
@@ -105,6 +112,9 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
 func (c *cluster) syncCache() {
 	log.Infow("syncing cache", zap.String("cluster", c.name))
 	now := time.Now()
+	if !atomic.CompareAndSwapInt32(&c.cacheState, _cacheNotSync, _cacheSyncing) {
+		panic("dubious state when sync cache")
+	}
 	defer func() {
 		if c.cacheSyncErr == nil {
 			log.Infow("cache synced",
@@ -129,6 +139,10 @@ func (c *cluster) syncCache() {
 		c.cacheSyncErr = err
 	}
 	close(c.cacheSynced)
+
+	if !atomic.CompareAndSwapInt32(&c.cacheState, _cacheSyncing, _cacheSynced) {
+		panic("dubious state when sync cache")
+	}
 }
 
 func (c *cluster) syncCacheOnce() (bool, error) {
@@ -212,6 +226,11 @@ func (c *cluster) HasSynced(ctx context.Context) error {
 	if c.cacheSyncErr != nil {
 		return c.cacheSyncErr
 	}
+	if atomic.LoadInt32(&c.cacheState) == _cacheSynced {
+		return nil
+	}
+
+	// still in sync
 	now := time.Now()
 	log.Warnf("waiting cluster %s to ready, it may takes a while", c.name)
 	select {
diff --git a/pkg/apisix/resource.go b/pkg/apisix/resource.go
index cb0f3a3..7cbb97c 100644
--- a/pkg/apisix/resource.go
+++ b/pkg/apisix/resource.go
@@ -84,7 +84,7 @@ type routeItem struct {
 
 // route decodes item.Value and converts it to v1.Route.
 func (i *item) route(clusterName string) (*v1.Route, error) {
-	log.Infof("got route: %s", string(i.Value))
+	log.Debugf("got route: %s", string(i.Value))
 	list := strings.Split(i.Key, "/")
 	if len(list) < 1 {
 		return nil, fmt.Errorf("bad route config key: %s", i.Key)
@@ -115,7 +115,7 @@ func (i *item) route(clusterName string) (*v1.Route, error) {
 
 // upstream decodes item.Value and converts it to v1.Upstream.
 func (i *item) upstream(clusterName string) (*v1.Upstream, error) {
-	log.Infof("got upstream: %s", string(i.Value))
+	log.Debugf("got upstream: %s", string(i.Value))
 	list := strings.Split(i.Key, "/")
 	if len(list) < 1 {
 		return nil, fmt.Errorf("bad upstream config key: %s", i.Key)
@@ -153,7 +153,7 @@ func (i *item) upstream(clusterName string) (*v1.Upstream, error) {
 
 // service decodes item.Value and converts it to v1.Service.
 func (i *item) service(clusterName string) (*v1.Service, error) {
-	log.Infof("got service: %s", string(i.Value))
+	log.Debugf("got service: %s", string(i.Value))
 	var svc serviceItem
 	if err := json.Unmarshal(i.Value, &svc); err != nil {
 		return nil, err
@@ -182,7 +182,7 @@ func (i *item) service(clusterName string) (*v1.Service, error) {
 
 // ssl decodes item.Value and converts it to v1.Ssl.
 func (i *item) ssl(clusterName string) (*v1.Ssl, error) {
-	log.Infof("got ssl: %s", string(i.Value))
+	log.Debugf("got ssl: %s", string(i.Value))
 	var ssl v1.Ssl
 	if err := json.Unmarshal(i.Value, &ssl); err != nil {
 		return nil, err
diff --git a/pkg/apisix/route.go b/pkg/apisix/route.go
index 08a2fdb..0008959 100644
--- a/pkg/apisix/route.go
+++ b/pkg/apisix/route.go
@@ -53,7 +53,7 @@ func newRouteClient(c *cluster) Route {
 // FIXME, currently if caller pass a non-existent resource, the Get always passes
 // through cache.
 func (r *routeClient) Get(ctx context.Context, fullname string) (*v1.Route, error) {
-	log.Infow("try to look up route",
+	log.Debugw("try to look up route",
 		zap.String("fullname", fullname),
 		zap.String("url", r.url),
 		zap.String("cluster", r.clusterName),
@@ -68,7 +68,7 @@ func (r *routeClient) Get(ctx context.Context, fullname string) (*v1.Route, erro
 			zap.Error(err),
 		)
 	} else {
-		log.Warnw("failed to find route in cache, will try to lookup from APISIX",
+		log.Debugw("failed to find route in cache, will try to lookup from APISIX",
 			zap.String("fullname", fullname),
 			zap.Error(err),
 		)
@@ -115,7 +115,7 @@ func (r *routeClient) Get(ctx context.Context, fullname string) (*v1.Route, erro
 // List is only used in cache warming up. So here just pass through
 // to APISIX.
 func (r *routeClient) List(ctx context.Context) ([]*v1.Route, error) {
-	log.Infow("try to list routes in APISIX",
+	log.Debugw("try to list routes in APISIX",
 		zap.String("cluster", r.clusterName),
 		zap.String("url", r.url),
 	)
@@ -138,14 +138,14 @@ func (r *routeClient) List(ctx context.Context) ([]*v1.Route, error) {
 		}
 
 		items = append(items, route)
-		log.Infof("list route #%d, body: %s", i, string(item.Value))
+		log.Debugf("list route #%d, body: %s", i, string(item.Value))
 	}
 
 	return items, nil
 }
 
 func (r *routeClient) Create(ctx context.Context, obj *v1.Route) (*v1.Route, error) {
-	log.Infow("try to create route",
+	log.Debugw("try to create route",
 		zap.String("host", obj.Host),
 		zap.String("fullname", obj.FullName),
 		zap.String("cluster", r.clusterName),
@@ -168,7 +168,7 @@ func (r *routeClient) Create(ctx context.Context, obj *v1.Route) (*v1.Route, err
 	}
 
 	url := r.url + "/" + obj.ID
-	log.Infow("creating route", zap.ByteString("body", data), zap.String("url", url))
+	log.Debugw("creating route", zap.ByteString("body", data), zap.String("url", url))
 	resp, err := r.cluster.createResource(ctx, url, bytes.NewReader(data))
 	if err != nil {
 		log.Errorf("failed to create route: %s", err)
@@ -191,7 +191,7 @@ func (r *routeClient) Create(ctx context.Context, obj *v1.Route) (*v1.Route, err
 }
 
 func (r *routeClient) Delete(ctx context.Context, obj *v1.Route) error {
-	log.Infow("try to delete route",
+	log.Debugw("try to delete route",
 		zap.String("id", obj.ID),
 		zap.String("fullname", obj.FullName),
 		zap.String("cluster", r.clusterName),
@@ -212,7 +212,7 @@ func (r *routeClient) Delete(ctx context.Context, obj *v1.Route) error {
 }
 
 func (r *routeClient) Update(ctx context.Context, obj *v1.Route) (*v1.Route, error) {
-	log.Infow("try to update route",
+	log.Debugw("try to update route",
 		zap.String("id", obj.ID),
 		zap.String("fullname", obj.FullName),
 		zap.String("cluster", r.clusterName),
@@ -232,7 +232,7 @@ func (r *routeClient) Update(ctx context.Context, obj *v1.Route) (*v1.Route, err
 		return nil, err
 	}
 	url := r.url + "/" + obj.ID
-	log.Infow("updating route", zap.ByteString("body", body), zap.String("url", r.url))
+	log.Debugw("updating route", zap.ByteString("body", body), zap.String("url", r.url))
 	resp, err := r.cluster.updateResource(ctx, url, bytes.NewReader(body))
 	if err != nil {
 		return nil, err
diff --git a/pkg/apisix/service.go b/pkg/apisix/service.go
index 3a283c9..a16d67a 100644
--- a/pkg/apisix/service.go
+++ b/pkg/apisix/service.go
@@ -49,7 +49,7 @@ func newServiceClient(c *cluster) Service {
 }
 
 func (s *serviceClient) Get(ctx context.Context, fullname string) (*v1.Service, error) {
-	log.Infow("try to look up service",
+	log.Debugw("try to look up service",
 		zap.String("fullname", fullname),
 		zap.String("url", s.url),
 		zap.String("cluster", s.clusterName),
@@ -64,7 +64,7 @@ func (s *serviceClient) Get(ctx context.Context, fullname string) (*v1.Service,
 			zap.Error(err),
 		)
 	} else {
-		log.Warnw("failed to find service in cache, will try to look up from APISIX",
+		log.Debugw("failed to find service in cache, will try to look up from APISIX",
 			zap.String("fullname", fullname),
 			zap.Error(err),
 		)
@@ -111,7 +111,7 @@ func (s *serviceClient) Get(ctx context.Context, fullname string) (*v1.Service,
 // List is only used in cache warming up. So here just pass through
 // to APISIX.
 func (s *serviceClient) List(ctx context.Context) ([]*v1.Service, error) {
-	log.Infow("try to list services in APISIX",
+	log.Debugw("try to list services in APISIX",
 		zap.String("url", s.url),
 		zap.String("cluster", s.clusterName),
 	)
@@ -134,13 +134,13 @@ func (s *serviceClient) List(ctx context.Context) ([]*v1.Service, error) {
 			return nil, err
 		}
 		items = append(items, svc)
-		log.Infof("list service #%d, body: %s", i, string(item.Value))
+		log.Debugf("list service #%d, body: %s", i, string(item.Value))
 	}
 	return items, nil
 }
 
 func (s *serviceClient) Create(ctx context.Context, obj *v1.Service) (*v1.Service, error) {
-	log.Infow("try to create service",
+	log.Debugw("try to create service",
 		zap.String("fullname", obj.FullName),
 		zap.String("cluster", s.clusterName),
 		zap.String("url", s.url),
@@ -159,7 +159,7 @@ func (s *serviceClient) Create(ctx context.Context, obj *v1.Service) (*v1.Servic
 	}
 
 	url := s.url + "/" + obj.ID
-	log.Infow("creating service", zap.ByteString("body", body), zap.String("url", url))
+	log.Debugw("creating service", zap.ByteString("body", body), zap.String("url", url))
 	resp, err := s.cluster.createResource(ctx, url, bytes.NewReader(body))
 	if err != nil {
 		log.Errorf("failed to create service: %s", err)
@@ -181,7 +181,7 @@ func (s *serviceClient) Create(ctx context.Context, obj *v1.Service) (*v1.Servic
 }
 
 func (s *serviceClient) Delete(ctx context.Context, obj *v1.Service) error {
-	log.Infow("try to delete service",
+	log.Debugw("try to delete service",
 		zap.String("id", obj.ID),
 		zap.String("fullname", obj.FullName),
 		zap.String("cluster", s.clusterName),
@@ -202,7 +202,7 @@ func (s *serviceClient) Delete(ctx context.Context, obj *v1.Service) error {
 }
 
 func (s *serviceClient) Update(ctx context.Context, obj *v1.Service) (*v1.Service, error) {
-	log.Infow("try to update service",
+	log.Debugw("try to update service",
 		zap.String("id", obj.ID),
 		zap.String("fullname", obj.FullName),
 		zap.String("cluster", s.clusterName),
@@ -223,7 +223,7 @@ func (s *serviceClient) Update(ctx context.Context, obj *v1.Service) (*v1.Servic
 	}
 
 	url := s.url + "/" + obj.ID
-	log.Infow("creating service", zap.ByteString("body", body), zap.String("url", url))
+	log.Debugw("creating service", zap.ByteString("body", body), zap.String("url", url))
 	resp, err := s.cluster.updateResource(ctx, url, bytes.NewReader(body))
 	if err != nil {
 		return nil, err
diff --git a/pkg/apisix/ssl.go b/pkg/apisix/ssl.go
index 21ad5ef..367aaa4 100644
--- a/pkg/apisix/ssl.go
+++ b/pkg/apisix/ssl.go
@@ -43,7 +43,7 @@ func newSSLClient(c *cluster) SSL {
 }
 
 func (s *sslClient) Get(ctx context.Context, fullname string) (*v1.Ssl, error) {
-	log.Infow("try to look up ssl",
+	log.Debugw("try to look up ssl",
 		zap.String("fullname", fullname),
 		zap.String("url", s.url),
 		zap.String("cluster", s.clusterName),
@@ -59,7 +59,7 @@ func (s *sslClient) Get(ctx context.Context, fullname string) (*v1.Ssl, error) {
 			zap.Error(err),
 		)
 	} else {
-		log.Warnw("failed to find ssl in cache, will try to lookup from APISIX",
+		log.Debugw("failed to find ssl in cache, will try to lookup from APISIX",
 			zap.String("fullname", fullname),
 			zap.Error(err),
 		)
@@ -105,7 +105,7 @@ func (s *sslClient) Get(ctx context.Context, fullname string) (*v1.Ssl, error) {
 // List is only used in cache warming up. So here just pass through
 // to APISIX.
 func (s *sslClient) List(ctx context.Context) ([]*v1.Ssl, error) {
-	log.Infow("try to list ssl in APISIX",
+	log.Debugw("try to list ssl in APISIX",
 		zap.String("url", s.url),
 		zap.String("cluster", s.clusterName),
 	)
@@ -135,7 +135,7 @@ func (s *sslClient) List(ctx context.Context) ([]*v1.Ssl, error) {
 }
 
 func (s *sslClient) Create(ctx context.Context, obj *v1.Ssl) (*v1.Ssl, error) {
-	log.Infow("try to create ssl",
+	log.Debugw("try to create ssl",
 		zap.String("cluster", s.clusterName),
 		zap.String("url", s.url),
 		zap.String("id", obj.ID),
@@ -153,7 +153,7 @@ func (s *sslClient) Create(ctx context.Context, obj *v1.Ssl) (*v1.Ssl, error) {
 		return nil, err
 	}
 	url := s.url + "/" + obj.ID
-	log.Infow("creating ssl", zap.ByteString("body", data), zap.String("url", url))
+	log.Debugw("creating ssl", zap.ByteString("body", data), zap.String("url", url))
 	resp, err := s.cluster.createResource(ctx, url, bytes.NewReader(data))
 	if err != nil {
 		log.Errorf("failed to create ssl: %s", err)
@@ -177,7 +177,7 @@ func (s *sslClient) Create(ctx context.Context, obj *v1.Ssl) (*v1.Ssl, error) {
 }
 
 func (s *sslClient) Delete(ctx context.Context, obj *v1.Ssl) error {
-	log.Infow("try to delete ssl",
+	log.Debugw("try to delete ssl",
 		zap.String("id", obj.ID),
 		zap.String("cluster", s.clusterName),
 		zap.String("url", s.url),
@@ -197,7 +197,7 @@ func (s *sslClient) Delete(ctx context.Context, obj *v1.Ssl) error {
 }
 
 func (s *sslClient) Update(ctx context.Context, obj *v1.Ssl) (*v1.Ssl, error) {
-	log.Infow("try to update ssl",
+	log.Debugw("try to update ssl",
 		zap.String("id", obj.ID),
 		zap.String("cluster", s.clusterName),
 		zap.String("url", s.url),
@@ -216,7 +216,7 @@ func (s *sslClient) Update(ctx context.Context, obj *v1.Ssl) (*v1.Ssl, error) {
 	if err != nil {
 		return nil, err
 	}
-	log.Infow("updating ssl", zap.ByteString("body", data), zap.String("url", url))
+	log.Debugw("updating ssl", zap.ByteString("body", data), zap.String("url", url))
 	resp, err := s.cluster.updateResource(ctx, url, bytes.NewReader(data))
 	if err != nil {
 		return nil, err
diff --git a/pkg/apisix/upstream.go b/pkg/apisix/upstream.go
index ba6503b..a339f14 100644
--- a/pkg/apisix/upstream.go
+++ b/pkg/apisix/upstream.go
@@ -86,7 +86,7 @@ func newUpstreamClient(c *cluster) Upstream {
 }
 
 func (u *upstreamClient) Get(ctx context.Context, fullname string) (*v1.Upstream, error) {
-	log.Infow("try to look up upstream",
+	log.Debugw("try to look up upstream",
 		zap.String("fullname", fullname),
 		zap.String("url", u.url),
 		zap.String("cluster", u.clusterName),
@@ -101,7 +101,7 @@ func (u *upstreamClient) Get(ctx context.Context, fullname string) (*v1.Upstream
 			zap.Error(err),
 		)
 	} else {
-		log.Warnw("failed to find upstream in cache, will try to lookup from APISIX",
+		log.Debugw("failed to find upstream in cache, will try to lookup from APISIX",
 			zap.String("fullname", fullname),
 			zap.Error(err),
 		)
@@ -148,7 +148,7 @@ func (u *upstreamClient) Get(ctx context.Context, fullname string) (*v1.Upstream
 // List is only used in cache warming up. So here just pass through
 // to APISIX.
 func (u *upstreamClient) List(ctx context.Context) ([]*v1.Upstream, error) {
-	log.Infow("try to list upstreams in APISIX",
+	log.Debugw("try to list upstreams in APISIX",
 		zap.String("url", u.url),
 		zap.String("cluster", u.clusterName),
 	)
@@ -177,7 +177,7 @@ func (u *upstreamClient) List(ctx context.Context) ([]*v1.Upstream, error) {
 }
 
 func (u *upstreamClient) Create(ctx context.Context, obj *v1.Upstream) (*v1.Upstream, error) {
-	log.Infow("try to create upstream",
+	log.Debugw("try to create upstream",
 		zap.String("fullname", obj.FullName),
 		zap.String("url", u.url),
 		zap.String("cluster", u.clusterName),
@@ -207,7 +207,7 @@ func (u *upstreamClient) Create(ctx context.Context, obj *v1.Upstream) (*v1.Upst
 		return nil, err
 	}
 	url := u.url + "/" + obj.ID
-	log.Infow("creating upstream", zap.ByteString("body", body), zap.String("url", url))
+	log.Debugw("creating upstream", zap.ByteString("body", body), zap.String("url", url))
 
 	resp, err := u.cluster.createResource(ctx, url, bytes.NewReader(body))
 	if err != nil {
@@ -230,7 +230,7 @@ func (u *upstreamClient) Create(ctx context.Context, obj *v1.Upstream) (*v1.Upst
 }
 
 func (u *upstreamClient) Delete(ctx context.Context, obj *v1.Upstream) error {
-	log.Infow("try to delete upstream",
+	log.Debugw("try to delete upstream",
 		zap.String("id", obj.ID),
 		zap.String("fullname", obj.FullName),
 		zap.String("cluster", u.clusterName),
@@ -252,7 +252,7 @@ func (u *upstreamClient) Delete(ctx context.Context, obj *v1.Upstream) error {
 }
 
 func (u *upstreamClient) Update(ctx context.Context, obj *v1.Upstream) (*v1.Upstream, error) {
-	log.Infow("try to update upstream",
+	log.Debugw("try to update upstream",
 		zap.String("id", obj.ID),
 		zap.String("fullname", obj.FullName),
 		zap.String("cluster", u.clusterName),
@@ -283,7 +283,7 @@ func (u *upstreamClient) Update(ctx context.Context, obj *v1.Upstream) (*v1.Upst
 	}
 
 	url := u.url + "/" + obj.ID
-	log.Infow("updating upstream", zap.ByteString("body", body), zap.String("url", url))
+	log.Debugw("updating upstream", zap.ByteString("body", body), zap.String("url", url))
 	resp, err := u.cluster.updateResource(ctx, url, bytes.NewReader(body))
 	if err != nil {
 		return nil, err