You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by pc...@apache.org on 2022/06/29 07:50:04 UTC

[camel-k] 02/06: feat(gc): Rate limit Discovery and SelfSubjectRulesReview requests

This is an automated email from the ASF dual-hosted git repository.

pcongiusti pushed a commit to branch release-1.9.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 66ff9e59d4c8a6b74f1da7a3bfd3095f947cce65
Author: Antonin Stefanutti <an...@stefanutti.fr>
AuthorDate: Wed Jun 1 18:46:20 2022 +0200

    feat(gc): Rate limit Discovery and SelfSubjectRulesReview requests
    
    (cherry picked from commit 9d7145e8ab71e8619a205c65d68797084f3d5fb4)
---
 go.mod          |  1 +
 pkg/trait/gc.go | 46 +++++++++++++++++++++++++++++-----------------
 2 files changed, 30 insertions(+), 17 deletions(-)

diff --git a/go.mod b/go.mod
index 2c15a1092..aaf324702 100644
--- a/go.mod
+++ b/go.mod
@@ -45,6 +45,7 @@ require (
 	go.uber.org/zap v1.21.0
 	golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5
 	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
+	golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
 	gopkg.in/inf.v0 v0.9.1
 	gopkg.in/yaml.v2 v2.4.0
 	k8s.io/api v0.22.5
diff --git a/pkg/trait/gc.go b/pkg/trait/gc.go
index b11802900..6ca3af790 100644
--- a/pkg/trait/gc.go
+++ b/pkg/trait/gc.go
@@ -27,6 +27,8 @@ import (
 	"sync"
 	"time"
 
+	"golang.org/x/time/rate"
+
 	"github.com/apache/camel-k/pkg/util"
 	authorization "k8s.io/api/authorization/v1"
 	k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -44,10 +46,13 @@ import (
 )
 
 var (
-	toFileName                  = regexp.MustCompile(`[^(\w/\.)]`)
-	diskCachedDiscoveryClient   discovery.CachedDiscoveryInterface
-	memoryCachedDiscoveryClient discovery.CachedDiscoveryInterface
-	discoveryClientLock         sync.Mutex
+	toFileName = regexp.MustCompile(`[^(\w/\.)]`)
+
+	lock                  sync.Mutex
+	rateLimiter           = rate.NewLimiter(rate.Every(time.Minute), 1)
+	collectableGVKs       = make(map[schema.GroupVersionKind]struct{})
+	memoryCachedDiscovery discovery.CachedDiscoveryInterface
+	diskCachedDiscovery   discovery.CachedDiscoveryInterface
 )
 
 type discoveryCacheType string
@@ -187,6 +192,15 @@ func (t *garbageCollectorTrait) canBeDeleted(e *Environment, u unstructured.Unst
 }
 
 func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) (map[schema.GroupVersionKind]struct{}, error) {
+	lock.Lock()
+	defer lock.Unlock()
+
+	// Rate limit to avoid Discovery and SelfSubjectRulesReview requests at every reconciliation.
+	if !rateLimiter.Allow() {
+		// Return the cached set of garbage collectable GVKs.
+		return collectableGVKs, nil
+	}
+
 	// We rely on the discovery API to retrieve all the resources GVK,
 	// that results in an unbounded set that can impact garbage collection latency when scaling up.
 	discoveryClient, err := t.discoveryClient()
@@ -196,7 +210,7 @@ func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) (map[schema.Gr
 	resources, err := discoveryClient.ServerPreferredNamespacedResources()
 	// Swallow group discovery errors, e.g., Knative serving exposes
 	// an aggregated API for custom.metrics.k8s.io that requires special
-	// authentication scheme while discovering preferred resources
+	// authentication scheme while discovering preferred resources.
 	if err != nil && !discovery.IsGroupDiscoveryFailedError(err) {
 		return nil, err
 	}
@@ -237,32 +251,30 @@ func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) (map[schema.Gr
 			}
 		}
 	}
+	collectableGVKs = GVKs
 
-	return GVKs, nil
+	return collectableGVKs, nil
 }
 
 func (t *garbageCollectorTrait) discoveryClient() (discovery.DiscoveryInterface, error) {
-	discoveryClientLock.Lock()
-	defer discoveryClientLock.Unlock()
-
 	switch *t.DiscoveryCache {
 	case diskDiscoveryCache:
-		if diskCachedDiscoveryClient != nil {
-			return diskCachedDiscoveryClient, nil
+		if diskCachedDiscovery != nil {
+			return diskCachedDiscovery, nil
 		}
 		config := t.Client.GetConfig()
 		httpCacheDir := filepath.Join(mustHomeDir(), ".kube", "http-cache")
 		diskCacheDir := filepath.Join(mustHomeDir(), ".kube", "cache", "discovery", toHostDir(config.Host))
 		var err error
-		diskCachedDiscoveryClient, err = disk.NewCachedDiscoveryClientForConfig(config, diskCacheDir, httpCacheDir, 10*time.Minute)
-		return diskCachedDiscoveryClient, err
+		diskCachedDiscovery, err = disk.NewCachedDiscoveryClientForConfig(config, diskCacheDir, httpCacheDir, 10*time.Minute)
+		return diskCachedDiscovery, err
 
 	case memoryDiscoveryCache:
-		if memoryCachedDiscoveryClient != nil {
-			return memoryCachedDiscoveryClient, nil
+		if memoryCachedDiscovery != nil {
+			return memoryCachedDiscovery, nil
 		}
-		memoryCachedDiscoveryClient = memory.NewMemCacheClient(t.Client.Discovery())
-		return memoryCachedDiscoveryClient, nil
+		memoryCachedDiscovery = memory.NewMemCacheClient(t.Client.Discovery())
+		return memoryCachedDiscovery, nil
 
 	case disabledDiscoveryCache, "":
 		return t.Client.Discovery(), nil