You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by pc...@apache.org on 2023/08/22 12:15:12 UTC
[camel-k] 03/06: feat(ctrl): confimap and secret hot reload
This is an automated email from the ASF dual-hosted git repository.
pcongiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 7d16b8b29808cb86db4de51f43b8b2529abc3a21
Author: Pasquale Congiusti <pa...@gmail.com>
AuthorDate: Fri Aug 11 15:31:41 2023 +0200
feat(ctrl): confimap and secret hot reload
Watch the resources and if they belong to an Integration, kick a reconcile loop
---
.../integration/integration_controller.go | 122 +++++++++++++++++++++
pkg/controller/integration/monitor.go | 11 ++
2 files changed, 133 insertions(+)
diff --git a/pkg/controller/integration/integration_controller.go b/pkg/controller/integration/integration_controller.go
index e39249c91..51970810d 100644
--- a/pkg/controller/integration/integration_controller.go
+++ b/pkg/controller/integration/integration_controller.go
@@ -52,6 +52,7 @@ import (
"github.com/apache/camel-k/v2/pkg/util/kubernetes"
"github.com/apache/camel-k/v2/pkg/util/log"
"github.com/apache/camel-k/v2/pkg/util/monitoring"
+ utilResource "github.com/apache/camel-k/v2/pkg/util/resource"
)
func Add(ctx context.Context, mgr manager.Manager, c client.Client) error {
@@ -168,6 +169,106 @@ func integrationKitEnqueueRequestsFromMapFunc(ctx context.Context, c client.Clie
return requests
}
+func configmapEnqueueRequestsFromMapFunc(ctx context.Context, c client.Client, cm *corev1.ConfigMap) []reconcile.Request {
+ var requests []reconcile.Request
+
+ // Do global search in case of global operator (it may be using a global platform)
+ list := &v1.IntegrationList{}
+ var opts []ctrl.ListOption
+ if !platform.IsCurrentOperatorGlobal() {
+ opts = append(opts, ctrl.InNamespace(cm.Namespace))
+ }
+
+ if err := c.List(ctx, list, opts...); err != nil {
+ log.Error(err, "Failed to list integrations")
+ return requests
+ }
+
+ for _, integration := range list.Items {
+ found := false
+ if integration.Spec.Traits.Mount == nil {
+ continue
+ }
+ for _, c := range integration.Spec.Traits.Mount.Configs {
+ if conf, parseErr := utilResource.ParseConfig(c); parseErr == nil {
+ if conf.StorageType() == utilResource.StorageTypeConfigmap && conf.Name() == cm.Name {
+ found = true
+ break
+ }
+ }
+ }
+ for _, r := range integration.Spec.Traits.Mount.Resources {
+ if conf, parseErr := utilResource.ParseConfig(r); parseErr == nil {
+ if conf.StorageType() == utilResource.StorageTypeConfigmap && conf.Name() == cm.Name {
+ found = true
+ break
+ }
+ }
+ }
+ if found {
+ log.Infof("Configmap %s updated, wake-up integration: %s", cm.Name, integration.Name)
+ requests = append(requests, reconcile.Request{
+ NamespacedName: types.NamespacedName{
+ Namespace: integration.Namespace,
+ Name: integration.Name,
+ },
+ })
+ }
+ }
+
+ return requests
+}
+
+func secretEnqueueRequestsFromMapFunc(ctx context.Context, c client.Client, sec *corev1.Secret) []reconcile.Request {
+ var requests []reconcile.Request
+
+ // Do global search in case of global operator (it may be using a global platform)
+ list := &v1.IntegrationList{}
+ var opts []ctrl.ListOption
+ if !platform.IsCurrentOperatorGlobal() {
+ opts = append(opts, ctrl.InNamespace(sec.Namespace))
+ }
+
+ if err := c.List(ctx, list, opts...); err != nil {
+ log.Error(err, "Failed to list integrations")
+ return requests
+ }
+
+ for _, integration := range list.Items {
+ found := false
+ if integration.Spec.Traits.Mount == nil {
+ continue
+ }
+ for _, c := range integration.Spec.Traits.Mount.Configs {
+ if conf, parseErr := utilResource.ParseConfig(c); parseErr == nil {
+ if conf.StorageType() == utilResource.StorageTypeSecret && conf.Name() == sec.Name {
+ found = true
+ break
+ }
+ }
+ }
+ for _, r := range integration.Spec.Traits.Mount.Resources {
+ if conf, parseErr := utilResource.ParseConfig(r); parseErr == nil {
+ if conf.StorageType() == utilResource.StorageTypeSecret && conf.Name() == sec.Name {
+ found = true
+ break
+ }
+ }
+ }
+ if found {
+ log.Infof("Secret %s updated, wake-up integration: %s", sec.Name, integration.Name)
+ requests = append(requests, reconcile.Request{
+ NamespacedName: types.NamespacedName{
+ Namespace: integration.Namespace,
+ Name: integration.Name,
+ },
+ })
+ }
+ }
+
+ return requests
+}
+
func integrationPlatformEnqueueRequestsFromMapFunc(ctx context.Context, c client.Client, p *v1.IntegrationPlatform) []reconcile.Request {
var requests []reconcile.Request
@@ -249,6 +350,27 @@ func add(ctx context.Context, mgr manager.Manager, c client.Client, r reconcile.
return integrationPlatformEnqueueRequestsFromMapFunc(ctx, c, p)
})).
+ // Watch for Configmaps or Secret used in the Integrations for updates.
+ Watches(&source.Kind{Type: &corev1.ConfigMap{}},
+ handler.EnqueueRequestsFromMapFunc(func(a ctrl.Object) []reconcile.Request {
+ cm, ok := a.(*corev1.ConfigMap)
+ if !ok {
+ log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve integration list")
+ return []reconcile.Request{}
+ }
+
+ return configmapEnqueueRequestsFromMapFunc(ctx, c, cm)
+ })).
+ Watches(&source.Kind{Type: &corev1.Secret{}},
+ handler.EnqueueRequestsFromMapFunc(func(a ctrl.Object) []reconcile.Request {
+ secret, ok := a.(*corev1.Secret)
+ if !ok {
+ log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve integration list")
+ return []reconcile.Request{}
+ }
+
+ return secretEnqueueRequestsFromMapFunc(ctx, c, secret)
+ })).
// Watch for the owned Deployments
Owns(&appsv1.Deployment{}, builder.WithPredicates(StatusChangedPredicate{})).
// Watch for the Integration Pods
diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go
index 471b1fbda..04d539174 100644
--- a/pkg/controller/integration/monitor.go
+++ b/pkg/controller/integration/monitor.go
@@ -219,6 +219,17 @@ func getIntegrationSecretsAndConfigmaps(ctx context.Context, client client.Clien
}
}
}
+ for _, r := range integration.Spec.Traits.Mount.Resources {
+ if conf, parseErr := utilResource.ParseConfig(r); parseErr == nil {
+ if conf.StorageType() == utilResource.StorageTypeConfigmap {
+ configmap := kubernetes.LookupConfigmap(ctx, client, integration.Namespace, conf.Name())
+ configmaps = append(configmaps, configmap)
+ } else if conf.StorageType() == utilResource.StorageTypeSecret {
+ secret := kubernetes.LookupSecret(ctx, client, integration.Namespace, conf.Name())
+ secrets = append(secrets, secret)
+ }
+ }
+ }
}
return secrets, configmaps
}