You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ts...@apache.org on 2022/06/30 04:42:03 UTC

[camel-k] branch release-1.8.x updated (2d8ba4839 -> 8f64c6b53)

This is an automated email from the ASF dual-hosted git repository.

tsato pushed a change to branch release-1.8.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git


    from 2d8ba4839 [TEST] Add kamel reset test
     new f841aa6d9 feat(gc): Use SelfSubjectRulesReview to scan for garbage collectable resources
     new 92ef377dc feat(gc): Rate limit Discovery and SelfSubjectRulesReview requests
     new d5711f644 chore(gc): Fix lint errors
     new 3b5d83595 feat(gc): Skip GC for first integration generation
     new 7cdb9220b fix: Use status change predicate to filter updates on owned resources
     new 8f64c6b53 chore(trait): Add an option to disable SSA in deployer trait

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/modules/traits/pages/deployer.adoc            |   5 +
 go.mod                                             |   1 +
 go.sum                                             |   3 +-
 pkg/cmd/run_test.go                                |   4 +-
 .../integration/integration_controller.go          |   7 +-
 pkg/controller/integration/predicate.go            |  57 +++++++
 pkg/resources/resources.go                         |   4 +-
 pkg/trait/deployer.go                              |   5 +-
 pkg/trait/gc.go                                    | 183 ++++++++++++---------
 pkg/trait/gc_test.go                               |  16 +-
 resources/traits.yaml                              |   5 +
 11 files changed, 201 insertions(+), 89 deletions(-)
 create mode 100644 pkg/controller/integration/predicate.go


[camel-k] 06/06: chore(trait): Add an option to disable SSA in deployer trait

Posted by ts...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tsato pushed a commit to branch release-1.8.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 8f64c6b53911fffde7af59ffe87cd47865b53dd5
Author: Antonin Stefanutti <an...@stefanutti.fr>
AuthorDate: Wed May 18 10:38:46 2022 +0200

    chore(trait): Add an option to disable SSA in deployer trait
    
    (cherry picked from commit 8f3f6354228bbd04174cb3f25a33e4a7886fffd5)
---
 docs/modules/traits/pages/deployer.adoc | 5 +++++
 go.sum                                  | 3 ++-
 pkg/cmd/run_test.go                     | 4 ++--
 pkg/resources/resources.go              | 4 ++--
 pkg/trait/deployer.go                   | 5 ++++-
 resources/traits.yaml                   | 5 +++++
 6 files changed, 20 insertions(+), 6 deletions(-)

diff --git a/docs/modules/traits/pages/deployer.adoc b/docs/modules/traits/pages/deployer.adoc
index 61181e5a9..906f34936 100755
--- a/docs/modules/traits/pages/deployer.adoc
+++ b/docs/modules/traits/pages/deployer.adoc
@@ -32,6 +32,11 @@ The following configuration options are available:
 | string
 | Allows to explicitly select the desired deployment kind between `deployment`, `cron-job` or `knative-service` when creating the resources for running the integration.
 
+| deployer.use-ssa
+| bool
+| Use server-side apply to update the owned resources (default `true`).
+Note that it automatically falls back to client-side patching, if SSA is not available, e.g., on old Kubernetes clusters.
+
 |===
 
 // End of autogenerated code - DO NOT EDIT! (configuration)
diff --git a/go.sum b/go.sum
index a230b3538..571ac57b7 100644
--- a/go.sum
+++ b/go.sum
@@ -1447,8 +1447,9 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxb
 golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
-golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs=
 golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M=
+golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
diff --git a/pkg/cmd/run_test.go b/pkg/cmd/run_test.go
index 85241a2c5..8cabc66cd 100644
--- a/pkg/cmd/run_test.go
+++ b/pkg/cmd/run_test.go
@@ -633,7 +633,7 @@ func TestFilterBuildPropertyFiles(t *testing.T) {
 }
 
 func TestResolveYamlPodTemplateWithSupplementalGroups(t *testing.T) {
-	//_, rootCmd, _ := initializeRunCmdOptions(t)
+	// _, rootCmd, _ := initializeRunCmdOptions(t)
 	templateText := `
 securityContext:
   supplementalGroups:
@@ -651,7 +651,7 @@ securityContext:
 }
 
 func TestResolveJsonPodTemplateWithSupplementalGroups(t *testing.T) {
-	//_, rootCmd, _ := initializeRunCmdOptions(t)
+	// _, rootCmd, _ := initializeRunCmdOptions(t)
 	minifiedYamlTemplate := `{"securityContext":{"supplementalGroups":[666]}}`
 
 	integrationSpec := v1.IntegrationSpec{}
diff --git a/pkg/resources/resources.go b/pkg/resources/resources.go
index a4b5ea1bb..e21cdb9ee 100644
--- a/pkg/resources/resources.go
+++ b/pkg/resources/resources.go
@@ -583,9 +583,9 @@ var assets = func() http.FileSystem {
 		"/traits.yaml": &vfsgen۰CompressedFileInfo{
 			name:             "traits.yaml",
 			modTime:          time.Time{},
-			uncompressedSize: 51291,
+			uncompressedSize: 51544,
 
-			compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\x7d\xfd\x73\x5b\xb9\x91\xe0\xef\xf3\x57\xa0\xb4\x57\x65\xc9\x45\x52\x9e\xc9\x26\x99\xd3\xdd\x6c\x4e\x63\x7b\x12\xcd\xf8\x43\x67\x6b\x26\x9b\xf2\xb9\x42\xf0\xbd\x26\x09\xeb\x11\x78\x01\xf0\x24\x33\x97\xfb\xdf\xaf\xd0\xdd\xf8\x78\x8f\xa4\x44\xd9\xd6\x6c\xb4\xb5\x9b\xaa\x1d\x4b\x7a\x00\x1a\x8d\x46\x7f\x77\xc3\x5b\xa9\xbc\x3b\xf9\x6a\x2c\xb4\x5c\xc1\x89\x90\xf3\xb9\xd2\xca\xaf\xbf\x12\xa2\x6d\xa4\x9f\x1b\xbb\x3a\x11\x [...]
+			compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\x7d\xfd\x73\x5b\xb9\x91\xe0\xef\xf3\x57\xa0\xb4\x57\x65\xc9\x45\x52\x9e\xc9\x26\x99\xd3\xdd\x6c\x4e\x63\x7b\x12\xcd\xf8\x43\x67\x6b\x26\x9b\xf2\xb9\x42\xf0\xbd\x26\x09\xf3\x11\x78\x01\xf0\x24\x33\x97\xfb\xdf\xaf\xd0\xdd\xf8\x78\x8f\x94\x44\xd9\xd6\x6c\xb4\xb5\x9b\xaa\x1d\x4b\x7a\x00\x1a\x8d\x46\x7f\x77\xc3\x5b\xa9\xbc\x3b\xf9\x6a\x2c\xb4\x5c\xc3\x89\x90\xf3\xb9\xd2\xca\x6f\xbe\x12\xa2\x6d\xa4\x9f\x1b\xbb\x3e\x11\x [...]
 		},
 	}
 	fs["/"].(*vfsgen۰DirInfo).entries = []os.FileInfo{
diff --git a/pkg/trait/deployer.go b/pkg/trait/deployer.go
index 8bc3b23ff..3dfb036ef 100644
--- a/pkg/trait/deployer.go
+++ b/pkg/trait/deployer.go
@@ -41,6 +41,9 @@ type deployerTrait struct {
 	BaseTrait `property:",squash"`
 	// Allows to explicitly select the desired deployment kind between `deployment`, `cron-job` or `knative-service` when creating the resources for running the integration.
 	Kind string `property:"kind" json:"kind,omitempty"`
+	// Use server-side apply to update the owned resources (default `true`).
+	// Note that it automatically falls back to client-side patching, if SSA is not available, e.g., on old Kubernetes clusters.
+	UseSSA *bool `property:"use-ssa" json:"useSSA,omitempty"`
 }
 
 var _ ControllerStrategySelector = &deployerTrait{}
@@ -68,7 +71,7 @@ func (t *deployerTrait) Apply(e *Environment) error {
 			// check its list of accepted MIME types.
 			// As a simpler solution, we fall back to client-side apply at the first
 			// 415 error, and assume server-side apply is not available globally.
-			if hasServerSideApply {
+			if hasServerSideApply && IsNilOrTrue(t.UseSSA) {
 				err := t.serverSideApply(env, resource)
 				switch {
 				case err == nil:
diff --git a/resources/traits.yaml b/resources/traits.yaml
index b52c48742..515c26e3e 100755
--- a/resources/traits.yaml
+++ b/resources/traits.yaml
@@ -281,6 +281,11 @@ traits:
     description: Allows to explicitly select the desired deployment kind between `deployment`,
       `cron-job` or `knative-service` when creating the resources for running the
       integration.
+  - name: use-ssa
+    type: bool
+    description: Use server-side apply to update the owned resources (default `true`). Note
+      that it automatically falls back to client-side patching, if SSA is not available,
+      e.g., on old Kubernetes clusters.
 - name: deployment
   platform: true
   profiles:


[camel-k] 01/06: feat(gc): Use SelfSubjectRulesReview to scan for garbage collectable resources

Posted by ts...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tsato pushed a commit to branch release-1.8.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit f841aa6d9489c83963fc0e8da28b22329a2a9936
Author: Antonin Stefanutti <an...@stefanutti.fr>
AuthorDate: Wed Jun 1 16:10:58 2022 +0200

    feat(gc): Use SelfSubjectRulesReview to scan for garbage collectable resources
    
    (cherry picked from commit 8caeee372112d2dd60e9f5121af84e52de5355e4)
---
 pkg/trait/gc.go | 100 +++++++++++++++++++++++++++++++++-----------------------
 1 file changed, 60 insertions(+), 40 deletions(-)

diff --git a/pkg/trait/gc.go b/pkg/trait/gc.go
index 1ff3204f5..b11802900 100644
--- a/pkg/trait/gc.go
+++ b/pkg/trait/gc.go
@@ -19,6 +19,7 @@ package trait
 
 import (
 	"context"
+	"fmt"
 	"path/filepath"
 	"regexp"
 	"strconv"
@@ -26,6 +27,8 @@ import (
 	"sync"
 	"time"
 
+	"github.com/apache/camel-k/pkg/util"
+	authorization "k8s.io/api/authorization/v1"
 	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -35,7 +38,6 @@ import (
 	"k8s.io/client-go/discovery"
 	"k8s.io/client-go/discovery/cached/disk"
 	"k8s.io/client-go/discovery/cached/memory"
-
 	ctrl "sigs.k8s.io/controller-runtime/pkg/client"
 
 	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
@@ -81,8 +83,7 @@ func (t *garbageCollectorTrait) Configure(e *Environment) (bool, error) {
 		t.DiscoveryCache = &s
 	}
 
-	return e.IntegrationInPhase(v1.IntegrationPhaseInitialization) || e.IntegrationInRunningPhases(),
-		nil
+	return e.IntegrationInPhase(v1.IntegrationPhaseInitialization) || e.IntegrationInRunningPhases(), nil
 }
 
 func (t *garbageCollectorTrait) Apply(e *Environment) error {
@@ -92,12 +93,9 @@ func (t *garbageCollectorTrait) Apply(e *Environment) error {
 		// Register a post action that deletes the existing resources that are labelled
 		// with the previous integration generations.
 		// TODO: this should be refined so that it's run when all the replicas for the newer generation
-		// are ready. This is to be added when the integration scale status is refined with ready replicas
+		// are ready.
 		e.PostActions = append(e.PostActions, func(env *Environment) error {
-			// The collection and deletion are performed asynchronously to avoid blocking
-			// the reconciliation loop.
-			go t.garbageCollectResources(env)
-			return nil
+			return t.garbageCollectResources(env)
 		})
 
 		fallthrough
@@ -121,43 +119,40 @@ func (t *garbageCollectorTrait) Apply(e *Environment) error {
 	return nil
 }
 
-func (t *garbageCollectorTrait) garbageCollectResources(e *Environment) {
+func (t *garbageCollectorTrait) garbageCollectResources(e *Environment) error {
+	deletableGVKs, err := t.getDeletableTypes(e)
+	if err != nil {
+		return fmt.Errorf("cannot discover GVK types: %v", err)
+	}
+
 	integration, _ := labels.NewRequirement(v1.IntegrationLabel, selection.Equals, []string{e.Integration.Name})
 	generation, err := labels.NewRequirement("camel.apache.org/generation", selection.LessThan, []string{strconv.FormatInt(e.Integration.GetGeneration(), 10)})
 	if err != nil {
-		t.L.ForIntegration(e.Integration).Errorf(err, "cannot determine generation requirement")
-		return
+		return fmt.Errorf("cannot determine generation requirement: %v", err)
 	}
 	selector := labels.NewSelector().
 		Add(*integration).
 		Add(*generation)
 
-	deletableGVKs, err := t.getDeletableTypes(e)
-	if err != nil {
-		t.L.ForIntegration(e.Integration).Errorf(err, "cannot discover GVK types")
-		return
-	}
-
-	t.deleteEachOf(deletableGVKs, e, selector)
+	return t.deleteEachOf(e.Ctx, deletableGVKs, e, selector)
 }
 
-func (t *garbageCollectorTrait) deleteEachOf(gvks map[schema.GroupVersionKind]struct{}, e *Environment, selector labels.Selector) {
-	for gvk := range gvks {
+func (t *garbageCollectorTrait) deleteEachOf(ctx context.Context, GVKs map[schema.GroupVersionKind]struct{}, e *Environment, selector labels.Selector) error {
+	for GVK := range GVKs {
 		resources := unstructured.UnstructuredList{
 			Object: map[string]interface{}{
-				"apiVersion": gvk.GroupVersion().String(),
-				"kind":       gvk.Kind,
+				"apiVersion": GVK.GroupVersion().String(),
+				"kind":       GVK.Kind,
 			},
 		}
 		options := []ctrl.ListOption{
 			ctrl.InNamespace(e.Integration.Namespace),
 			ctrl.MatchingLabelsSelector{Selector: selector},
 		}
-		if err := t.Client.List(context.TODO(), &resources, options...); err != nil {
-			if !k8serrors.IsNotFound(err) && !k8serrors.IsForbidden(err) {
-				t.L.ForIntegration(e.Integration).Errorf(err, "cannot list child resources: %v", gvk)
+		if err := t.Client.List(ctx, &resources, options...); err != nil {
+			if !k8serrors.IsNotFound(err) {
+				return fmt.Errorf("cannot list child resources: %v", err)
 			}
-
 			continue
 		}
 
@@ -166,7 +161,7 @@ func (t *garbageCollectorTrait) deleteEachOf(gvks map[schema.GroupVersionKind]st
 			if !t.canBeDeleted(e, r) {
 				continue
 			}
-			err := t.Client.Delete(context.TODO(), &r, ctrl.PropagationPolicy(metav1.DeletePropagationBackground))
+			err := t.Client.Delete(ctx, &r, ctrl.PropagationPolicy(metav1.DeletePropagationBackground))
 			if err != nil {
 				// The resource may have already been deleted
 				if !k8serrors.IsNotFound(err) {
@@ -177,6 +172,8 @@ func (t *garbageCollectorTrait) deleteEachOf(gvks map[schema.GroupVersionKind]st
 			}
 		}
 	}
+
+	return nil
 }
 
 func (t *garbageCollectorTrait) canBeDeleted(e *Environment, u unstructured.Unstructured) bool {
@@ -192,7 +189,7 @@ func (t *garbageCollectorTrait) canBeDeleted(e *Environment, u unstructured.Unst
 func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) (map[schema.GroupVersionKind]struct{}, error) {
 	// We rely on the discovery API to retrieve all the resources GVK,
 	// that results in an unbounded set that can impact garbage collection latency when scaling up.
-	discoveryClient, err := t.discoveryClient(e)
+	discoveryClient, err := t.discoveryClient()
 	if err != nil {
 		return nil, err
 	}
@@ -206,21 +203,45 @@ func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) (map[schema.Gr
 
 	// We only take types that support the "delete" verb,
 	// to prevent performing queries that we know are going to return "MethodNotAllowed".
-	return groupVersionKinds(discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, resources)),
-		nil
-}
+	APIResourceLists := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, resources)
+
+	// Retrieve the permissions granted to the operator service account.
+	// We assume the operator has only to garbage collect the resources it has created.
+	srr := &authorization.SelfSubjectRulesReview{
+		Spec: authorization.SelfSubjectRulesReviewSpec{
+			Namespace: e.Integration.Namespace,
+		},
+	}
+	res, err := e.Client.AuthorizationV1().SelfSubjectRulesReviews().Create(e.Ctx, srr, metav1.CreateOptions{})
+	if err != nil {
+		return nil, err
+	}
 
-func groupVersionKinds(rls []*metav1.APIResourceList) map[schema.GroupVersionKind]struct{} {
-	GVKs := map[schema.GroupVersionKind]struct{}{}
-	for _, rl := range rls {
-		for _, r := range rl.APIResources {
-			GVKs[schema.FromAPIVersionAndKind(rl.GroupVersion, r.Kind)] = struct{}{}
+	GVKs := make(map[schema.GroupVersionKind]struct{})
+	for _, APIResourceList := range APIResourceLists {
+		for _, resource := range APIResourceList.APIResources {
+		rule:
+			for _, rule := range res.Status.ResourceRules {
+				if !util.StringSliceContainsAnyOf(rule.Verbs, "delete", "*") {
+					continue
+				}
+				for _, group := range rule.APIGroups {
+					for _, name := range rule.Resources {
+						if (resource.Group == group || group == "*") && (resource.Name == name || name == "*") {
+							GVK := schema.FromAPIVersionAndKind(APIResourceList.GroupVersion, resource.Kind)
+							GVKs[GVK] = struct{}{}
+							break rule
+						}
+					}
+				}
+			}
 		}
 	}
-	return GVKs
+
+	return GVKs, nil
 }
 
-func (t *garbageCollectorTrait) discoveryClient(e *Environment) (discovery.DiscoveryInterface, error) {
+func (t *garbageCollectorTrait) discoveryClient() (discovery.DiscoveryInterface, error) {
 	discoveryClientLock.Lock()
 	defer discoveryClientLock.Unlock()
 
@@ -247,7 +268,6 @@ func (t *garbageCollectorTrait) discoveryClient(e *Environment) (discovery.Disco
 		return t.Client.Discovery(), nil
 
 	default:
-		t.L.ForIntegration(e.Integration).Infof("unsupported discovery cache type: %s", *t.DiscoveryCache)
-		return t.Client.Discovery(), nil
+		return nil, fmt.Errorf("unsupported discovery cache type: %s", *t.DiscoveryCache)
 	}
 }


[camel-k] 03/06: chore(gc): Fix lint errors

Posted by ts...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tsato pushed a commit to branch release-1.8.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit d5711f64407ba4c9cc202fabc5709a3da1564e8c
Author: Antonin Stefanutti <an...@stefanutti.fr>
AuthorDate: Wed Jun 1 19:07:20 2022 +0200

    chore(gc): Fix lint errors
    
    (cherry picked from commit 8a267fc3eb4de93d7a1a56f963870babb224f0d6)
---
 pkg/trait/gc.go | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/pkg/trait/gc.go b/pkg/trait/gc.go
index 6ca3af790..a088367fb 100644
--- a/pkg/trait/gc.go
+++ b/pkg/trait/gc.go
@@ -27,9 +27,9 @@ import (
 	"sync"
 	"time"
 
+	"github.com/pkg/errors"
 	"golang.org/x/time/rate"
 
-	"github.com/apache/camel-k/pkg/util"
 	authorization "k8s.io/api/authorization/v1"
 	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -43,6 +43,7 @@ import (
 	ctrl "sigs.k8s.io/controller-runtime/pkg/client"
 
 	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+	"github.com/apache/camel-k/pkg/util"
 )
 
 var (
@@ -127,13 +128,13 @@ func (t *garbageCollectorTrait) Apply(e *Environment) error {
 func (t *garbageCollectorTrait) garbageCollectResources(e *Environment) error {
 	deletableGVKs, err := t.getDeletableTypes(e)
 	if err != nil {
-		return fmt.Errorf("cannot discover GVK types: %v", err)
+		return errors.Wrap(err, "cannot discover GVK types")
 	}
 
 	integration, _ := labels.NewRequirement(v1.IntegrationLabel, selection.Equals, []string{e.Integration.Name})
 	generation, err := labels.NewRequirement("camel.apache.org/generation", selection.LessThan, []string{strconv.FormatInt(e.Integration.GetGeneration(), 10)})
 	if err != nil {
-		return fmt.Errorf("cannot determine generation requirement: %v", err)
+		return errors.Wrap(err, "cannot determine generation requirement")
 	}
 	selector := labels.NewSelector().
 		Add(*integration).
@@ -142,8 +143,8 @@ func (t *garbageCollectorTrait) garbageCollectResources(e *Environment) error {
 	return t.deleteEachOf(e.Ctx, deletableGVKs, e, selector)
 }
 
-func (t *garbageCollectorTrait) deleteEachOf(ctx context.Context, GVKs map[schema.GroupVersionKind]struct{}, e *Environment, selector labels.Selector) error {
-	for GVK := range GVKs {
+func (t *garbageCollectorTrait) deleteEachOf(ctx context.Context, deletableGVKs map[schema.GroupVersionKind]struct{}, e *Environment, selector labels.Selector) error {
+	for GVK := range deletableGVKs {
 		resources := unstructured.UnstructuredList{
 			Object: map[string]interface{}{
 				"apiVersion": GVK.GroupVersion().String(),
@@ -156,7 +157,7 @@ func (t *garbageCollectorTrait) deleteEachOf(ctx context.Context, GVKs map[schem
 		}
 		if err := t.Client.List(ctx, &resources, options...); err != nil {
 			if !k8serrors.IsNotFound(err) {
-				return fmt.Errorf("cannot list child resources: %v", err)
+				return errors.Wrap(err, "cannot list child resources")
 			}
 			continue
 		}


[camel-k] 02/06: feat(gc): Rate limit Discovery and SelfSubjectRulesReview requests

Posted by ts...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tsato pushed a commit to branch release-1.8.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 92ef377dc9e3f2f23fe059864d4db33eef2af9b3
Author: Antonin Stefanutti <an...@stefanutti.fr>
AuthorDate: Wed Jun 1 18:46:20 2022 +0200

    feat(gc): Rate limit Discovery and SelfSubjectRulesReview requests
    
    (cherry picked from commit 9d7145e8ab71e8619a205c65d68797084f3d5fb4)
---
 go.mod          |  1 +
 pkg/trait/gc.go | 46 +++++++++++++++++++++++++++++-----------------
 2 files changed, 30 insertions(+), 17 deletions(-)

diff --git a/go.mod b/go.mod
index e3c63cfc8..5d1130572 100644
--- a/go.mod
+++ b/go.mod
@@ -46,6 +46,7 @@ require (
 	go.uber.org/zap v1.19.1
 	golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8
 	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
+	golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
 	gopkg.in/inf.v0 v0.9.1
 	gopkg.in/yaml.v2 v2.4.0
 	k8s.io/api v0.21.4
diff --git a/pkg/trait/gc.go b/pkg/trait/gc.go
index b11802900..6ca3af790 100644
--- a/pkg/trait/gc.go
+++ b/pkg/trait/gc.go
@@ -27,6 +27,8 @@ import (
 	"sync"
 	"time"
 
+	"golang.org/x/time/rate"
+
 	"github.com/apache/camel-k/pkg/util"
 	authorization "k8s.io/api/authorization/v1"
 	k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -44,10 +46,13 @@ import (
 )
 
 var (
-	toFileName                  = regexp.MustCompile(`[^(\w/\.)]`)
-	diskCachedDiscoveryClient   discovery.CachedDiscoveryInterface
-	memoryCachedDiscoveryClient discovery.CachedDiscoveryInterface
-	discoveryClientLock         sync.Mutex
+	toFileName = regexp.MustCompile(`[^(\w/\.)]`)
+
+	lock                  sync.Mutex
+	rateLimiter           = rate.NewLimiter(rate.Every(time.Minute), 1)
+	collectableGVKs       = make(map[schema.GroupVersionKind]struct{})
+	memoryCachedDiscovery discovery.CachedDiscoveryInterface
+	diskCachedDiscovery   discovery.CachedDiscoveryInterface
 )
 
 type discoveryCacheType string
@@ -187,6 +192,15 @@ func (t *garbageCollectorTrait) canBeDeleted(e *Environment, u unstructured.Unst
 }
 
 func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) (map[schema.GroupVersionKind]struct{}, error) {
+	lock.Lock()
+	defer lock.Unlock()
+
+	// Rate limit to avoid Discovery and SelfSubjectRulesReview requests at every reconciliation.
+	if !rateLimiter.Allow() {
+		// Return the cached set of garbage collectable GVKs.
+		return collectableGVKs, nil
+	}
+
 	// We rely on the discovery API to retrieve all the resources GVK,
 	// that results in an unbounded set that can impact garbage collection latency when scaling up.
 	discoveryClient, err := t.discoveryClient()
@@ -196,7 +210,7 @@ func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) (map[schema.Gr
 	resources, err := discoveryClient.ServerPreferredNamespacedResources()
 	// Swallow group discovery errors, e.g., Knative serving exposes
 	// an aggregated API for custom.metrics.k8s.io that requires special
-	// authentication scheme while discovering preferred resources
+	// authentication scheme while discovering preferred resources.
 	if err != nil && !discovery.IsGroupDiscoveryFailedError(err) {
 		return nil, err
 	}
@@ -237,32 +251,30 @@ func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) (map[schema.Gr
 			}
 		}
 	}
+	collectableGVKs = GVKs
 
-	return GVKs, nil
+	return collectableGVKs, nil
 }
 
 func (t *garbageCollectorTrait) discoveryClient() (discovery.DiscoveryInterface, error) {
-	discoveryClientLock.Lock()
-	defer discoveryClientLock.Unlock()
-
 	switch *t.DiscoveryCache {
 	case diskDiscoveryCache:
-		if diskCachedDiscoveryClient != nil {
-			return diskCachedDiscoveryClient, nil
+		if diskCachedDiscovery != nil {
+			return diskCachedDiscovery, nil
 		}
 		config := t.Client.GetConfig()
 		httpCacheDir := filepath.Join(mustHomeDir(), ".kube", "http-cache")
 		diskCacheDir := filepath.Join(mustHomeDir(), ".kube", "cache", "discovery", toHostDir(config.Host))
 		var err error
-		diskCachedDiscoveryClient, err = disk.NewCachedDiscoveryClientForConfig(config, diskCacheDir, httpCacheDir, 10*time.Minute)
-		return diskCachedDiscoveryClient, err
+		diskCachedDiscovery, err = disk.NewCachedDiscoveryClientForConfig(config, diskCacheDir, httpCacheDir, 10*time.Minute)
+		return diskCachedDiscovery, err
 
 	case memoryDiscoveryCache:
-		if memoryCachedDiscoveryClient != nil {
-			return memoryCachedDiscoveryClient, nil
+		if memoryCachedDiscovery != nil {
+			return memoryCachedDiscovery, nil
 		}
-		memoryCachedDiscoveryClient = memory.NewMemCacheClient(t.Client.Discovery())
-		return memoryCachedDiscoveryClient, nil
+		memoryCachedDiscovery = memory.NewMemCacheClient(t.Client.Discovery())
+		return memoryCachedDiscovery, nil
 
 	case disabledDiscoveryCache, "":
 		return t.Client.Discovery(), nil


[camel-k] 05/06: fix: Use status change predicate to filter updates on owned resources

Posted by ts...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tsato pushed a commit to branch release-1.8.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 7cdb9220be7a7aa02e55dc2e38dd10ed2572a21b
Author: Antonin Stefanutti <an...@stefanutti.fr>
AuthorDate: Wed May 18 09:15:20 2022 +0200

    fix: Use status change predicate to filter updates on owned resources
    
    (cherry picked from commit 85483d4054457dad390b691c95a78cd2087a728b)
---
 .../integration/integration_controller.go          |  7 ++-
 pkg/controller/integration/predicate.go            | 57 ++++++++++++++++++++++
 2 files changed, 60 insertions(+), 4 deletions(-)

diff --git a/pkg/controller/integration/integration_controller.go b/pkg/controller/integration/integration_controller.go
index fb2dd202c..45aa0828c 100644
--- a/pkg/controller/integration/integration_controller.go
+++ b/pkg/controller/integration/integration_controller.go
@@ -31,7 +31,6 @@ import (
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/tools/record"
-
 	"sigs.k8s.io/controller-runtime/pkg/builder"
 	ctrl "sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/event"
@@ -203,9 +202,9 @@ func add(mgr manager.Manager, c client.Client, r reconcile.Reconciler) error {
 				return requests
 			})).
 		// Watch for the owned Deployments
-		Owns(&appsv1.Deployment{}).
+		Owns(&appsv1.Deployment{}, builder.WithPredicates(StatusChangedPredicate{})).
 		// Watch for the owned CronJobs
-		Owns(&batchv1beta1.CronJob{}).
+		Owns(&batchv1beta1.CronJob{}, builder.WithPredicates(StatusChangedPredicate{})).
 		// Watch for the Integration Pods
 		Watches(&source.Kind{Type: &corev1.Pod{}},
 			handler.EnqueueRequestsFromMapFunc(func(a ctrl.Object) []reconcile.Request {
@@ -234,7 +233,7 @@ func add(mgr manager.Manager, c client.Client, r reconcile.Reconciler) error {
 		if ok, err = kubernetes.CheckPermission(ctx, c, serving.GroupName, "services", platform.GetOperatorWatchNamespace(), "", "watch"); err != nil {
 			return err
 		} else if ok {
-			b.Owns(&servingv1.Service{})
+			b.Owns(&servingv1.Service{}, builder.WithPredicates(StatusChangedPredicate{}))
 		}
 	}
 
diff --git a/pkg/controller/integration/predicate.go b/pkg/controller/integration/predicate.go
new file mode 100644
index 000000000..79d61556a
--- /dev/null
+++ b/pkg/controller/integration/predicate.go
@@ -0,0 +1,57 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package integration
+
+import (
+	"reflect"
+
+	"k8s.io/apimachinery/pkg/api/equality"
+	"sigs.k8s.io/controller-runtime/pkg/event"
+	"sigs.k8s.io/controller-runtime/pkg/predicate"
+)
+
+// StatusChangedPredicate implements a generic update predicate function on status change.
+type StatusChangedPredicate struct {
+	predicate.Funcs
+}
+
+// Update implements default UpdateEvent filter for validating status change.
+func (StatusChangedPredicate) Update(e event.UpdateEvent) bool {
+	if e.ObjectOld == nil {
+		Log.Error(nil, "Update event has no old object to update", "event", e)
+		return false
+	}
+	if e.ObjectNew == nil {
+		Log.Error(nil, "Update event has no new object to update", "event", e)
+		return false
+	}
+
+	s1 := reflect.ValueOf(e.ObjectOld).Elem().FieldByName("Status")
+	if !s1.IsValid() {
+		Log.Error(nil, "Update event old object has no Status field", "event", e)
+		return false
+	}
+
+	s2 := reflect.ValueOf(e.ObjectNew).Elem().FieldByName("Status")
+	if !s2.IsValid() {
+		Log.Error(nil, "Update event new object has no Status field", "event", e)
+		return false
+	}
+
+	return !equality.Semantic.DeepDerivative(s1.Interface(), s2.Interface())
+}


[camel-k] 04/06: feat(gc): Skip GC for first integration generation

Posted by ts...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tsato pushed a commit to branch release-1.8.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 3b5d83595c56cc93527931506d64eef7be934ecb
Author: Antonin Stefanutti <an...@stefanutti.fr>
AuthorDate: Thu Jun 2 10:02:29 2022 +0200

    feat(gc): Skip GC for first integration generation
    
    (cherry picked from commit 0dd6d8cb68d771ddfabb81a19a5d104e08b3e451)
---
 pkg/trait/gc.go      | 46 +++++++++++++++++++++-------------------------
 pkg/trait/gc_test.go | 16 ++++++++++++++--
 2 files changed, 35 insertions(+), 27 deletions(-)

diff --git a/pkg/trait/gc.go b/pkg/trait/gc.go
index a088367fb..ef20ae7b1 100644
--- a/pkg/trait/gc.go
+++ b/pkg/trait/gc.go
@@ -93,34 +93,30 @@ func (t *garbageCollectorTrait) Configure(e *Environment) (bool, error) {
 }
 
 func (t *garbageCollectorTrait) Apply(e *Environment) error {
-	switch e.Integration.Status.Phase {
-
-	case v1.IntegrationPhaseDeploying, v1.IntegrationPhaseRunning, v1.IntegrationPhaseError:
+	if e.IntegrationInRunningPhases() && e.Integration.GetGeneration() > 1 {
 		// Register a post action that deletes the existing resources that are labelled
-		// with the previous integration generations.
-		// TODO: this should be refined so that it's run when all the replicas for the newer generation
-		// are ready.
+		// with the previous integration generation(s).
+		// We make the assumption generation is a monotonically increasing strictly positive integer,
+		// in which case we can skip garbage collection on the first generation.
+		// TODO: this should be refined so that it's run when all the replicas for the newer generation are ready.
 		e.PostActions = append(e.PostActions, func(env *Environment) error {
 			return t.garbageCollectResources(env)
 		})
+	}
 
-		fallthrough
-
-	default:
-		// Register a post processor that adds the required labels to the new resources
-		e.PostProcessors = append(e.PostProcessors, func(env *Environment) error {
-			generation := strconv.FormatInt(env.Integration.GetGeneration(), 10)
-			env.Resources.VisitMetaObject(func(resource metav1.Object) {
-				labels := resource.GetLabels()
-				// Label the resource with the current integration generation
-				labels["camel.apache.org/generation"] = generation
-				// Make sure the integration label is set
-				labels[v1.IntegrationLabel] = env.Integration.Name
-				resource.SetLabels(labels)
-			})
-			return nil
+	// Register a post processor that adds the required labels to the new resources
+	e.PostProcessors = append(e.PostProcessors, func(env *Environment) error {
+		generation := strconv.FormatInt(env.Integration.GetGeneration(), 10)
+		env.Resources.VisitMetaObject(func(resource metav1.Object) {
+			labels := resource.GetLabels()
+			// Label the resource with the current integration generation
+			labels["camel.apache.org/generation"] = generation
+			// Make sure the integration label is set
+			labels[v1.IntegrationLabel] = env.Integration.Name
+			resource.SetLabels(labels)
 		})
-	}
+		return nil
+	})
 
 	return nil
 }
@@ -222,12 +218,12 @@ func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) (map[schema.Gr
 
 	// Retrieve the permissions granted to the operator service account.
 	// We assume the operator has only to garbage collect the resources it has created.
-	srr := &authorization.SelfSubjectRulesReview{
+	ssrr := &authorization.SelfSubjectRulesReview{
 		Spec: authorization.SelfSubjectRulesReviewSpec{
 			Namespace: e.Integration.Namespace,
 		},
 	}
-	res, err := e.Client.AuthorizationV1().SelfSubjectRulesReviews().Create(e.Ctx, srr, metav1.CreateOptions{})
+	ssrr, err = e.Client.AuthorizationV1().SelfSubjectRulesReviews().Create(e.Ctx, ssrr, metav1.CreateOptions{})
 	if err != nil {
 		return nil, err
 	}
@@ -236,7 +232,7 @@ func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) (map[schema.Gr
 	for _, APIResourceList := range APIResourceLists {
 		for _, resource := range APIResourceList.APIResources {
 		rule:
-			for _, rule := range res.Status.ResourceRules {
+			for _, rule := range ssrr.Status.ResourceRules {
 				if !util.StringSliceContainsAnyOf(rule.Verbs, "delete", "*") {
 					continue
 				}
diff --git a/pkg/trait/gc_test.go b/pkg/trait/gc_test.go
index 9e76d1056..db8322f4e 100644
--- a/pkg/trait/gc_test.go
+++ b/pkg/trait/gc_test.go
@@ -44,11 +44,22 @@ func TestConfigureDisabledGarbageCollectorTraitDoesNotSucceed(t *testing.T) {
 	assert.Nil(t, err)
 }
 
-func TestApplyGarbageCollectorTraitDoesSucceed(t *testing.T) {
+func TestApplyGarbageCollectorTraitFirstGenerationDoesSucceed(t *testing.T) {
 	gcTrait, environment := createNominalGarbageCollectorTest()
 
 	err := gcTrait.Apply(environment)
 
+	assert.Nil(t, err)
+	assert.Len(t, environment.PostProcessors, 1)
+	assert.Len(t, environment.PostActions, 0)
+}
+
+func TestApplyGarbageCollectorTraitNextGenerationDoesSucceed(t *testing.T) {
+	gcTrait, environment := createNominalGarbageCollectorTest()
+	environment.Integration.Generation = 2
+
+	err := gcTrait.Apply(environment)
+
 	assert.Nil(t, err)
 	assert.Len(t, environment.PostProcessors, 1)
 	assert.Len(t, environment.PostActions, 1)
@@ -73,7 +84,8 @@ func createNominalGarbageCollectorTest() (*garbageCollectorTrait, *Environment)
 		Catalog: NewCatalog(nil),
 		Integration: &v1.Integration{
 			ObjectMeta: metav1.ObjectMeta{
-				Name: "integration-name",
+				Name:       "integration-name",
+				Generation: 1,
 			},
 			Status: v1.IntegrationStatus{
 				Phase: v1.IntegrationPhaseRunning,