You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by pc...@apache.org on 2024/01/09 14:21:51 UTC
(camel-k) 03/08: chore: synthetic Integration separate controller
This is an automated email from the ASF dual-hosted git repository.
pcongiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 3d7715612a7437ba5ec6b30651dabe7540ea10bd
Author: Pasquale Congiusti <pa...@gmail.com>
AuthorDate: Fri Dec 1 15:48:32 2023 +0100
chore: synthetic Integration separate controller
---
config/rbac/namespaced/operator-role-knative.yaml | 8 -
config/rbac/namespaced/operator-role.yaml | 1 +
helm/camel-k/templates/operator-role.yaml | 1 +
pkg/cmd/operator/operator.go | 3 +
.../integration/integration_controller.go | 48 +---
.../integration/integration_controller_import.go | 249 -----------------
pkg/controller/integration/monitor.go | 1 -
pkg/controller/integration/monitor_synthetic.go | 18 --
pkg/controller/integration/predicate.go | 37 ---
pkg/controller/pipe/pipe_controller.go | 2 +-
pkg/controller/synthetic/synthetic.go | 300 +++++++++++++++++++++
11 files changed, 312 insertions(+), 356 deletions(-)
diff --git a/config/rbac/namespaced/operator-role-knative.yaml b/config/rbac/namespaced/operator-role-knative.yaml
index 7e1d2f349..3cba80931 100644
--- a/config/rbac/namespaced/operator-role-knative.yaml
+++ b/config/rbac/namespaced/operator-role-knative.yaml
@@ -35,14 +35,6 @@ rules:
- patch
- update
- watch
-- apiGroups:
- - serving.knative.dev
- resources:
- - revisions
- verbs:
- - get
- - list
- - watch
- apiGroups:
- eventing.knative.dev
resources:
diff --git a/config/rbac/namespaced/operator-role.yaml b/config/rbac/namespaced/operator-role.yaml
index 4ddc2d4c1..0f364463e 100644
--- a/config/rbac/namespaced/operator-role.yaml
+++ b/config/rbac/namespaced/operator-role.yaml
@@ -45,6 +45,7 @@ rules:
- camel.apache.org
resources:
- builds
+ - integrations
verbs:
- delete
- apiGroups:
diff --git a/helm/camel-k/templates/operator-role.yaml b/helm/camel-k/templates/operator-role.yaml
index b8e709b80..40ef9742a 100644
--- a/helm/camel-k/templates/operator-role.yaml
+++ b/helm/camel-k/templates/operator-role.yaml
@@ -54,6 +54,7 @@ rules:
- camel.apache.org
resources:
- builds
+ - integrations
verbs:
- delete
- apiGroups:
diff --git a/pkg/cmd/operator/operator.go b/pkg/cmd/operator/operator.go
index 04b5ea8b2..12edd7cc1 100644
--- a/pkg/cmd/operator/operator.go
+++ b/pkg/cmd/operator/operator.go
@@ -59,6 +59,7 @@ import (
v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
"github.com/apache/camel-k/v2/pkg/client"
"github.com/apache/camel-k/v2/pkg/controller"
+ "github.com/apache/camel-k/v2/pkg/controller/synthetic"
"github.com/apache/camel-k/v2/pkg/event"
"github.com/apache/camel-k/v2/pkg/install"
"github.com/apache/camel-k/v2/pkg/platform"
@@ -231,6 +232,8 @@ func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID
install.OperatorStartupOptionalTools(installCtx, bootstrapClient, watchNamespace, operatorNamespace, log)
exitOnError(findOrCreateIntegrationPlatform(installCtx, bootstrapClient, operatorNamespace), "failed to create integration platform")
+ log.Info("Starting the synthetic Integration manager")
+ exitOnError(synthetic.ManageSyntheticIntegrations(ctx, ctrlClient, mgr.GetCache(), mgr.GetAPIReader()), "synthetic Integration manager error")
log.Info("Starting the manager")
exitOnError(mgr.Start(ctx), "manager exited non-zero")
}
diff --git a/pkg/controller/integration/integration_controller.go b/pkg/controller/integration/integration_controller.go
index 1979b9d4a..a16aa6967 100644
--- a/pkg/controller/integration/integration_controller.go
+++ b/pkg/controller/integration/integration_controller.go
@@ -328,7 +328,7 @@ func add(ctx context.Context, mgr manager.Manager, c client.Client, r reconcile.
watchIntegrationResources(c, b)
// Watch for the CronJob conditionally
if ok, err := kubernetes.IsAPIResourceInstalled(c, batchv1.SchemeGroupVersion.String(), reflect.TypeOf(batchv1.CronJob{}).Name()); ok && err == nil {
- watchCronJobResources(c, b)
+ watchCronJobResources(b)
}
// Watch for the Knative Services conditionally
if ok, err := kubernetes.IsAPIResourceInstalled(c, servingv1.SchemeGroupVersion.String(), reflect.TypeOf(servingv1.Service{}).Name()); err != nil {
@@ -405,37 +405,13 @@ func watchIntegrationResources(c client.Client, b *builder.Builder) {
},
}
})).
- // Watch for non managed Deployments (ie, imported)
- Watches(&appsv1.Deployment{},
- handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request {
- deploy, ok := a.(*appsv1.Deployment)
- if !ok {
- log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve Deployment")
- return []reconcile.Request{}
- }
- return nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx, c, &NonManagedCamelDeployment{deploy: deploy})
- }),
- builder.WithPredicates(NonManagedObjectPredicate{}),
- ).
// Watch for the owned Deployments
Owns(&appsv1.Deployment{}, builder.WithPredicates(StatusChangedPredicate{}))
}
-func watchCronJobResources(c client.Client, b *builder.Builder) {
- // Watch for non managed Deployments (ie, imported)
- b.Watches(&batchv1.CronJob{},
- handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request {
- cron, ok := a.(*batchv1.CronJob)
- if !ok {
- log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve CronJob")
- return []reconcile.Request{}
- }
- return nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx, c, &NonManagedCamelCronjob{cron: cron})
- }),
- builder.WithPredicates(NonManagedObjectPredicate{}),
- ).
- // Watch for the owned CronJobs
- Owns(&batchv1.CronJob{}, builder.WithPredicates(StatusChangedPredicate{}))
+func watchCronJobResources(b *builder.Builder) {
+ // Watch for the owned CronJobs
+ b.Owns(&batchv1.CronJob{}, builder.WithPredicates(StatusChangedPredicate{}))
}
func watchKnativeResources(ctx context.Context, c client.Client, b *builder.Builder) error {
@@ -445,20 +421,8 @@ func watchKnativeResources(ctx context.Context, c client.Client, b *builder.Buil
if ok, err := kubernetes.CheckPermission(checkCtx, c, serving.GroupName, "services", platform.GetOperatorWatchNamespace(), "", "watch"); err != nil {
return err
} else if ok {
- // Watch for non managed Knative Service (ie, imported)
- b.Watches(&servingv1.Service{},
- handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request {
- ksvc, ok := a.(*servingv1.Service)
- if !ok {
- log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve KnativeService")
- return []reconcile.Request{}
- }
- return nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx, c, &NonManagedCamelKnativeService{ksvc: ksvc})
- }),
- builder.WithPredicates(NonManagedObjectPredicate{}),
- ).
- // Watch for the owned CronJobs
- Owns(&servingv1.Service{}, builder.WithPredicates(StatusChangedPredicate{}))
+ // Watch for the owned Knative Services
+ b.Owns(&servingv1.Service{}, builder.WithPredicates(StatusChangedPredicate{}))
}
return nil
}
diff --git a/pkg/controller/integration/integration_controller_import.go b/pkg/controller/integration/integration_controller_import.go
deleted file mode 100644
index 403185509..000000000
--- a/pkg/controller/integration/integration_controller_import.go
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package integration
-
-import (
- "context"
-
- v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
- "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait"
- "github.com/apache/camel-k/v2/pkg/client"
- "github.com/apache/camel-k/v2/pkg/util/log"
- "github.com/apache/camel-k/v2/pkg/util/patch"
- appsv1 "k8s.io/api/apps/v1"
- batchv1 "k8s.io/api/batch/v1"
- k8serrors "k8s.io/apimachinery/pkg/api/errors"
- "k8s.io/apimachinery/pkg/types"
- servingv1 "knative.dev/serving/pkg/apis/serving/v1"
- ctrl "sigs.k8s.io/controller-runtime/pkg/client"
- "sigs.k8s.io/controller-runtime/pkg/reconcile"
-)
-
-// nonManagedCamelAppEnqueueRequestsFromMapFunc represent the function to discover the Integration which has to be woke up: it creates a synthetic
-// Integration if the Integration does not exist. This is used to import external Camel applications.
-func nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx context.Context, c client.Client, adp NonManagedCamelApplicationAdapter) []reconcile.Request {
- if adp.GetIntegrationName() == "" {
- return []reconcile.Request{}
- }
- it := v1.NewIntegration(adp.GetIntegrationNameSpace(), adp.GetIntegrationName())
- err := c.Get(ctx, ctrl.ObjectKeyFromObject(&it), &it)
- if err != nil {
- if k8serrors.IsNotFound(err) {
- // We must perform this check to make sure the resource is not being deleted.
- // In such case it makes no sense to create an Integration after it.
- err := c.Get(ctx, ctrl.ObjectKeyFromObject(adp.GetAppObj()), adp.GetAppObj())
- if err != nil {
- if k8serrors.IsNotFound(err) {
- return []reconcile.Request{}
- }
- log.Errorf(err, "Some error happened while trying to get %s %s resource", adp.GetName(), adp.GetKind())
- }
- createSyntheticIntegration(&it, adp)
- target, err := patch.ApplyPatch(&it)
- if err == nil {
- err = c.Patch(ctx, target, ctrl.Apply, ctrl.ForceOwnership, ctrl.FieldOwner("camel-k-operator"))
- if err != nil {
- log.Errorf(err, "Some error happened while creating a synthetic Integration after %s %s resource", adp.GetName(), adp.GetKind())
- return []reconcile.Request{}
- }
- log.Infof(
- "Created a synthetic Integration %s after %s %s",
- it.GetName(),
- adp.GetName(),
- adp.GetKind(),
- )
- return []reconcile.Request{
- {
- NamespacedName: types.NamespacedName{
- Namespace: it.Namespace,
- Name: it.Name,
- },
- },
- }
- }
- if err != nil {
- log.Infof("Could not create Integration %s: %s", adp.GetIntegrationName(), err.Error())
- return []reconcile.Request{}
- }
- }
- log.Errorf(err, "Could not get Integration %s", it.GetName())
- return []reconcile.Request{}
- }
-
- return []reconcile.Request{
- {
- NamespacedName: types.NamespacedName{
- Namespace: it.Namespace,
- Name: it.Name,
- },
- },
- }
-}
-
-// createSyntheticIntegration set all required values for a synthetic Integration.
-func createSyntheticIntegration(it *v1.Integration, adp NonManagedCamelApplicationAdapter) {
- // We need to create a synthetic Integration
- it.SetAnnotations(map[string]string{
- v1.IntegrationImportedNameLabel: adp.GetName(),
- v1.IntegrationImportedKindLabel: adp.GetKind(),
- v1.IntegrationSyntheticLabel: "true",
- })
- it.Spec = v1.IntegrationSpec{
- Traits: adp.GetTraits(),
- }
-}
-
-// NonManagedCamelApplicationAdapter represents a Camel application built and deployed outside the operator lifecycle.
-type NonManagedCamelApplicationAdapter interface {
- // GetName returns the name of the Camel application.
- GetName() string
- // GetKind returns the kind of the Camel application (ie, Deployment, Cronjob, ...).
- GetKind() string
- // GetTraits in used to retrieve the trait configuration.
- GetTraits() v1.Traits
- // GetIntegrationName return the name of the Integration which has to be imported.
- GetIntegrationName() string
- // GetIntegrationNameSpace return the namespace of the Integration which has to be imported.
- GetIntegrationNameSpace() string
- // GetAppObj return the object from which we're importing.
- GetAppObj() ctrl.Object
-}
-
-// NonManagedCamelDeployment represents a regular Camel application built and deployed outside the operator lifecycle.
-type NonManagedCamelDeployment struct {
- deploy *appsv1.Deployment
-}
-
-// GetName returns the name of the Camel application.
-func (app *NonManagedCamelDeployment) GetName() string {
- return app.deploy.GetName()
-}
-
-// GetKind returns the kind of the Camel application (ie, Deployment, Cronjob, ...).
-func (app *NonManagedCamelDeployment) GetKind() string {
- return "Deployment"
-}
-
-// GetTraits in used to retrieve the trait configuration.
-func (app *NonManagedCamelDeployment) GetTraits() v1.Traits {
- return v1.Traits{
- Container: &trait.ContainerTrait{
- Name: app.getContainerNameFromDeployment(),
- },
- }
-}
-
-// GetAppObj return the object from which we're importing.
-func (app *NonManagedCamelDeployment) GetAppObj() ctrl.Object {
- return app.deploy
-}
-
-// GetIntegrationName return the name of the Integration which has to be imported.
-func (app *NonManagedCamelDeployment) GetIntegrationName() string {
- return app.deploy.Labels[v1.IntegrationLabel]
-}
-
-// GetIntegrationNameSpace return the namespace of the Integration which has to be imported.
-func (app *NonManagedCamelDeployment) GetIntegrationNameSpace() string {
- return app.deploy.Namespace
-}
-
-// getContainerNameFromDeployment returns the container name which is running the Camel application.
-func (app *NonManagedCamelDeployment) getContainerNameFromDeployment() string {
- firstContainerName := ""
- for _, ct := range app.deploy.Spec.Template.Spec.Containers {
- // set as fallback if no container is named as the deployment
- if firstContainerName == "" {
- firstContainerName = app.deploy.Name
- }
- if ct.Name == app.deploy.Name {
- return app.deploy.Name
- }
- }
- return firstContainerName
-}
-
-// NonManagedCamelCronjob represents a cron Camel application built and deployed outside the operator lifecycle.
-type NonManagedCamelCronjob struct {
- cron *batchv1.CronJob
-}
-
-// GetName returns the name of the Camel application.
-func (app *NonManagedCamelCronjob) GetName() string {
- return app.cron.GetName()
-}
-
-// GetKind returns the kind of the Camel application (ie, Deployment, Cronjob, ...).
-func (app *NonManagedCamelCronjob) GetKind() string {
- return "CronJob"
-}
-
-// GetTraits in used to retrieve the trait configuration.
-func (app *NonManagedCamelCronjob) GetTraits() v1.Traits {
- return v1.Traits{}
-}
-
-// GetIntegrationName return the name of the Integration which has to be imported.
-func (app *NonManagedCamelCronjob) GetIntegrationName() string {
- return app.cron.Labels[v1.IntegrationLabel]
-}
-
-// GetIntegrationNameSpace return the namespace of the Integration which has to be imported.
-func (app *NonManagedCamelCronjob) GetIntegrationNameSpace() string {
- return app.cron.Namespace
-}
-
-// GetAppObj return the object from which we're importing.
-func (app *NonManagedCamelCronjob) GetAppObj() ctrl.Object {
- return app.cron
-}
-
-// NonManagedCamelKnativeService represents a Knative Service based Camel application built and deployed outside the operator lifecycle.
-type NonManagedCamelKnativeService struct {
- ksvc *servingv1.Service
-}
-
-// GetName returns the name of the Camel application.
-func (app *NonManagedCamelKnativeService) GetName() string {
- return app.ksvc.GetName()
-}
-
-// GetKind returns the kind of the Camel application (ie, Deployment, Cronjob, ...).
-func (app *NonManagedCamelKnativeService) GetKind() string {
- return "KnativeService"
-}
-
-// GetTraits in used to retrieve the trait configuration.
-func (app *NonManagedCamelKnativeService) GetTraits() v1.Traits {
- return v1.Traits{}
-}
-
-// GetIntegrationName return the name of the Integration which has to be imported.
-func (app *NonManagedCamelKnativeService) GetIntegrationName() string {
- return app.ksvc.Labels[v1.IntegrationLabel]
-}
-
-// GetIntegrationNameSpace return the namespace of the Integration which has to be imported.
-func (app *NonManagedCamelKnativeService) GetIntegrationNameSpace() string {
- return app.ksvc.Namespace
-}
-
-// GetAppObj return the object from which we're importing.
-func (app *NonManagedCamelKnativeService) GetAppObj() ctrl.Object {
- return app.ksvc
-}
diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go
index e2d3b32a3..fb86ed41d 100644
--- a/pkg/controller/integration/monitor.go
+++ b/pkg/controller/integration/monitor.go
@@ -60,7 +60,6 @@ func (action *monitorAction) CanHandle(integration *v1.Integration) bool {
return integration.Status.Phase == v1.IntegrationPhaseDeploying ||
integration.Status.Phase == v1.IntegrationPhaseRunning ||
integration.Status.Phase == v1.IntegrationPhaseError ||
- integration.Status.Phase == v1.IntegrationPhaseImportMissing ||
integration.Status.Phase == v1.IntegrationPhaseCannotMonitor
}
diff --git a/pkg/controller/integration/monitor_synthetic.go b/pkg/controller/integration/monitor_synthetic.go
index a10a03deb..a1aa86a43 100644
--- a/pkg/controller/integration/monitor_synthetic.go
+++ b/pkg/controller/integration/monitor_synthetic.go
@@ -19,10 +19,8 @@ package integration
import (
"context"
- "fmt"
corev1 "k8s.io/api/core/v1"
- k8serrors "k8s.io/apimachinery/pkg/api/errors"
v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
"github.com/apache/camel-k/v2/pkg/trait"
@@ -44,22 +42,6 @@ func (action *monitorSyntheticAction) Name() string {
func (action *monitorSyntheticAction) Handle(ctx context.Context, integration *v1.Integration) (*v1.Integration, error) {
environment, err := trait.NewSyntheticEnvironment(ctx, action.client, integration, nil)
if err != nil {
- if k8serrors.IsNotFound(err) {
- // Not an error: the resource from which we imported has been deleted, report in it status.
- // It may be a temporary situation, for example, if the deployment from which the Integration is imported
- // is being redeployed. For this reason we should keep the Integration instead of forcefully removing it.
- message := fmt.Sprintf(
- "import %s %s no longer available",
- integration.Annotations[v1.IntegrationImportedKindLabel],
- integration.Annotations[v1.IntegrationImportedNameLabel],
- )
- action.L.Info(message)
- integration.SetReadyConditionError(message)
- zero := int32(0)
- integration.Status.Phase = v1.IntegrationPhaseImportMissing
- integration.Status.Replicas = &zero
- return integration, nil
- }
// report the error
integration.Status.Phase = v1.IntegrationPhaseError
integration.SetReadyCondition(corev1.ConditionFalse, v1.IntegrationConditionImportingKindAvailableReason, err.Error())
diff --git a/pkg/controller/integration/predicate.go b/pkg/controller/integration/predicate.go
index 0feb71fec..79d61556a 100644
--- a/pkg/controller/integration/predicate.go
+++ b/pkg/controller/integration/predicate.go
@@ -21,7 +21,6 @@ import (
"reflect"
"k8s.io/apimachinery/pkg/api/equality"
- ctrl "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
@@ -56,39 +55,3 @@ func (StatusChangedPredicate) Update(e event.UpdateEvent) bool {
return !equality.Semantic.DeepDerivative(s1.Interface(), s2.Interface())
}
-
-// NonManagedObjectPredicate implements a generic update predicate function for managed object.
-type NonManagedObjectPredicate struct {
- predicate.Funcs
-}
-
-// Create --.
-func (NonManagedObjectPredicate) Create(e event.CreateEvent) bool {
- return !isManagedObject(e.Object)
-}
-
-// Update --.
-func (NonManagedObjectPredicate) Update(e event.UpdateEvent) bool {
- return !isManagedObject(e.ObjectNew)
-}
-
-// Delete --.
-func (NonManagedObjectPredicate) Delete(e event.DeleteEvent) bool {
- return !isManagedObject(e.Object)
-}
-
-// Generic --.
-func (NonManagedObjectPredicate) Generic(e event.GenericEvent) bool {
- return !isManagedObject(e.Object)
-}
-
-// isManagedObject returns true if the object is managed by an Integration.
-func isManagedObject(obj ctrl.Object) bool {
- for _, mr := range obj.GetOwnerReferences() {
- if mr.APIVersion == "camel.apache.org/v1" &&
- mr.Kind == "Integration" {
- return true
- }
- }
- return false
-}
diff --git a/pkg/controller/pipe/pipe_controller.go b/pkg/controller/pipe/pipe_controller.go
index 36da7fca1..5b174e435 100644
--- a/pkg/controller/pipe/pipe_controller.go
+++ b/pkg/controller/pipe/pipe_controller.go
@@ -66,7 +66,7 @@ func newReconciler(mgr manager.Manager, c client.Client) reconcile.Reconciler {
}
func add(mgr manager.Manager, r reconcile.Reconciler) error {
- c, err := controller.New("kamelet-binding-controller", mgr, controller.Options{Reconciler: r})
+ c, err := controller.New("pipe-controller", mgr, controller.Options{Reconciler: r})
if err != nil {
return err
}
diff --git a/pkg/controller/synthetic/synthetic.go b/pkg/controller/synthetic/synthetic.go
new file mode 100644
index 000000000..bd785d318
--- /dev/null
+++ b/pkg/controller/synthetic/synthetic.go
@@ -0,0 +1,300 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package synthetic
+
+import (
+ "context"
+ "fmt"
+ "reflect"
+
+ v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+ "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait"
+ "github.com/apache/camel-k/v2/pkg/client"
+ "github.com/apache/camel-k/v2/pkg/platform"
+ "github.com/apache/camel-k/v2/pkg/util/kubernetes"
+ "github.com/apache/camel-k/v2/pkg/util/log"
+ appsv1 "k8s.io/api/apps/v1"
+ batchv1 "k8s.io/api/batch/v1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ clientgocache "k8s.io/client-go/tools/cache"
+ "knative.dev/serving/pkg/apis/serving"
+ servingv1 "knative.dev/serving/pkg/apis/serving/v1"
+ "sigs.k8s.io/controller-runtime/pkg/cache"
+ ctrl "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// ManageSyntheticIntegrations is the controller for synthetic Integrations. Consider that the lifecycle of the objects are driven
+// by the way we are monitoring them. Since we're filtering by `camel.apache.org/integration` label in the cached clinet,
+// you must consider an add, update or delete
+// accordingly, ie, when the user label the resource, then it is considered as an add, when it removes the label, it is considered as a delete.
+// We must filter only non managed objects in order to avoid to conflict with the reconciliation loop of managed objects (owned by an Integration).
+func ManageSyntheticIntegrations(ctx context.Context, c client.Client, cache cache.Cache, reader ctrl.Reader) error {
+ informers, err := getInformers(ctx, c, cache)
+ if err != nil {
+ return err
+ }
+ for _, informer := range informers {
+ _, err := informer.AddEventHandler(clientgocache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ ctrlObj, ok := obj.(ctrl.Object)
+ if !ok {
+ log.Error(fmt.Errorf("type assertion failed: %v", obj), "Failed to retrieve Object on add event")
+ return
+ }
+ if !isManagedObject(ctrlObj) {
+ integrationName := ctrlObj.GetLabels()[v1.IntegrationLabel]
+ it, err := getSyntheticIntegration(ctx, c, ctrlObj.GetNamespace(), integrationName)
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+ adapter, err := nonManagedCamelApplicationFactory(ctrlObj)
+ if err != nil {
+ log.Errorf(err, "Some error happened while creating a Camel application adapter for %s", integrationName)
+ }
+ if err = createSyntheticIntegration(ctx, c, adapter.Integration()); err != nil {
+ log.Errorf(err, "Some error happened while creating a synthetic Integration %s", integrationName)
+ }
+ log.Infof("Created a synthetic Integration %s after %s resource object", it.GetName(), ctrlObj.GetName())
+ } else {
+ log.Errorf(err, "Some error happened while loading a synthetic Integration %s", integrationName)
+ }
+ } else {
+ if it.Status.Phase == v1.IntegrationPhaseImportMissing {
+ // Update with proper phase (reconciliation will take care)
+ it.Status.Phase = v1.IntegrationPhaseNone
+ if err = updateSyntheticIntegration(ctx, c, it); err != nil {
+ log.Errorf(err, "Some error happened while updatinf a synthetic Integration %s", integrationName)
+ }
+ } else {
+ log.Infof("Synthetic Integration %s is in phase %s. Skipping.", integrationName, it.Status.Phase)
+ }
+ }
+ }
+ },
+ DeleteFunc: func(obj interface{}) {
+ ctrlObj, ok := obj.(ctrl.Object)
+ if !ok {
+ log.Error(fmt.Errorf("type assertion failed: %v", obj), "Failed to retrieve Object on delete event")
+ return
+ }
+ if !isManagedObject(ctrlObj) {
+ integrationName := ctrlObj.GetLabels()[v1.IntegrationLabel]
+ // We must use a non caching client to understand if the object has been deleted from the cluster or only deleted from
+ // the cache (ie, user removed the importing label)
+ err := reader.Get(ctx, ctrl.ObjectKeyFromObject(ctrlObj), ctrlObj)
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+ // Object removed from the cluster
+ it, err := getSyntheticIntegration(ctx, c, ctrlObj.GetNamespace(), integrationName)
+ if err != nil {
+ log.Errorf(err, "Some error happened while loading a synthetic Integration %s", it.Name)
+ return
+ }
+ // The resource from which we imported has been deleted, report in it status.
+ // It may be a temporary situation, for example, if the deployment from which the Integration is imported
+ // is being redeployed. For this reason we should keep the Integration instead of forcefully removing it.
+ message := fmt.Sprintf(
+ "import %s %s no longer available",
+ it.Annotations[v1.IntegrationImportedKindLabel],
+ it.Annotations[v1.IntegrationImportedNameLabel],
+ )
+ it.SetReadyConditionError(message)
+ zero := int32(0)
+ it.Status.Phase = v1.IntegrationPhaseImportMissing
+ it.Status.Replicas = &zero
+ if err = updateSyntheticIntegration(ctx, c, it); err != nil {
+ log.Errorf(err, "Some error happened while updating a synthetic Integration %s", it.Name)
+ }
+ log.Infof("Updated synthetic Integration %s with status %s", it.GetName(), it.Status.Phase)
+ } else {
+ log.Errorf(err, "Some error happened while loading object %s from the cluster", ctrlObj.GetName())
+ return
+ }
+ } else {
+ // Importing label removed
+ if err = deleteSyntheticIntegration(ctx, c, ctrlObj.GetNamespace(), integrationName); err != nil {
+ log.Errorf(err, "Some error happened while deleting a synthetic Integration %s", integrationName)
+ }
+ log.Infof("Deleted synthetic Integration %s", integrationName)
+ }
+ }
+ },
+ })
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func getInformers(ctx context.Context, cl client.Client, c cache.Cache) ([]cache.Informer, error) {
+ deploy, err := c.GetInformer(ctx, &appsv1.Deployment{})
+ if err != nil {
+ return nil, err
+ }
+ informers := []cache.Informer{deploy}
+ // Watch for the CronJob conditionally
+ if ok, err := kubernetes.IsAPIResourceInstalled(cl, batchv1.SchemeGroupVersion.String(), reflect.TypeOf(batchv1.CronJob{}).Name()); ok && err == nil {
+ cron, err := c.GetInformer(ctx, &batchv1.CronJob{})
+ if err != nil {
+ return nil, err
+ }
+ informers = append(informers, cron)
+ }
+ // Watch for the Knative Services conditionally
+ if ok, err := kubernetes.IsAPIResourceInstalled(cl, servingv1.SchemeGroupVersion.String(), reflect.TypeOf(servingv1.Service{}).Name()); ok && err == nil {
+ if ok, err := kubernetes.CheckPermission(ctx, cl, serving.GroupName, "services", platform.GetOperatorWatchNamespace(), "", "watch"); ok && err == nil {
+ ksvc, err := c.GetInformer(ctx, &servingv1.Service{})
+ if err != nil {
+ return nil, err
+ }
+ informers = append(informers, ksvc)
+ }
+ }
+
+ return informers, nil
+}
+
+func getSyntheticIntegration(ctx context.Context, c client.Client, namespace, name string) (*v1.Integration, error) {
+ it := v1.NewIntegration(namespace, name)
+ err := c.Get(ctx, ctrl.ObjectKeyFromObject(&it), &it)
+ return &it, err
+}
+
+func createSyntheticIntegration(ctx context.Context, c client.Client, it *v1.Integration) error {
+ return c.Create(ctx, it, ctrl.FieldOwner("camel-k-operator"))
+}
+
+func deleteSyntheticIntegration(ctx context.Context, c client.Client, namespace, name string) error {
+ // As the Integration label was removed, we don't know which is the Synthetic integration to remove
+ it := v1.NewIntegration(namespace, name)
+ return c.Delete(ctx, &it)
+}
+
+func updateSyntheticIntegration(ctx context.Context, c client.Client, it *v1.Integration) error {
+ return c.Status().Update(ctx, it, ctrl.FieldOwner("camel-k-operator"))
+}
+
+// isManagedObject returns true if the object is managed by an Integration.
+func isManagedObject(obj ctrl.Object) bool {
+ for _, mr := range obj.GetOwnerReferences() {
+ if mr.APIVersion == "camel.apache.org/v1" &&
+ mr.Kind == "Integration" {
+ return true
+ }
+ }
+ return false
+}
+
+// nonManagedCamelApplicationAdapter represents a Camel application built and deployed outside the operator lifecycle.
+type nonManagedCamelApplicationAdapter interface {
+ // Integration return an Integration resource fed by the Camel application adapter.
+ Integration() *v1.Integration
+}
+
+func nonManagedCamelApplicationFactory(obj ctrl.Object) (nonManagedCamelApplicationAdapter, error) {
+ deploy, ok := obj.(*appsv1.Deployment)
+ if ok {
+ return &nonManagedCamelDeployment{deploy: deploy}, nil
+ }
+ cronjob, ok := obj.(*batchv1.CronJob)
+ if ok {
+ return &NonManagedCamelCronjob{cron: cronjob}, nil
+ }
+ ksvc, ok := obj.(*servingv1.Service)
+ if ok {
+ return &NonManagedCamelKnativeService{ksvc: ksvc}, nil
+ }
+ return nil, fmt.Errorf("unsupported %s object kind", obj)
+}
+
+// NonManagedCamelDeployment represents a regular Camel application built and deployed outside the operator lifecycle.
+type nonManagedCamelDeployment struct {
+ deploy *appsv1.Deployment
+}
+
+// Integration return an Integration resource fed by the Camel application adapter.
+func (app *nonManagedCamelDeployment) Integration() *v1.Integration {
+ it := v1.NewIntegration(app.deploy.Namespace, app.deploy.Labels[v1.IntegrationLabel])
+ it.SetAnnotations(map[string]string{
+ v1.IntegrationImportedNameLabel: app.deploy.Name,
+ v1.IntegrationImportedKindLabel: "Deployment",
+ v1.IntegrationSyntheticLabel: "true",
+ })
+ it.Spec = v1.IntegrationSpec{
+ Traits: v1.Traits{
+ Container: &trait.ContainerTrait{
+ Name: app.getContainerNameFromDeployment(),
+ },
+ },
+ }
+ return &it
+}
+
+// getContainerNameFromDeployment returns the container name which is running the Camel application.
+func (app *nonManagedCamelDeployment) getContainerNameFromDeployment() string {
+ firstContainerName := ""
+ for _, ct := range app.deploy.Spec.Template.Spec.Containers {
+ // set as fallback if no container is named as the deployment
+ if firstContainerName == "" {
+ firstContainerName = app.deploy.Name
+ }
+ if ct.Name == app.deploy.Name {
+ return app.deploy.Name
+ }
+ }
+ return firstContainerName
+}
+
+// NonManagedCamelCronjob represents a cron Camel application built and deployed outside the operator lifecycle.
+type NonManagedCamelCronjob struct {
+ cron *batchv1.CronJob
+}
+
+// Integration return an Integration resource fed by the Camel application adapter.
+func (app *NonManagedCamelCronjob) Integration() *v1.Integration {
+ it := v1.NewIntegration(app.cron.Namespace, app.cron.Labels[v1.IntegrationLabel])
+ it.SetAnnotations(map[string]string{
+ v1.IntegrationImportedNameLabel: app.cron.Name,
+ v1.IntegrationImportedKindLabel: "CronJob",
+ v1.IntegrationSyntheticLabel: "true",
+ })
+ it.Spec = v1.IntegrationSpec{
+ Traits: v1.Traits{},
+ }
+ return &it
+}
+
+// NonManagedCamelKnativeService represents a Knative Service based Camel application built and deployed outside the operator lifecycle.
+type NonManagedCamelKnativeService struct {
+ ksvc *servingv1.Service
+}
+
+// Integration return an Integration resource fed by the Camel application adapter.
+func (app *NonManagedCamelKnativeService) Integration() *v1.Integration {
+ it := v1.NewIntegration(app.ksvc.Namespace, app.ksvc.Labels[v1.IntegrationLabel])
+ it.SetAnnotations(map[string]string{
+ v1.IntegrationImportedNameLabel: app.ksvc.Name,
+ v1.IntegrationImportedKindLabel: "KnativeService",
+ v1.IntegrationSyntheticLabel: "true",
+ })
+ it.Spec = v1.IntegrationSpec{
+ Traits: v1.Traits{},
+ }
+ return &it
+}