You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by pc...@apache.org on 2024/01/09 14:21:49 UTC
(camel-k) 01/08: feat: import external Camel applications
This is an automated email from the ASF dual-hosted git repository.
pcongiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit dbea5cc5800b4cc03be7e7c46193123c9c071393
Author: Pasquale Congiusti <pa...@gmail.com>
AuthorDate: Wed Nov 29 17:25:35 2023 +0100
feat: import external Camel applications
---
config/rbac/namespaced/operator-role-knative.yaml | 8 +
pkg/apis/camel/v1/integration_types.go | 18 +-
pkg/apis/camel/v1/integration_types_support.go | 15 ++
pkg/cmd/operator/operator.go | 1 -
pkg/controller/integration/initialize.go | 87 +++++++
.../integration/integration_controller.go | 173 ++++++++++----
.../integration/integration_controller_import.go | 249 +++++++++++++++++++++
pkg/controller/integration/monitor.go | 54 ++++-
pkg/controller/integration/monitor_cronjob.go | 16 ++
pkg/controller/integration/monitor_deployment.go | 13 ++
pkg/controller/integration/monitor_knative.go | 17 ++
pkg/controller/integration/monitor_synthetic.go | 70 ++++++
pkg/controller/integration/predicate.go | 37 +++
pkg/trait/camel.go | 3 +-
pkg/trait/platform.go | 3 +-
pkg/trait/trait.go | 71 +++++-
16 files changed, 770 insertions(+), 65 deletions(-)
diff --git a/config/rbac/namespaced/operator-role-knative.yaml b/config/rbac/namespaced/operator-role-knative.yaml
index 3cba80931..7e1d2f349 100644
--- a/config/rbac/namespaced/operator-role-knative.yaml
+++ b/config/rbac/namespaced/operator-role-knative.yaml
@@ -35,6 +35,14 @@ rules:
- patch
- update
- watch
+- apiGroups:
+ - serving.knative.dev
+ resources:
+ - revisions
+ verbs:
+ - get
+ - list
+ - watch
- apiGroups:
- eventing.knative.dev
resources:
diff --git a/pkg/apis/camel/v1/integration_types.go b/pkg/apis/camel/v1/integration_types.go
index 78dd40a8c..9bcecaad2 100644
--- a/pkg/apis/camel/v1/integration_types.go
+++ b/pkg/apis/camel/v1/integration_types.go
@@ -155,7 +155,13 @@ const (
IntegrationPhaseRunning IntegrationPhase = "Running"
// IntegrationPhaseError --.
IntegrationPhaseError IntegrationPhase = "Error"
+ // IntegrationPhaseImportMissing used when the application from which the Integration is imported has been deleted.
+ IntegrationPhaseImportMissing IntegrationPhase = "Application Missing"
+ // IntegrationPhaseCannotMonitor used when the application from which the Integration is imported does not have enough information to monitor its pods.
+ IntegrationPhaseCannotMonitor IntegrationPhase = "Cannot Monitor Pods"
+ // IntegrationConditionReady --.
+ IntegrationConditionReady IntegrationConditionType = "Ready"
// IntegrationConditionKitAvailable --.
IntegrationConditionKitAvailable IntegrationConditionType = "IntegrationKitAvailable"
// IntegrationConditionPlatformAvailable --.
@@ -178,10 +184,11 @@ const (
IntegrationConditionJolokiaAvailable IntegrationConditionType = "JolokiaAvailable"
// IntegrationConditionProbesAvailable --.
IntegrationConditionProbesAvailable IntegrationConditionType = "ProbesAvailable"
- // IntegrationConditionReady --.
- IntegrationConditionReady IntegrationConditionType = "Ready"
// IntegrationConditionTraitInfo --.
IntegrationConditionTraitInfo IntegrationConditionType = "TraitInfo"
+ // IntegrationConditionMonitoringPodsAvailable used to specify that the Pods generated are available for monitoring.
+ IntegrationConditionMonitoringPodsAvailable IntegrationConditionType = "MonitoringPodsAvailable"
+
// IntegrationConditionKitAvailableReason --.
IntegrationConditionKitAvailableReason string = "IntegrationKitAvailable"
// IntegrationConditionPlatformAvailableReason --.
@@ -220,7 +227,8 @@ const (
IntegrationConditionJolokiaAvailableReason string = "JolokiaAvailable"
// IntegrationConditionProbesAvailableReason --.
IntegrationConditionProbesAvailableReason string = "ProbesAvailable"
-
+ // IntegrationConditionMonitoringPodsAvailableReason used to specify that the Pods generated are available for monitoring.
+ IntegrationConditionMonitoringPodsAvailableReason string = "MonitoringPodsAvailable"
// IntegrationConditionKnativeServiceReadyReason --.
IntegrationConditionKnativeServiceReadyReason string = "KnativeServiceReady"
// IntegrationConditionDeploymentReadyReason --.
@@ -239,18 +247,18 @@ const (
IntegrationConditionRuntimeNotReadyReason string = "RuntimeNotReady"
// IntegrationConditionErrorReason --.
IntegrationConditionErrorReason string = "Error"
-
// IntegrationConditionInitializationFailedReason --.
IntegrationConditionInitializationFailedReason string = "InitializationFailed"
// IntegrationConditionUnsupportedLanguageReason --.
IntegrationConditionUnsupportedLanguageReason string = "UnsupportedLanguage"
-
// IntegrationConditionKameletsAvailable --.
IntegrationConditionKameletsAvailable IntegrationConditionType = "KameletsAvailable"
// IntegrationConditionKameletsAvailableReason --.
IntegrationConditionKameletsAvailableReason string = "KameletsAvailable"
// IntegrationConditionKameletsNotAvailableReason --.
IntegrationConditionKameletsNotAvailableReason string = "KameletsNotAvailable"
+ // IntegrationConditionImportingKindAvailableReason used (as false) if we're trying to import an unsupported kind.
+ IntegrationConditionImportingKindAvailableReason string = "ImportingKindAvailable"
)
// IntegrationCondition describes the state of a resource at a certain point.
diff --git a/pkg/apis/camel/v1/integration_types_support.go b/pkg/apis/camel/v1/integration_types_support.go
index ef24e207b..3342be76a 100644
--- a/pkg/apis/camel/v1/integration_types_support.go
+++ b/pkg/apis/camel/v1/integration_types_support.go
@@ -25,8 +25,18 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
+// IntegrationLabel is used to tag k8s object created by a given Integration.
const IntegrationLabel = "camel.apache.org/integration"
+// IntegrationSyntheticLabel is used to tag k8s synthetic Integrations.
+const IntegrationSyntheticLabel = "camel.apache.org/is-synthetic"
+
+// IntegrationImportedKindLabel specifies from what kind of resource an Integration was imported.
+const IntegrationImportedKindLabel = "camel.apache.org/imported-from-kind"
+
+// IntegrationImportedNameLabel specifies from what resource an Integration was imported.
+const IntegrationImportedNameLabel = "camel.apache.org/imported-from-name"
+
func NewIntegration(namespace string, name string) Integration {
return Integration{
TypeMeta: metav1.TypeMeta{
@@ -283,6 +293,11 @@ func (in *Integration) SetReadyConditionError(err string) {
in.SetReadyCondition(corev1.ConditionFalse, IntegrationConditionErrorReason, err)
}
+// IsSynthetic reports whether this is a synthetic Integration, that is a non managed one
+// which was likely imported from an external deployment. The marker is read from the
+// IntegrationSyntheticLabel annotation, whose value must be exactly "true".
+func (in *Integration) IsSynthetic() bool {
+	marker, found := in.Annotations[IntegrationSyntheticLabel]
+	return found && marker == "true"
+}
+
// GetCondition returns the condition with the provided type.
func (in *IntegrationStatus) GetCondition(condType IntegrationConditionType) *IntegrationCondition {
for i := range in.Conditions {
diff --git a/pkg/cmd/operator/operator.go b/pkg/cmd/operator/operator.go
index 04b5ea8b2..ab59ab638 100644
--- a/pkg/cmd/operator/operator.go
+++ b/pkg/cmd/operator/operator.go
@@ -188,7 +188,6 @@ func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID
selector := labels.NewSelector().Add(*hasIntegrationLabel)
selectors := map[ctrl.Object]cache.ByObject{
- &corev1.Pod{}: {Label: selector},
&appsv1.Deployment{}: {Label: selector},
&batchv1.Job{}: {Label: selector},
&servingv1.Service{}: {Label: selector},
diff --git a/pkg/controller/integration/initialize.go b/pkg/controller/integration/initialize.go
index a08dd28c6..ad8891647 100644
--- a/pkg/controller/integration/initialize.go
+++ b/pkg/controller/integration/initialize.go
@@ -19,6 +19,7 @@ package integration
import (
"context"
+ "fmt"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -53,6 +54,10 @@ func (action *initializeAction) CanHandle(integration *v1.Integration) bool {
func (action *initializeAction) Handle(ctx context.Context, integration *v1.Integration) (*v1.Integration, error) {
action.L.Info("Initializing Integration")
+ if integration.Annotations[v1.IntegrationImportedNameLabel] != "" {
+ return action.importFromExternalApp(integration)
+ }
+
if _, err := trait.Apply(ctx, action.client, integration, nil); err != nil {
integration.Status.Phase = v1.IntegrationPhaseError
integration.SetReadyCondition(corev1.ConditionFalse,
@@ -91,3 +96,85 @@ func (action *initializeAction) Handle(ctx context.Context, integration *v1.Inte
return integration, nil
}
+
+// importFromExternalApp initializes an Integration which was imported from an external Camel application.
+// The name and the kind of the imported application are read from the Integration annotations.
+// The conditions matching the imported kind are set on the status, then the phase is moved to
+// Running when the Ready condition is true and to Error otherwise.
+func (action *initializeAction) importFromExternalApp(integration *v1.Integration) (*v1.Integration, error) {
+	importedName := integration.Annotations[v1.IntegrationImportedNameLabel]
+	importedKind := integration.Annotations[v1.IntegrationImportedKindLabel]
+	// The condition for which this Integration is imported is required later by monitoring.
+	conditions := getCamelAppImportingCondition(
+		importedKind,
+		fmt.Sprintf("imported from %s %s", importedName, importedKind),
+	)
+	integration.Status.SetConditions(conditions...)
+
+	// A true Ready condition lets us safely assume the Integration is running.
+	phase := v1.IntegrationPhaseError
+	if integration.IsConditionTrue(v1.IntegrationConditionReady) {
+		phase = v1.IntegrationPhaseRunning
+	}
+	integration.Status.Phase = phase
+
+	return integration, nil
+}
+
+// getCamelAppImportingCondition returns the conditions to set on an Integration imported from a
+// resource of the given kind. For Deployment, CronJob and KnativeService it returns the kind
+// specific "available" condition followed by a Ready condition, both true and both carrying the
+// given message. For any other kind it returns a single false Ready condition reporting the
+// unsupported kind.
+func getCamelAppImportingCondition(kind, message string) []v1.IntegrationCondition {
+	// imported pairs the kind specific condition with the Ready one, both set to true.
+	imported := func(availableType v1.IntegrationConditionType, availableReason, readyReason string) []v1.IntegrationCondition {
+		return []v1.IntegrationCondition{
+			{
+				Type:    availableType,
+				Status:  corev1.ConditionTrue,
+				Reason:  availableReason,
+				Message: message,
+			},
+			{
+				Type:    v1.IntegrationConditionReady,
+				Status:  corev1.ConditionTrue,
+				Reason:  readyReason,
+				Message: message,
+			},
+		}
+	}
+
+	if kind == "Deployment" {
+		return imported(
+			v1.IntegrationConditionDeploymentAvailable,
+			v1.IntegrationConditionDeploymentAvailableReason,
+			v1.IntegrationConditionDeploymentReadyReason,
+		)
+	}
+	if kind == "CronJob" {
+		// NOTE(review): the Ready condition of a CronJob reuses the Deployment ready reason — confirm this is intended.
+		return imported(
+			v1.IntegrationConditionCronJobAvailable,
+			v1.IntegrationConditionCronJobCreatedReason,
+			v1.IntegrationConditionDeploymentReadyReason,
+		)
+	}
+	if kind == "KnativeService" {
+		return imported(
+			v1.IntegrationConditionKnativeServiceAvailable,
+			v1.IntegrationConditionKnativeServiceAvailableReason,
+			v1.IntegrationConditionKnativeServiceReadyReason,
+		)
+	}
+
+	// Any other kind cannot be imported: report it through a false Ready condition.
+	unsupported := v1.IntegrationCondition{
+		Type:    v1.IntegrationConditionReady,
+		Status:  corev1.ConditionFalse,
+		Reason:  v1.IntegrationConditionImportingKindAvailableReason,
+		Message: fmt.Sprintf("Unsupported %s import kind", kind),
+	}
+	return []v1.IntegrationCondition{unsupported}
+}
diff --git a/pkg/controller/integration/integration_controller.go b/pkg/controller/integration/integration_controller.go
index a70a8713b..c3dcd30f4 100644
--- a/pkg/controller/integration/integration_controller.go
+++ b/pkg/controller/integration/integration_controller.go
@@ -27,12 +27,12 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
-
"sigs.k8s.io/controller-runtime/pkg/builder"
ctrl "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
@@ -324,30 +324,47 @@ func add(ctx context.Context, mgr manager.Manager, c client.Client, r reconcile.
// Evaluates to false if the object has been confirmed deleted
return !e.DeleteStateUnknown
},
- })).
- // Watch for IntegrationKit phase transitioning to ready or error, and
- // enqueue requests for any integration that matches the kit, in building
- // or running phase.
- Watches(&v1.IntegrationKit{},
- handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request {
- kit, ok := a.(*v1.IntegrationKit)
- if !ok {
- log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve integration list")
- return []reconcile.Request{}
- }
+ }))
+ // Watch for all the resources
+ watchIntegrationResources(c, b)
+ // Watch for the CronJob conditionally
+ if ok, err := kubernetes.IsAPIResourceInstalled(c, batchv1.SchemeGroupVersion.String(), reflect.TypeOf(batchv1.CronJob{}).Name()); ok && err == nil {
+ watchCronJobResources(c, b)
+ }
+ // Watch for the Knative Services conditionally
+ if ok, err := kubernetes.IsAPIResourceInstalled(c, servingv1.SchemeGroupVersion.String(), reflect.TypeOf(servingv1.Service{}).Name()); err != nil {
+ return err
+ } else if ok {
+ if err = watchKnativeResources(ctx, c, b); err != nil {
+ return err
+ }
+ }
- return integrationKitEnqueueRequestsFromMapFunc(ctx, c, kit)
- })).
+ return b.Complete(r)
+}
+
+func watchIntegrationResources(c client.Client, b *builder.Builder) {
+ // Watch for IntegrationKit phase transitioning to ready or error, and
+ // enqueue requests for any integration that matches the kit, in building
+ // or running phase.
+ b.Watches(&v1.IntegrationKit{},
+ handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request {
+ kit, ok := a.(*v1.IntegrationKit)
+ if !ok {
+ log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve IntegrationKit")
+ return []reconcile.Request{}
+ }
+ return integrationKitEnqueueRequestsFromMapFunc(ctx, c, kit)
+ })).
// Watch for IntegrationPlatform phase transitioning to ready and enqueue
// requests for any integrations that are in phase waiting for platform
Watches(&v1.IntegrationPlatform{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request {
p, ok := a.(*v1.IntegrationPlatform)
if !ok {
- log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to list integrations")
+ log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve IntegrationPlatform")
return []reconcile.Request{}
}
-
return integrationPlatformEnqueueRequestsFromMapFunc(ctx, c, p)
})).
// Watch for Configmaps or Secret used in the Integrations for updates
@@ -355,30 +372,29 @@ func add(ctx context.Context, mgr manager.Manager, c client.Client, r reconcile.
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request {
cm, ok := a.(*corev1.ConfigMap)
if !ok {
- log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve integration list")
+ log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve Configmap")
return []reconcile.Request{}
}
-
return configmapEnqueueRequestsFromMapFunc(ctx, c, cm)
})).
Watches(&corev1.Secret{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request {
secret, ok := a.(*corev1.Secret)
if !ok {
- log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve integration list")
+ log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve Secret")
return []reconcile.Request{}
}
-
return secretEnqueueRequestsFromMapFunc(ctx, c, secret)
})).
- // Watch for the owned Deployments
- Owns(&appsv1.Deployment{}, builder.WithPredicates(StatusChangedPredicate{})).
- // Watch for the Integration Pods
+ // Watch for the Integration Pods belonging to managed Integrations
Watches(&corev1.Pod{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request {
pod, ok := a.(*corev1.Pod)
if !ok {
- log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to list integration pods")
+ log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve Pod")
+ return []reconcile.Request{}
+ }
+ if pod.Labels[v1.IntegrationLabel] == "" {
return []reconcile.Request{}
}
return []reconcile.Request{
@@ -389,36 +405,90 @@ func add(ctx context.Context, mgr manager.Manager, c client.Client, r reconcile.
},
},
}
- }))
+ })).
+ // Watch for non managed Deployments (ie, imported)
+ Watches(&appsv1.Deployment{},
+ handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request {
+ deploy, ok := a.(*appsv1.Deployment)
+ if !ok {
+ log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve Deployment")
+ return []reconcile.Request{}
+ }
+ return nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx, c, &NonManagedCamelDeployment{deploy: deploy})
+ }),
+ builder.WithPredicates(NonManagedObjectPredicate{}),
+ ).
+ // Watch for the owned Deployments
+ Owns(&appsv1.Deployment{}, builder.WithPredicates(StatusChangedPredicate{}))
+}
- if ok, err := kubernetes.IsAPIResourceInstalled(c, batchv1.SchemeGroupVersion.String(), reflect.TypeOf(batchv1.CronJob{}).Name()); ok && err == nil {
+func watchCronJobResources(c client.Client, b *builder.Builder) {
+ // Watch for non managed CronJobs (ie, imported)
+ b.Watches(&batchv1.CronJob{},
+ handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request {
+ cron, ok := a.(*batchv1.CronJob)
+ if !ok {
+ log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve CronJob")
+ return []reconcile.Request{}
+ }
+ return nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx, c, &NonManagedCamelCronjob{cron: cron})
+ }),
+ builder.WithPredicates(NonManagedObjectPredicate{}),
+ ).
// Watch for the owned CronJobs
- b.Owns(&batchv1.CronJob{}, builder.WithPredicates(StatusChangedPredicate{}))
- }
+ Owns(&batchv1.CronJob{}, builder.WithPredicates(StatusChangedPredicate{}))
+}
- // Watch for the owned Knative Services conditionally
- if ok, err := kubernetes.IsAPIResourceInstalled(c, servingv1.SchemeGroupVersion.String(), reflect.TypeOf(servingv1.Service{}).Name()); err != nil {
+func watchKnativeResources(ctx context.Context, c client.Client, b *builder.Builder) error {
+ // Check for permission to watch the Knative Service resource
+ checkCtx, cancel := context.WithTimeout(ctx, time.Minute)
+ defer cancel()
+ if ok, err := kubernetes.CheckPermission(checkCtx, c, serving.GroupName, "services", platform.GetOperatorWatchNamespace(), "", "watch"); err != nil {
return err
} else if ok {
- // Check for permission to watch the Knative Service resource
- checkCtx, cancel := context.WithTimeout(ctx, time.Minute)
- defer cancel()
- if ok, err = kubernetes.CheckPermission(checkCtx, c, serving.GroupName, "services", platform.GetOperatorWatchNamespace(), "", "watch"); err != nil {
- return err
- } else if ok {
- log.Info("KnativeService resources installed in the cluster. RBAC privileges assigned correctly, you can use Knative features.")
- b.Owns(&servingv1.Service{}, builder.WithPredicates(StatusChangedPredicate{}))
- } else {
- log.Info(` KnativeService resources installed in the cluster. However Camel K operator has not the required RBAC privileges. You can't use Knative features.
- Make sure to apply the required RBAC privileges and restart the Camel K Operator Pod to be able to watch for Camel K managed Knative Services.`)
- }
- } else {
- log.Info(`KnativeService resources are not installed in the cluster. You can't use Knative features. If you install Knative Serving resources after the
- Camel K operator, make sure to apply the required RBAC privileges and restart the Camel K Operator Pod to be able to watch for
- Camel K managed Knative Services.`)
+ // Watch for non managed Knative Service (ie, imported)
+ b.Watches(&servingv1.Service{},
+ handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request {
+ ksvc, ok := a.(*servingv1.Service)
+ if !ok {
+ log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve KnativeService")
+ return []reconcile.Request{}
+ }
+ return nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx, c, &NonManagedCamelKnativeService{ksvc: ksvc})
+ }),
+ builder.WithPredicates(NonManagedObjectPredicate{}),
+ ).
+ // We must also watch Revisions, since it's the object that really changes when a Knative service scales up and down
+ Watches(&servingv1.Revision{},
+ handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request {
+ revision, ok := a.(*servingv1.Revision)
+ if !ok {
+ log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve KnativeService Revision")
+ return []reconcile.Request{}
+ }
+ ksvc := &servingv1.Service{
+ TypeMeta: metav1.TypeMeta{
+ Kind: "Service",
+ APIVersion: servingv1.SchemeGroupVersion.String(),
+ },
+ ObjectMeta: metav1.ObjectMeta{
+ Name: revision.Labels["serving.knative.dev/service"],
+ Namespace: revision.Namespace,
+ },
+ }
+ err := c.Get(ctx, ctrl.ObjectKeyFromObject(ksvc), ksvc)
+ if err != nil {
+ // The revision does not belong to any managed (owned or imported) KnativeService, just discard
+ return []reconcile.Request{}
+ }
+ return nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx, c, &NonManagedCamelKnativeService{ksvc: ksvc})
+ }),
+ builder.WithPredicates(NonManagedObjectPredicate{}),
+ ).
+ // Watch for the owned Knative Services
+ Owns(&servingv1.Service{}, builder.WithPredicates(StatusChangedPredicate{}))
}
-
- return b.Complete(r)
+ return nil
}
var _ reconcile.Reconciler = &reconcileIntegration{}
@@ -476,7 +546,12 @@ func (r *reconcileIntegration) Reconcile(ctx context.Context, request reconcile.
NewPlatformSetupAction(),
NewInitializeAction(),
newBuildKitAction(),
- NewMonitorAction(),
+ }
+
+ if instance.IsSynthetic() {
+ actions = append(actions, NewMonitorSyntheticAction())
+ } else {
+ actions = append(actions, NewMonitorAction())
}
for _, a := range actions {
diff --git a/pkg/controller/integration/integration_controller_import.go b/pkg/controller/integration/integration_controller_import.go
new file mode 100644
index 000000000..403185509
--- /dev/null
+++ b/pkg/controller/integration/integration_controller_import.go
@@ -0,0 +1,249 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package integration
+
+import (
+ "context"
+
+ v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+ "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait"
+ "github.com/apache/camel-k/v2/pkg/client"
+ "github.com/apache/camel-k/v2/pkg/util/log"
+ "github.com/apache/camel-k/v2/pkg/util/patch"
+ appsv1 "k8s.io/api/apps/v1"
+ batchv1 "k8s.io/api/batch/v1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/types"
+ servingv1 "knative.dev/serving/pkg/apis/serving/v1"
+ ctrl "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
+)
+
+// nonManagedCamelAppEnqueueRequestsFromMapFunc discovers the Integration which has to be woken up after an
+// event on a non managed Camel application. When the Integration does not exist, a synthetic one is created,
+// unless the application itself cannot be found any longer. This is used to import external Camel applications.
+// An empty request list is returned when the application provides no Integration name, when the application
+// is gone, or when the Integration can be neither retrieved nor created.
+func nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx context.Context, c client.Client, adp NonManagedCamelApplicationAdapter) []reconcile.Request {
+	integrationName := adp.GetIntegrationName()
+	if integrationName == "" {
+		return []reconcile.Request{}
+	}
+	it := v1.NewIntegration(adp.GetIntegrationNameSpace(), integrationName)
+	// wakeUp builds the single request targeting the Integration.
+	wakeUp := func() []reconcile.Request {
+		return []reconcile.Request{
+			{
+				NamespacedName: types.NamespacedName{
+					Namespace: it.Namespace,
+					Name:      it.Name,
+				},
+			},
+		}
+	}
+
+	lookupErr := c.Get(ctx, ctrl.ObjectKeyFromObject(&it), &it)
+	if lookupErr == nil {
+		return wakeUp()
+	}
+	if !k8serrors.IsNotFound(lookupErr) {
+		log.Errorf(lookupErr, "Could not get Integration %s", it.GetName())
+		return []reconcile.Request{}
+	}
+
+	// The Integration does not exist. Before creating it we must make sure the resource is not
+	// being deleted: in such case it makes no sense to create an Integration after it.
+	appObj := adp.GetAppObj()
+	if appErr := c.Get(ctx, ctrl.ObjectKeyFromObject(appObj), appObj); appErr != nil {
+		if k8serrors.IsNotFound(appErr) {
+			return []reconcile.Request{}
+		}
+		// Any other failure is only logged, the synthetic Integration is created anyway.
+		log.Errorf(appErr, "Some error happened while trying to get %s %s resource", adp.GetName(), adp.GetKind())
+	}
+
+	createSyntheticIntegration(&it, adp)
+	target, patchErr := patch.ApplyPatch(&it)
+	if patchErr != nil {
+		log.Infof("Could not create Integration %s: %s", adp.GetIntegrationName(), patchErr.Error())
+		return []reconcile.Request{}
+	}
+	if applyErr := c.Patch(ctx, target, ctrl.Apply, ctrl.ForceOwnership, ctrl.FieldOwner("camel-k-operator")); applyErr != nil {
+		log.Errorf(applyErr, "Some error happened while creating a synthetic Integration after %s %s resource", adp.GetName(), adp.GetKind())
+		return []reconcile.Request{}
+	}
+	log.Infof(
+		"Created a synthetic Integration %s after %s %s",
+		it.GetName(),
+		adp.GetName(),
+		adp.GetKind(),
+	)
+
+	return wakeUp()
+}
+
+// createSyntheticIntegration fills the given Integration with the values required by a synthetic Integration:
+// the annotations holding the name and the kind of the imported application together with the synthetic
+// marker, and a spec containing only the traits provided by the adapter.
+func createSyntheticIntegration(it *v1.Integration, adp NonManagedCamelApplicationAdapter) {
+	syntheticAnnotations := make(map[string]string, 3)
+	syntheticAnnotations[v1.IntegrationImportedNameLabel] = adp.GetName()
+	syntheticAnnotations[v1.IntegrationImportedKindLabel] = adp.GetKind()
+	syntheticAnnotations[v1.IntegrationSyntheticLabel] = "true"
+	it.SetAnnotations(syntheticAnnotations)
+
+	it.Spec = v1.IntegrationSpec{Traits: adp.GetTraits()}
+}
+
+// NonManagedCamelApplicationAdapter represents a Camel application built and deployed outside the operator lifecycle.
+type NonManagedCamelApplicationAdapter interface {
+ // GetName returns the name of the Camel application.
+ GetName() string
+ // GetKind returns the kind of the Camel application (ie, Deployment, Cronjob, ...).
+ GetKind() string
+ // GetTraits is used to retrieve the trait configuration.
+ GetTraits() v1.Traits
+ // GetIntegrationName returns the name of the Integration which has to be imported.
+ GetIntegrationName() string
+ // GetIntegrationNameSpace returns the namespace of the Integration which has to be imported.
+ GetIntegrationNameSpace() string
+ // GetAppObj returns the object from which we're importing.
+ GetAppObj() ctrl.Object
+}
+
+// NonManagedCamelDeployment represents a regular Camel application built and deployed outside the operator lifecycle.
+// It wraps the Deployment the application is imported from.
+type NonManagedCamelDeployment struct {
+ deploy *appsv1.Deployment
+}
+
+// GetName returns the name of the Camel application, which is the name of the wrapped Deployment.
+func (app *NonManagedCamelDeployment) GetName() string {
+ return app.deploy.GetName()
+}
+
+// GetKind returns the kind of the Camel application, always "Deployment" for this adapter.
+func (app *NonManagedCamelDeployment) GetKind() string {
+ return "Deployment"
+}
+
+// GetTraits is used to retrieve the trait configuration: only the Container trait is set,
+// named after the value returned by getContainerNameFromDeployment.
+func (app *NonManagedCamelDeployment) GetTraits() v1.Traits {
+ return v1.Traits{
+ Container: &trait.ContainerTrait{
+ Name: app.getContainerNameFromDeployment(),
+ },
+ }
+}
+
+// GetAppObj returns the object from which we're importing, that is the wrapped Deployment.
+func (app *NonManagedCamelDeployment) GetAppObj() ctrl.Object {
+ return app.deploy
+}
+
+// GetIntegrationName returns the name of the Integration which has to be imported,
+// read from the IntegrationLabel label of the Deployment (empty when the label is missing).
+func (app *NonManagedCamelDeployment) GetIntegrationName() string {
+ return app.deploy.Labels[v1.IntegrationLabel]
+}
+
+// GetIntegrationNameSpace returns the namespace of the Integration which has to be imported,
+// which is the namespace of the Deployment.
+func (app *NonManagedCamelDeployment) GetIntegrationNameSpace() string {
+ return app.deploy.Namespace
+}
+
+// getContainerNameFromDeployment returns the name of the container which is running the Camel application.
+// The container named as the Deployment is preferred. When no container matches, the first container
+// declared in the Pod template is used as fallback. An empty string is returned when the template
+// declares no container at all.
+func (app *NonManagedCamelDeployment) getContainerNameFromDeployment() string {
+	containers := app.deploy.Spec.Template.Spec.Containers
+	for _, ct := range containers {
+		if ct.Name == app.deploy.Name {
+			return ct.Name
+		}
+	}
+	if len(containers) > 0 {
+		// Fallback if no container is named as the deployment: the previous code stored the
+		// Deployment name here, hence returning a name which matches no container.
+		return containers[0].Name
+	}
+	return ""
+}
+
+// NonManagedCamelCronjob represents a cron Camel application built and deployed outside the operator lifecycle.
+// It wraps the CronJob the application is imported from.
+type NonManagedCamelCronjob struct {
+ cron *batchv1.CronJob
+}
+
+// GetName returns the name of the Camel application, which is the name of the wrapped CronJob.
+func (app *NonManagedCamelCronjob) GetName() string {
+ return app.cron.GetName()
+}
+
+// GetKind returns the kind of the Camel application, always "CronJob" for this adapter.
+func (app *NonManagedCamelCronjob) GetKind() string {
+ return "CronJob"
+}
+
+// GetTraits is used to retrieve the trait configuration: no trait is set for a CronJob.
+func (app *NonManagedCamelCronjob) GetTraits() v1.Traits {
+ return v1.Traits{}
+}
+
+// GetIntegrationName returns the name of the Integration which has to be imported,
+// read from the IntegrationLabel label of the CronJob (empty when the label is missing).
+func (app *NonManagedCamelCronjob) GetIntegrationName() string {
+ return app.cron.Labels[v1.IntegrationLabel]
+}
+
+// GetIntegrationNameSpace returns the namespace of the Integration which has to be imported,
+// which is the namespace of the CronJob.
+func (app *NonManagedCamelCronjob) GetIntegrationNameSpace() string {
+ return app.cron.Namespace
+}
+
+// GetAppObj returns the object from which we're importing, that is the wrapped CronJob.
+func (app *NonManagedCamelCronjob) GetAppObj() ctrl.Object {
+ return app.cron
+}
+
+// NonManagedCamelKnativeService represents a Knative Service based Camel application built and deployed outside the operator lifecycle.
+// It wraps the Knative Service the application is imported from.
+type NonManagedCamelKnativeService struct {
+ ksvc *servingv1.Service
+}
+
+// GetName returns the name of the Camel application, which is the name of the wrapped Knative Service.
+func (app *NonManagedCamelKnativeService) GetName() string {
+ return app.ksvc.GetName()
+}
+
+// GetKind returns the kind of the Camel application, always "KnativeService" for this adapter.
+func (app *NonManagedCamelKnativeService) GetKind() string {
+ return "KnativeService"
+}
+
+// GetTraits is used to retrieve the trait configuration: no trait is set for a Knative Service.
+func (app *NonManagedCamelKnativeService) GetTraits() v1.Traits {
+ return v1.Traits{}
+}
+
+// GetIntegrationName returns the name of the Integration which has to be imported,
+// read from the IntegrationLabel label of the Knative Service (empty when the label is missing).
+func (app *NonManagedCamelKnativeService) GetIntegrationName() string {
+ return app.ksvc.Labels[v1.IntegrationLabel]
+}
+
+// GetIntegrationNameSpace returns the namespace of the Integration which has to be imported,
+// which is the namespace of the Knative Service.
+func (app *NonManagedCamelKnativeService) GetIntegrationNameSpace() string {
+ return app.ksvc.Namespace
+}
+
+// GetAppObj returns the object from which we're importing, that is the wrapped Knative Service.
+func (app *NonManagedCamelKnativeService) GetAppObj() ctrl.Object {
+ return app.ksvc
+}
diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go
index 9a6208fcb..5630f09c7 100644
--- a/pkg/controller/integration/monitor.go
+++ b/pkg/controller/integration/monitor.go
@@ -28,6 +28,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
@@ -43,6 +44,7 @@ import (
utilResource "github.com/apache/camel-k/v2/pkg/util/resource"
)
+// NewMonitorAction is an action used to monitor managed Integrations.
func NewMonitorAction() Action {
return &monitorAction{}
}
@@ -58,7 +60,9 @@ func (action *monitorAction) Name() string {
func (action *monitorAction) CanHandle(integration *v1.Integration) bool {
return integration.Status.Phase == v1.IntegrationPhaseDeploying ||
integration.Status.Phase == v1.IntegrationPhaseRunning ||
- integration.Status.Phase == v1.IntegrationPhaseError
+ integration.Status.Phase == v1.IntegrationPhaseError ||
+ integration.Status.Phase == v1.IntegrationPhaseImportMissing ||
+ integration.Status.Phase == v1.IntegrationPhaseCannotMonitor
}
func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integration) (*v1.Integration, error) {
@@ -124,16 +128,51 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra
return nil, err
}
+ return action.monitorPods(ctx, environment, integration)
+}
+
+func (action *monitorAction) monitorPods(ctx context.Context, environment *trait.Environment, integration *v1.Integration) (*v1.Integration, error) {
+ controller, err := action.newController(environment, integration)
+ if err != nil {
+ return nil, err
+ }
+ if controller.isEmptySelector() {
+ // This is happening when the Deployment, CronJob, etc resources
+ // have no selector or labels to identify sibling Pods.
+ integration.Status.Phase = v1.IntegrationPhaseCannotMonitor
+ integration.Status.SetConditions(
+ v1.IntegrationCondition{
+ Type: v1.IntegrationConditionMonitoringPodsAvailable,
+ Status: corev1.ConditionFalse,
+ Reason: v1.IntegrationConditionMonitoringPodsAvailableReason,
+ Message: fmt.Sprintf("Could not find any selector for %s. Make sure to include any label in the template and the Pods generated to inherit such label for monitoring purposes.", controller.getControllerName()),
+ },
+ )
+ return integration, nil
+ }
+
+ controllerSelector := controller.getSelector()
+ selector, err := metav1.LabelSelectorAsSelector(&controllerSelector)
+ if err != nil {
+ return nil, err
+ }
+ integration.Status.SetConditions(
+ v1.IntegrationCondition{
+ Type: v1.IntegrationConditionMonitoringPodsAvailable,
+ Status: corev1.ConditionTrue,
+ Reason: v1.IntegrationConditionMonitoringPodsAvailableReason,
+ },
+ )
// Enforce the scale sub-resource label selector.
// It is used by the HPA that queries the scale sub-resource endpoint,
// to list the pods owned by the integration.
- integration.Status.Selector = v1.IntegrationLabel + "=" + integration.Name
+ integration.Status.Selector = selector.String()
// Update the replicas count
pendingPods := &corev1.PodList{}
err = action.client.List(ctx, pendingPods,
ctrl.InNamespace(integration.Namespace),
- ctrl.MatchingLabels{v1.IntegrationLabel: integration.Name},
+ &ctrl.ListOptions{LabelSelector: selector},
ctrl.MatchingFields{"status.phase": string(corev1.PodPending)})
if err != nil {
return nil, err
@@ -141,7 +180,7 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra
runningPods := &corev1.PodList{}
err = action.client.List(ctx, runningPods,
ctrl.InNamespace(integration.Namespace),
- ctrl.MatchingLabels{v1.IntegrationLabel: integration.Name},
+ &ctrl.ListOptions{LabelSelector: selector},
ctrl.MatchingFields{"status.phase": string(corev1.PodRunning)})
if err != nil {
return nil, err
@@ -161,7 +200,7 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra
integration.Status.Phase = v1.IntegrationPhaseRunning
}
if err = action.updateIntegrationPhaseAndReadyCondition(
- ctx, environment, integration, pendingPods.Items, runningPods.Items,
+ ctx, controller, environment, integration, pendingPods.Items, runningPods.Items,
); err != nil {
return nil, err
}
@@ -255,6 +294,9 @@ type controller interface {
checkReadyCondition(ctx context.Context) (bool, error)
getPodSpec() corev1.PodSpec
updateReadyCondition(readyPods int) bool
+ getSelector() metav1.LabelSelector
+ isEmptySelector() bool
+ getControllerName() string
}
func (action *monitorAction) newController(env *trait.Environment, integration *v1.Integration) (controller, error) {
@@ -311,7 +353,7 @@ func getUpdatedController(env *trait.Environment, obj ctrl.Object) ctrl.Object {
}
func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(
- ctx context.Context, environment *trait.Environment, integration *v1.Integration,
+ ctx context.Context, controller controller, environment *trait.Environment, integration *v1.Integration,
pendingPods []corev1.Pod, runningPods []corev1.Pod,
) error {
controller, err := action.newController(environment, integration)
diff --git a/pkg/controller/integration/monitor_cronjob.go b/pkg/controller/integration/monitor_cronjob.go
index 1620a66c3..f5b9a6419 100644
--- a/pkg/controller/integration/monitor_cronjob.go
+++ b/pkg/controller/integration/monitor_cronjob.go
@@ -23,6 +23,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime/pkg/client"
@@ -110,3 +111,18 @@ func (c *cronJobController) updateReadyCondition(readyPods int) bool {
return false
}
+
+// getSelector builds a label selector out of the labels declared on the
+// CronJob Pod template, as those labels are transferred to the generated Pods.
+func (c *cronJobController) getSelector() metav1.LabelSelector {
+	podLabels := c.obj.Spec.JobTemplate.Spec.Template.Labels
+	return metav1.LabelSelector{MatchLabels: podLabels}
+}
+
+// isEmptySelector reports whether the CronJob Pod template declares no label
+// that could be used to identify the Pods it generates.
+func (c *cronJobController) isEmptySelector() bool {
+	// A non-nil but empty map has to be treated as empty as well: a label
+	// selector without any requirement would match every Pod in the namespace.
+	return len(c.obj.Spec.JobTemplate.Spec.Template.Labels) == 0
+}
+
+// getControllerName returns the CronJob name prefixed by its kind,
+// in the "CronJob/<name>" form.
+func (c *cronJobController) getControllerName() string {
+	name := c.obj.Name
+	return fmt.Sprintf("CronJob/%s", name)
+}
diff --git a/pkg/controller/integration/monitor_deployment.go b/pkg/controller/integration/monitor_deployment.go
index e2f823c16..e3325f8ea 100644
--- a/pkg/controller/integration/monitor_deployment.go
+++ b/pkg/controller/integration/monitor_deployment.go
@@ -23,6 +23,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
"github.com/apache/camel-k/v2/pkg/util/kubernetes"
@@ -91,3 +92,15 @@ func (c *deploymentController) updateReadyCondition(readyPods int) bool {
return false
}
+
+// getSelector returns the label selector declared by the Deployment, or an
+// empty selector when the Deployment does not declare any.
+func (c *deploymentController) getSelector() metav1.LabelSelector {
+	// Spec.Selector is a pointer: guard against a missing selector instead of
+	// panicking on the dereference.
+	if c.obj.Spec.Selector == nil {
+		return metav1.LabelSelector{}
+	}
+	return *c.obj.Spec.Selector
+}
+
+// isEmptySelector reports whether the Deployment has no selector at all, or a
+// selector which contains neither match labels nor match expressions.
+func (c *deploymentController) isEmptySelector() bool {
+	selector := c.obj.Spec.Selector
+	if selector == nil {
+		// Nothing was declared: avoid dereferencing a nil pointer.
+		return true
+	}
+	// Use len() so that non-nil but empty collections count as empty too: a
+	// selector without requirements would match every Pod in the namespace.
+	return len(selector.MatchExpressions) == 0 && len(selector.MatchLabels) == 0
+}
+
+// getControllerName returns the Deployment name prefixed by its kind,
+// in the "Deployment/<name>" form.
+func (c *deploymentController) getControllerName() string {
+	name := c.obj.Name
+	return fmt.Sprintf("Deployment/%s", name)
+}
diff --git a/pkg/controller/integration/monitor_knative.go b/pkg/controller/integration/monitor_knative.go
index 06b7dc82b..ed614f1a1 100644
--- a/pkg/controller/integration/monitor_knative.go
+++ b/pkg/controller/integration/monitor_knative.go
@@ -19,8 +19,10 @@ package integration
import (
"context"
+ "fmt"
corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
servingv1 "knative.dev/serving/pkg/apis/serving/v1"
@@ -63,3 +65,18 @@ func (c *knativeServiceController) updateReadyCondition(readyPods int) bool {
return false
}
+
+// getSelector builds a label selector out of the labels declared on the
+// Knative Service template, as those labels are transferred to the generated Pods.
+func (c *knativeServiceController) getSelector() metav1.LabelSelector {
+	podLabels := c.obj.Spec.Template.Labels
+	return metav1.LabelSelector{MatchLabels: podLabels}
+}
+
+// isEmptySelector reports whether the Knative Service template declares no
+// label that could be used to identify the Pods it generates.
+func (c *knativeServiceController) isEmptySelector() bool {
+	// A non-nil but empty map has to be treated as empty as well: a label
+	// selector without any requirement would match every Pod in the namespace.
+	return len(c.obj.Spec.Template.Labels) == 0
+}
+
+// getControllerName returns the Knative Service name prefixed by its kind,
+// in the "KnativeService/<name>" form.
+func (c *knativeServiceController) getControllerName() string {
+	name := c.obj.Name
+	return fmt.Sprintf("KnativeService/%s", name)
+}
diff --git a/pkg/controller/integration/monitor_synthetic.go b/pkg/controller/integration/monitor_synthetic.go
new file mode 100644
index 000000000..a10a03deb
--- /dev/null
+++ b/pkg/controller/integration/monitor_synthetic.go
@@ -0,0 +1,70 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package integration
+
+import (
+ "context"
+ "fmt"
+
+ corev1 "k8s.io/api/core/v1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+
+ v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+ "github.com/apache/camel-k/v2/pkg/trait"
+)
+
+// NewMonitorSyntheticAction creates the action in charge of monitoring synthetic Integrations.
+func NewMonitorSyntheticAction() Action {
+	return new(monitorSyntheticAction)
+}
+
+// monitorSyntheticAction embeds monitorAction and overrides the way the
+// environment used for monitoring is built.
+type monitorSyntheticAction struct {
+	monitorAction
+}
+
+// Name returns the identifier of this action.
+func (action *monitorSyntheticAction) Name() string {
+	const actionName = "monitor-synthetic"
+	return actionName
+}
+
+func (action *monitorSyntheticAction) Handle(ctx context.Context, integration *v1.Integration) (*v1.Integration, error) {
+ environment, err := trait.NewSyntheticEnvironment(ctx, action.client, integration, nil)
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+ // Not an error: the resource from which we imported has been deleted, report in it status.
+ // It may be a temporary situation, for example, if the deployment from which the Integration is imported
+ // is being redeployed. For this reason we should keep the Integration instead of forcefully removing it.
+ message := fmt.Sprintf(
+ "import %s %s no longer available",
+ integration.Annotations[v1.IntegrationImportedKindLabel],
+ integration.Annotations[v1.IntegrationImportedNameLabel],
+ )
+ action.L.Info(message)
+ integration.SetReadyConditionError(message)
+ zero := int32(0)
+ integration.Status.Phase = v1.IntegrationPhaseImportMissing
+ integration.Status.Replicas = &zero
+ return integration, nil
+ }
+ // report the error
+ integration.Status.Phase = v1.IntegrationPhaseError
+ integration.SetReadyCondition(corev1.ConditionFalse, v1.IntegrationConditionImportingKindAvailableReason, err.Error())
+ return integration, err
+ }
+
+ return action.monitorPods(ctx, environment, integration)
+}
diff --git a/pkg/controller/integration/predicate.go b/pkg/controller/integration/predicate.go
index 79d61556a..0feb71fec 100644
--- a/pkg/controller/integration/predicate.go
+++ b/pkg/controller/integration/predicate.go
@@ -21,6 +21,7 @@ import (
"reflect"
"k8s.io/apimachinery/pkg/api/equality"
+ ctrl "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
@@ -55,3 +56,39 @@ func (StatusChangedPredicate) Update(e event.UpdateEvent) bool {
return !equality.Semantic.DeepDerivative(s1.Interface(), s2.Interface())
}
+
+// NonManagedObjectPredicate implements a generic predicate which only lets through events of objects not owned by an Integration.
+type NonManagedObjectPredicate struct {
+ predicate.Funcs
+}
+
+// Create returns true when the created object is not managed by an Integration.
+func (NonManagedObjectPredicate) Create(e event.CreateEvent) bool {
+ return !isManagedObject(e.Object)
+}
+
+// Update returns true when the new version of the updated object is not managed by an Integration.
+func (NonManagedObjectPredicate) Update(e event.UpdateEvent) bool {
+ return !isManagedObject(e.ObjectNew)
+}
+
+// Delete returns true when the deleted object is not managed by an Integration.
+func (NonManagedObjectPredicate) Delete(e event.DeleteEvent) bool {
+ return !isManagedObject(e.Object)
+}
+
+// Generic returns true when the object of the generic event is not managed by an Integration.
+func (NonManagedObjectPredicate) Generic(e event.GenericEvent) bool {
+ return !isManagedObject(e.Object)
+}
+
+// isManagedObject reports whether any owner reference of the object points to
+// a camel.apache.org/v1 Integration.
+func isManagedObject(obj ctrl.Object) bool {
+	owners := obj.GetOwnerReferences()
+	for i := range owners {
+		isIntegration := owners[i].Kind == "Integration"
+		if isIntegration && owners[i].APIVersion == "camel.apache.org/v1" {
+			return true
+		}
+	}
+	return false
+}
diff --git a/pkg/trait/camel.go b/pkg/trait/camel.go
index 2a4e7b3f4..71a24550e 100644
--- a/pkg/trait/camel.go
+++ b/pkg/trait/camel.go
@@ -64,7 +64,8 @@ func (t *camelTrait) Configure(e *Environment) (bool, *TraitCondition, error) {
t.RuntimeVersion = determineRuntimeVersion(e)
}
- return true, nil, nil
+ // Don't run this trait for a synthetic Integration
+ return e.Integration == nil || !e.Integration.IsSynthetic(), nil, nil
}
func (t *camelTrait) Apply(e *Environment) error {
diff --git a/pkg/trait/platform.go b/pkg/trait/platform.go
index 58e545597..ec3fb1e04 100644
--- a/pkg/trait/platform.go
+++ b/pkg/trait/platform.go
@@ -73,7 +73,8 @@ func (t *platformTrait) Configure(e *Environment) (bool, *TraitCondition, error)
}
}
- return true, nil, nil
+ // Don't run this trait for a synthetic Integration
+ return e.Integration == nil || !e.Integration.IsSynthetic(), nil, nil
}
func (t *platformTrait) Apply(e *Environment) error {
diff --git a/pkg/trait/trait.go b/pkg/trait/trait.go
index 33676616f..16794ee12 100644
--- a/pkg/trait/trait.go
+++ b/pkg/trait/trait.go
@@ -24,13 +24,15 @@ import (
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
"github.com/apache/camel-k/v2/pkg/client"
"github.com/apache/camel-k/v2/pkg/platform"
"github.com/apache/camel-k/v2/pkg/util/kubernetes"
"github.com/apache/camel-k/v2/pkg/util/log"
- k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
+ serving "knative.dev/serving/pkg/apis/serving/v1"
+ ctrl "sigs.k8s.io/controller-runtime/pkg/client"
)
func Apply(ctx context.Context, c client.Client, integration *v1.Integration, kit *v1.IntegrationKit) (*Environment, error) {
@@ -97,7 +99,7 @@ func newEnvironment(ctx context.Context, c client.Client, integration *v1.Integr
return nil, errors.New("neither integration nor kit are set")
}
- var obj k8sclient.Object
+ var obj ctrl.Object
if integration != nil {
obj = integration
} else if kit != nil {
@@ -134,3 +136,68 @@ func newEnvironment(ctx context.Context, c client.Client, integration *v1.Integr
return &env, nil
}
+
+// NewSyntheticEnvironment creates an environment suitable for a synthetic Integration.
+//
+// The trait catalog is applied to the environment, then the object the
+// Integration was imported from (identified by the imported kind and name
+// annotations of the Integration) is fetched and added to the environment
+// resources.
+//
+// An error is returned when the Integration is missing, when the trait
+// customization fails, or when the imported object cannot be retrieved. The
+// retrieval error is returned as is, so callers can inspect it.
+func NewSyntheticEnvironment(ctx context.Context, c client.Client, integration *v1.Integration, kit *v1.IntegrationKit) (*Environment, error) {
+	if integration == nil && kit == nil {
+		return nil, errors.New("neither integration nor kit are set")
+	}
+	// The imported object is located through the Integration annotations and
+	// namespace: fail cleanly instead of panicking when only a kit was provided.
+	if integration == nil {
+		return nil, errors.New("integration is required to create a synthetic environment")
+	}
+
+	env := Environment{
+		Ctx:                   ctx,
+		Platform:              nil,
+		Client:                c,
+		IntegrationKit:        kit,
+		Integration:           integration,
+		ExecutedTraits:        make([]Trait, 0),
+		Resources:             kubernetes.NewCollection(),
+		EnvVars:               make([]corev1.EnvVar, 0),
+		ApplicationProperties: make(map[string]string),
+	}
+
+	catalog := NewCatalog(c)
+	// set the catalog
+	env.Catalog = catalog
+	// we need to simulate the execution of the traits to fill certain values used later by monitoring
+	if _, err := catalog.apply(&env); err != nil {
+		return nil, fmt.Errorf("error during trait customization: %w", err)
+	}
+
+	camelApp, err := getCamelAppObject(
+		ctx,
+		c,
+		integration.Annotations[v1.IntegrationImportedKindLabel],
+		integration.Namespace,
+		integration.Annotations[v1.IntegrationImportedNameLabel],
+	)
+	if err != nil {
+		return nil, err
+	}
+	env.Resources.Add(camelApp)
+
+	return &env, nil
+}
+
+// getCamelAppObject retrieves the object of the given kind, namespace and name
+// from which a synthetic Integration was imported.
+//
+// Supported kinds are "Deployment", "CronJob" and "KnativeService"; any other
+// kind results in an error. On failure the returned object is always nil and
+// the client error is returned unwrapped.
+func getCamelAppObject(ctx context.Context, c client.Client, kind, namespace, name string) (ctrl.Object, error) {
+	switch kind {
+	case "Deployment":
+		deploy, err := c.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
+		if err != nil {
+			// Return a literal nil so the interface value is really nil.
+			return nil, err
+		}
+		return deploy, nil
+	case "CronJob":
+		cronJob, err := c.BatchV1().CronJobs(namespace).Get(ctx, name, metav1.GetOptions{})
+		if err != nil {
+			return nil, err
+		}
+		return cronJob, nil
+	case "KnativeService":
+		ksvc := &serving.Service{
+			TypeMeta: metav1.TypeMeta{
+				Kind:       "Service",
+				APIVersion: serving.SchemeGroupVersion.String(),
+			},
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      name,
+				Namespace: namespace,
+			},
+		}
+		if err := c.Get(ctx, ctrl.ObjectKeyFromObject(ksvc), ksvc); err != nil {
+			// Do not hand back the placeholder object when the lookup failed.
+			return nil, err
+		}
+		return ksvc, nil
+	default:
+		return nil, fmt.Errorf("cannot create a synthetic environment for %s kind", kind)
+	}
+}