You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by kv...@apache.org on 2021/02/08 06:32:34 UTC
[apisix-ingress-controller] branch master updated: chore: optimized
the log for waiting cache sync, and other event details (#240)
This is an automated email from the ASF dual-hosted git repository.
kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new fe0edaf chore: optimized the log for waiting cache sync, and other event details (#240)
fe0edaf is described below
commit fe0edaf7f3e8f8ffb5808c9f2e0860ee6fae295a
Author: Alex Zhang <zc...@gmail.com>
AuthorDate: Mon Feb 8 14:32:26 2021 +0800
chore: optimized the log for waiting cache sync, and other event details (#240)
---
Dockerfile | 2 +-
pkg/apisix/cluster.go | 19 +++++++++++++++++++
pkg/apisix/resource.go | 8 ++++----
pkg/apisix/route.go | 18 +++++++++---------
pkg/apisix/service.go | 18 +++++++++---------
pkg/apisix/ssl.go | 16 ++++++++--------
pkg/apisix/upstream.go | 16 ++++++++--------
7 files changed, 58 insertions(+), 39 deletions(-)
diff --git a/Dockerfile b/Dockerfile
index a5ab138..2aaee3e 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -23,7 +23,7 @@ RUN rm -rf /etc/localtime \
WORKDIR /build
COPY . .
-RUN GOPROXY=https://goproxy.io,direct make build
+RUN GOPROXY=https://goproxy.cn,direct make build
FROM centos:centos7
LABEL maintainer="gxthrj@163.com"
diff --git a/pkg/apisix/cluster.go b/pkg/apisix/cluster.go
index 017011e..bd739c9 100644
--- a/pkg/apisix/cluster.go
+++ b/pkg/apisix/cluster.go
@@ -23,6 +23,7 @@ import (
"io/ioutil"
"net/http"
"strings"
+ "sync/atomic"
"time"
"go.uber.org/multierr"
@@ -35,6 +36,10 @@ import (
const (
_defaultTimeout = 5 * time.Second
+
+ _cacheNotSync = iota
+ _cacheSyncing
+ _cacheSynced
)
var (
@@ -60,6 +65,7 @@ type cluster struct {
baseURL string
adminKey string
cli *http.Client
+ cacheState int32
cache cache.Cache
cacheSynced chan struct{}
cacheSyncErr error
@@ -90,6 +96,7 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
},
},
cache: nil,
+ cacheState: _cacheNotSync,
cacheSynced: make(chan struct{}),
}
c.route = newRouteClient(c)
@@ -105,6 +112,9 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
func (c *cluster) syncCache() {
log.Infow("syncing cache", zap.String("cluster", c.name))
now := time.Now()
+ if !atomic.CompareAndSwapInt32(&c.cacheState, _cacheNotSync, _cacheSyncing) {
+ panic("dubious state when sync cache")
+ }
defer func() {
if c.cacheSyncErr == nil {
log.Infow("cache synced",
@@ -129,6 +139,10 @@ func (c *cluster) syncCache() {
c.cacheSyncErr = err
}
close(c.cacheSynced)
+
+ if !atomic.CompareAndSwapInt32(&c.cacheState, _cacheSyncing, _cacheSynced) {
+ panic("dubious state when sync cache")
+ }
}
func (c *cluster) syncCacheOnce() (bool, error) {
@@ -212,6 +226,11 @@ func (c *cluster) HasSynced(ctx context.Context) error {
if c.cacheSyncErr != nil {
return c.cacheSyncErr
}
+ if atomic.LoadInt32(&c.cacheState) == _cacheSynced {
+ return nil
+ }
+
+ // still in sync
now := time.Now()
log.Warnf("waiting cluster %s to ready, it may takes a while", c.name)
select {
diff --git a/pkg/apisix/resource.go b/pkg/apisix/resource.go
index cb0f3a3..7cbb97c 100644
--- a/pkg/apisix/resource.go
+++ b/pkg/apisix/resource.go
@@ -84,7 +84,7 @@ type routeItem struct {
// route decodes item.Value and converts it to v1.Route.
func (i *item) route(clusterName string) (*v1.Route, error) {
- log.Infof("got route: %s", string(i.Value))
+ log.Debugf("got route: %s", string(i.Value))
list := strings.Split(i.Key, "/")
if len(list) < 1 {
return nil, fmt.Errorf("bad route config key: %s", i.Key)
@@ -115,7 +115,7 @@ func (i *item) route(clusterName string) (*v1.Route, error) {
// upstream decodes item.Value and converts it to v1.Upstream.
func (i *item) upstream(clusterName string) (*v1.Upstream, error) {
- log.Infof("got upstream: %s", string(i.Value))
+ log.Debugf("got upstream: %s", string(i.Value))
list := strings.Split(i.Key, "/")
if len(list) < 1 {
return nil, fmt.Errorf("bad upstream config key: %s", i.Key)
@@ -153,7 +153,7 @@ func (i *item) upstream(clusterName string) (*v1.Upstream, error) {
// service decodes item.Value and converts it to v1.Service.
func (i *item) service(clusterName string) (*v1.Service, error) {
- log.Infof("got service: %s", string(i.Value))
+ log.Debugf("got service: %s", string(i.Value))
var svc serviceItem
if err := json.Unmarshal(i.Value, &svc); err != nil {
return nil, err
@@ -182,7 +182,7 @@ func (i *item) service(clusterName string) (*v1.Service, error) {
// ssl decodes item.Value and converts it to v1.Ssl.
func (i *item) ssl(clusterName string) (*v1.Ssl, error) {
- log.Infof("got ssl: %s", string(i.Value))
+ log.Debugf("got ssl: %s", string(i.Value))
var ssl v1.Ssl
if err := json.Unmarshal(i.Value, &ssl); err != nil {
return nil, err
diff --git a/pkg/apisix/route.go b/pkg/apisix/route.go
index 08a2fdb..0008959 100644
--- a/pkg/apisix/route.go
+++ b/pkg/apisix/route.go
@@ -53,7 +53,7 @@ func newRouteClient(c *cluster) Route {
// FIXME, currently if caller pass a non-existent resource, the Get always passes
// through cache.
func (r *routeClient) Get(ctx context.Context, fullname string) (*v1.Route, error) {
- log.Infow("try to look up route",
+ log.Debugw("try to look up route",
zap.String("fullname", fullname),
zap.String("url", r.url),
zap.String("cluster", r.clusterName),
@@ -68,7 +68,7 @@ func (r *routeClient) Get(ctx context.Context, fullname string) (*v1.Route, erro
zap.Error(err),
)
} else {
- log.Warnw("failed to find route in cache, will try to lookup from APISIX",
+ log.Debugw("failed to find route in cache, will try to lookup from APISIX",
zap.String("fullname", fullname),
zap.Error(err),
)
@@ -115,7 +115,7 @@ func (r *routeClient) Get(ctx context.Context, fullname string) (*v1.Route, erro
// List is only used in cache warming up. So here just pass through
// to APISIX.
func (r *routeClient) List(ctx context.Context) ([]*v1.Route, error) {
- log.Infow("try to list routes in APISIX",
+ log.Debugw("try to list routes in APISIX",
zap.String("cluster", r.clusterName),
zap.String("url", r.url),
)
@@ -138,14 +138,14 @@ func (r *routeClient) List(ctx context.Context) ([]*v1.Route, error) {
}
items = append(items, route)
- log.Infof("list route #%d, body: %s", i, string(item.Value))
+ log.Debugf("list route #%d, body: %s", i, string(item.Value))
}
return items, nil
}
func (r *routeClient) Create(ctx context.Context, obj *v1.Route) (*v1.Route, error) {
- log.Infow("try to create route",
+ log.Debugw("try to create route",
zap.String("host", obj.Host),
zap.String("fullname", obj.FullName),
zap.String("cluster", r.clusterName),
@@ -168,7 +168,7 @@ func (r *routeClient) Create(ctx context.Context, obj *v1.Route) (*v1.Route, err
}
url := r.url + "/" + obj.ID
- log.Infow("creating route", zap.ByteString("body", data), zap.String("url", url))
+ log.Debugw("creating route", zap.ByteString("body", data), zap.String("url", url))
resp, err := r.cluster.createResource(ctx, url, bytes.NewReader(data))
if err != nil {
log.Errorf("failed to create route: %s", err)
@@ -191,7 +191,7 @@ func (r *routeClient) Create(ctx context.Context, obj *v1.Route) (*v1.Route, err
}
func (r *routeClient) Delete(ctx context.Context, obj *v1.Route) error {
- log.Infow("try to delete route",
+ log.Debugw("try to delete route",
zap.String("id", obj.ID),
zap.String("fullname", obj.FullName),
zap.String("cluster", r.clusterName),
@@ -212,7 +212,7 @@ func (r *routeClient) Delete(ctx context.Context, obj *v1.Route) error {
}
func (r *routeClient) Update(ctx context.Context, obj *v1.Route) (*v1.Route, error) {
- log.Infow("try to update route",
+ log.Debugw("try to update route",
zap.String("id", obj.ID),
zap.String("fullname", obj.FullName),
zap.String("cluster", r.clusterName),
@@ -232,7 +232,7 @@ func (r *routeClient) Update(ctx context.Context, obj *v1.Route) (*v1.Route, err
return nil, err
}
url := r.url + "/" + obj.ID
- log.Infow("updating route", zap.ByteString("body", body), zap.String("url", r.url))
+ log.Debugw("updating route", zap.ByteString("body", body), zap.String("url", r.url))
resp, err := r.cluster.updateResource(ctx, url, bytes.NewReader(body))
if err != nil {
return nil, err
diff --git a/pkg/apisix/service.go b/pkg/apisix/service.go
index 3a283c9..a16d67a 100644
--- a/pkg/apisix/service.go
+++ b/pkg/apisix/service.go
@@ -49,7 +49,7 @@ func newServiceClient(c *cluster) Service {
}
func (s *serviceClient) Get(ctx context.Context, fullname string) (*v1.Service, error) {
- log.Infow("try to look up service",
+ log.Debugw("try to look up service",
zap.String("fullname", fullname),
zap.String("url", s.url),
zap.String("cluster", s.clusterName),
@@ -64,7 +64,7 @@ func (s *serviceClient) Get(ctx context.Context, fullname string) (*v1.Service,
zap.Error(err),
)
} else {
- log.Warnw("failed to find service in cache, will try to look up from APISIX",
+ log.Debugw("failed to find service in cache, will try to look up from APISIX",
zap.String("fullname", fullname),
zap.Error(err),
)
@@ -111,7 +111,7 @@ func (s *serviceClient) Get(ctx context.Context, fullname string) (*v1.Service,
// List is only used in cache warming up. So here just pass through
// to APISIX.
func (s *serviceClient) List(ctx context.Context) ([]*v1.Service, error) {
- log.Infow("try to list services in APISIX",
+ log.Debugw("try to list services in APISIX",
zap.String("url", s.url),
zap.String("cluster", s.clusterName),
)
@@ -134,13 +134,13 @@ func (s *serviceClient) List(ctx context.Context) ([]*v1.Service, error) {
return nil, err
}
items = append(items, svc)
- log.Infof("list service #%d, body: %s", i, string(item.Value))
+ log.Debugf("list service #%d, body: %s", i, string(item.Value))
}
return items, nil
}
func (s *serviceClient) Create(ctx context.Context, obj *v1.Service) (*v1.Service, error) {
- log.Infow("try to create service",
+ log.Debugw("try to create service",
zap.String("fullname", obj.FullName),
zap.String("cluster", s.clusterName),
zap.String("url", s.url),
@@ -159,7 +159,7 @@ func (s *serviceClient) Create(ctx context.Context, obj *v1.Service) (*v1.Servic
}
url := s.url + "/" + obj.ID
- log.Infow("creating service", zap.ByteString("body", body), zap.String("url", url))
+ log.Debugw("creating service", zap.ByteString("body", body), zap.String("url", url))
resp, err := s.cluster.createResource(ctx, url, bytes.NewReader(body))
if err != nil {
log.Errorf("failed to create service: %s", err)
@@ -181,7 +181,7 @@ func (s *serviceClient) Create(ctx context.Context, obj *v1.Service) (*v1.Servic
}
func (s *serviceClient) Delete(ctx context.Context, obj *v1.Service) error {
- log.Infow("try to delete service",
+ log.Debugw("try to delete service",
zap.String("id", obj.ID),
zap.String("fullname", obj.FullName),
zap.String("cluster", s.clusterName),
@@ -202,7 +202,7 @@ func (s *serviceClient) Delete(ctx context.Context, obj *v1.Service) error {
}
func (s *serviceClient) Update(ctx context.Context, obj *v1.Service) (*v1.Service, error) {
- log.Infow("try to update service",
+ log.Debugw("try to update service",
zap.String("id", obj.ID),
zap.String("fullname", obj.FullName),
zap.String("cluster", s.clusterName),
@@ -223,7 +223,7 @@ func (s *serviceClient) Update(ctx context.Context, obj *v1.Service) (*v1.Servic
}
url := s.url + "/" + obj.ID
- log.Infow("creating service", zap.ByteString("body", body), zap.String("url", url))
+ log.Debugw("creating service", zap.ByteString("body", body), zap.String("url", url))
resp, err := s.cluster.updateResource(ctx, url, bytes.NewReader(body))
if err != nil {
return nil, err
diff --git a/pkg/apisix/ssl.go b/pkg/apisix/ssl.go
index 21ad5ef..367aaa4 100644
--- a/pkg/apisix/ssl.go
+++ b/pkg/apisix/ssl.go
@@ -43,7 +43,7 @@ func newSSLClient(c *cluster) SSL {
}
func (s *sslClient) Get(ctx context.Context, fullname string) (*v1.Ssl, error) {
- log.Infow("try to look up ssl",
+ log.Debugw("try to look up ssl",
zap.String("fullname", fullname),
zap.String("url", s.url),
zap.String("cluster", s.clusterName),
@@ -59,7 +59,7 @@ func (s *sslClient) Get(ctx context.Context, fullname string) (*v1.Ssl, error) {
zap.Error(err),
)
} else {
- log.Warnw("failed to find ssl in cache, will try to lookup from APISIX",
+ log.Debugw("failed to find ssl in cache, will try to lookup from APISIX",
zap.String("fullname", fullname),
zap.Error(err),
)
@@ -105,7 +105,7 @@ func (s *sslClient) Get(ctx context.Context, fullname string) (*v1.Ssl, error) {
// List is only used in cache warming up. So here just pass through
// to APISIX.
func (s *sslClient) List(ctx context.Context) ([]*v1.Ssl, error) {
- log.Infow("try to list ssl in APISIX",
+ log.Debugw("try to list ssl in APISIX",
zap.String("url", s.url),
zap.String("cluster", s.clusterName),
)
@@ -135,7 +135,7 @@ func (s *sslClient) List(ctx context.Context) ([]*v1.Ssl, error) {
}
func (s *sslClient) Create(ctx context.Context, obj *v1.Ssl) (*v1.Ssl, error) {
- log.Infow("try to create ssl",
+ log.Debugw("try to create ssl",
zap.String("cluster", s.clusterName),
zap.String("url", s.url),
zap.String("id", obj.ID),
@@ -153,7 +153,7 @@ func (s *sslClient) Create(ctx context.Context, obj *v1.Ssl) (*v1.Ssl, error) {
return nil, err
}
url := s.url + "/" + obj.ID
- log.Infow("creating ssl", zap.ByteString("body", data), zap.String("url", url))
+ log.Debugw("creating ssl", zap.ByteString("body", data), zap.String("url", url))
resp, err := s.cluster.createResource(ctx, url, bytes.NewReader(data))
if err != nil {
log.Errorf("failed to create ssl: %s", err)
@@ -177,7 +177,7 @@ func (s *sslClient) Create(ctx context.Context, obj *v1.Ssl) (*v1.Ssl, error) {
}
func (s *sslClient) Delete(ctx context.Context, obj *v1.Ssl) error {
- log.Infow("try to delete ssl",
+ log.Debugw("try to delete ssl",
zap.String("id", obj.ID),
zap.String("cluster", s.clusterName),
zap.String("url", s.url),
@@ -197,7 +197,7 @@ func (s *sslClient) Delete(ctx context.Context, obj *v1.Ssl) error {
}
func (s *sslClient) Update(ctx context.Context, obj *v1.Ssl) (*v1.Ssl, error) {
- log.Infow("try to update ssl",
+ log.Debugw("try to update ssl",
zap.String("id", obj.ID),
zap.String("cluster", s.clusterName),
zap.String("url", s.url),
@@ -216,7 +216,7 @@ func (s *sslClient) Update(ctx context.Context, obj *v1.Ssl) (*v1.Ssl, error) {
if err != nil {
return nil, err
}
- log.Infow("updating ssl", zap.ByteString("body", data), zap.String("url", url))
+ log.Debugw("updating ssl", zap.ByteString("body", data), zap.String("url", url))
resp, err := s.cluster.updateResource(ctx, url, bytes.NewReader(data))
if err != nil {
return nil, err
diff --git a/pkg/apisix/upstream.go b/pkg/apisix/upstream.go
index ba6503b..a339f14 100644
--- a/pkg/apisix/upstream.go
+++ b/pkg/apisix/upstream.go
@@ -86,7 +86,7 @@ func newUpstreamClient(c *cluster) Upstream {
}
func (u *upstreamClient) Get(ctx context.Context, fullname string) (*v1.Upstream, error) {
- log.Infow("try to look up upstream",
+ log.Debugw("try to look up upstream",
zap.String("fullname", fullname),
zap.String("url", u.url),
zap.String("cluster", u.clusterName),
@@ -101,7 +101,7 @@ func (u *upstreamClient) Get(ctx context.Context, fullname string) (*v1.Upstream
zap.Error(err),
)
} else {
- log.Warnw("failed to find upstream in cache, will try to lookup from APISIX",
+ log.Debugw("failed to find upstream in cache, will try to lookup from APISIX",
zap.String("fullname", fullname),
zap.Error(err),
)
@@ -148,7 +148,7 @@ func (u *upstreamClient) Get(ctx context.Context, fullname string) (*v1.Upstream
// List is only used in cache warming up. So here just pass through
// to APISIX.
func (u *upstreamClient) List(ctx context.Context) ([]*v1.Upstream, error) {
- log.Infow("try to list upstreams in APISIX",
+ log.Debugw("try to list upstreams in APISIX",
zap.String("url", u.url),
zap.String("cluster", u.clusterName),
)
@@ -177,7 +177,7 @@ func (u *upstreamClient) List(ctx context.Context) ([]*v1.Upstream, error) {
}
func (u *upstreamClient) Create(ctx context.Context, obj *v1.Upstream) (*v1.Upstream, error) {
- log.Infow("try to create upstream",
+ log.Debugw("try to create upstream",
zap.String("fullname", obj.FullName),
zap.String("url", u.url),
zap.String("cluster", u.clusterName),
@@ -207,7 +207,7 @@ func (u *upstreamClient) Create(ctx context.Context, obj *v1.Upstream) (*v1.Upst
return nil, err
}
url := u.url + "/" + obj.ID
- log.Infow("creating upstream", zap.ByteString("body", body), zap.String("url", url))
+ log.Debugw("creating upstream", zap.ByteString("body", body), zap.String("url", url))
resp, err := u.cluster.createResource(ctx, url, bytes.NewReader(body))
if err != nil {
@@ -230,7 +230,7 @@ func (u *upstreamClient) Create(ctx context.Context, obj *v1.Upstream) (*v1.Upst
}
func (u *upstreamClient) Delete(ctx context.Context, obj *v1.Upstream) error {
- log.Infow("try to delete upstream",
+ log.Debugw("try to delete upstream",
zap.String("id", obj.ID),
zap.String("fullname", obj.FullName),
zap.String("cluster", u.clusterName),
@@ -252,7 +252,7 @@ func (u *upstreamClient) Delete(ctx context.Context, obj *v1.Upstream) error {
}
func (u *upstreamClient) Update(ctx context.Context, obj *v1.Upstream) (*v1.Upstream, error) {
- log.Infow("try to update upstream",
+ log.Debugw("try to update upstream",
zap.String("id", obj.ID),
zap.String("fullname", obj.FullName),
zap.String("cluster", u.clusterName),
@@ -283,7 +283,7 @@ func (u *upstreamClient) Update(ctx context.Context, obj *v1.Upstream) (*v1.Upst
}
url := u.url + "/" + obj.ID
- log.Infow("updating upstream", zap.ByteString("body", body), zap.String("url", url))
+ log.Debugw("updating upstream", zap.ByteString("body", body), zap.String("url", url))
resp, err := u.cluster.updateResource(ctx, url, bytes.NewReader(body))
if err != nil {
return nil, err