You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by pc...@apache.org on 2022/06/29 07:50:04 UTC
[camel-k] 02/06: feat(gc): Rate limit Discovery and SelfSubjectRulesReview requests
This is an automated email from the ASF dual-hosted git repository.
pcongiusti pushed a commit to branch release-1.9.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 66ff9e59d4c8a6b74f1da7a3bfd3095f947cce65
Author: Antonin Stefanutti <an...@stefanutti.fr>
AuthorDate: Wed Jun 1 18:46:20 2022 +0200
feat(gc): Rate limit Discovery and SelfSubjectRulesReview requests
(cherry picked from commit 9d7145e8ab71e8619a205c65d68797084f3d5fb4)
---
go.mod | 1 +
pkg/trait/gc.go | 46 +++++++++++++++++++++++++++++-----------------
2 files changed, 30 insertions(+), 17 deletions(-)
diff --git a/go.mod b/go.mod
index 2c15a1092..aaf324702 100644
--- a/go.mod
+++ b/go.mod
@@ -45,6 +45,7 @@ require (
go.uber.org/zap v1.21.0
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
+ golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
gopkg.in/inf.v0 v0.9.1
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.22.5
diff --git a/pkg/trait/gc.go b/pkg/trait/gc.go
index b11802900..6ca3af790 100644
--- a/pkg/trait/gc.go
+++ b/pkg/trait/gc.go
@@ -27,6 +27,8 @@ import (
"sync"
"time"
+ "golang.org/x/time/rate"
+
"github.com/apache/camel-k/pkg/util"
authorization "k8s.io/api/authorization/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -44,10 +46,13 @@ import (
)
var (
- toFileName = regexp.MustCompile(`[^(\w/\.)]`)
- diskCachedDiscoveryClient discovery.CachedDiscoveryInterface
- memoryCachedDiscoveryClient discovery.CachedDiscoveryInterface
- discoveryClientLock sync.Mutex
+ toFileName = regexp.MustCompile(`[^(\w/\.)]`)
+
+ lock sync.Mutex
+ rateLimiter = rate.NewLimiter(rate.Every(time.Minute), 1)
+ collectableGVKs = make(map[schema.GroupVersionKind]struct{})
+ memoryCachedDiscovery discovery.CachedDiscoveryInterface
+ diskCachedDiscovery discovery.CachedDiscoveryInterface
)
type discoveryCacheType string
@@ -187,6 +192,15 @@ func (t *garbageCollectorTrait) canBeDeleted(e *Environment, u unstructured.Unst
}
func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) (map[schema.GroupVersionKind]struct{}, error) {
+ lock.Lock()
+ defer lock.Unlock()
+
+ // Rate limit to avoid Discovery and SelfSubjectRulesReview requests at every reconciliation.
+ if !rateLimiter.Allow() {
+ // Return the cached set of garbage collectable GVKs.
+ return collectableGVKs, nil
+ }
+
// We rely on the discovery API to retrieve the GVKs of all the resources,
// which results in an unbounded set that can impact garbage collection latency when scaling up.
discoveryClient, err := t.discoveryClient()
@@ -196,7 +210,7 @@ func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) (map[schema.Gr
resources, err := discoveryClient.ServerPreferredNamespacedResources()
// Swallow group discovery errors, e.g., Knative serving exposes
// an aggregated API for custom.metrics.k8s.io that requires a special
- // authentication scheme while discovering preferred resources
+ // authentication scheme while discovering preferred resources.
if err != nil && !discovery.IsGroupDiscoveryFailedError(err) {
return nil, err
}
@@ -237,32 +251,30 @@ func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) (map[schema.Gr
}
}
}
+ collectableGVKs = GVKs
- return GVKs, nil
+ return collectableGVKs, nil
}
func (t *garbageCollectorTrait) discoveryClient() (discovery.DiscoveryInterface, error) {
- discoveryClientLock.Lock()
- defer discoveryClientLock.Unlock()
-
switch *t.DiscoveryCache {
case diskDiscoveryCache:
- if diskCachedDiscoveryClient != nil {
- return diskCachedDiscoveryClient, nil
+ if diskCachedDiscovery != nil {
+ return diskCachedDiscovery, nil
}
config := t.Client.GetConfig()
httpCacheDir := filepath.Join(mustHomeDir(), ".kube", "http-cache")
diskCacheDir := filepath.Join(mustHomeDir(), ".kube", "cache", "discovery", toHostDir(config.Host))
var err error
- diskCachedDiscoveryClient, err = disk.NewCachedDiscoveryClientForConfig(config, diskCacheDir, httpCacheDir, 10*time.Minute)
- return diskCachedDiscoveryClient, err
+ diskCachedDiscovery, err = disk.NewCachedDiscoveryClientForConfig(config, diskCacheDir, httpCacheDir, 10*time.Minute)
+ return diskCachedDiscovery, err
case memoryDiscoveryCache:
- if memoryCachedDiscoveryClient != nil {
- return memoryCachedDiscoveryClient, nil
+ if memoryCachedDiscovery != nil {
+ return memoryCachedDiscovery, nil
}
- memoryCachedDiscoveryClient = memory.NewMemCacheClient(t.Client.Discovery())
- return memoryCachedDiscoveryClient, nil
+ memoryCachedDiscovery = memory.NewMemCacheClient(t.Client.Discovery())
+ return memoryCachedDiscovery, nil
case disabledDiscoveryCache, "":
return t.Client.Discovery(), nil