You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2020/10/28 14:05:47 UTC

[camel-k] branch master updated: Fix #1785: propagate klb changes to integrations

This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b46f84  Fix #1785: propagate klb changes to integrations
3b46f84 is described below

commit 3b46f849d2ce159ce0b8a32ac6592d9784ba898d
Author: nicolaferraro <ni...@gmail.com>
AuthorDate: Mon Oct 26 15:11:41 2020 +0100

    Fix #1785: propagate klb changes to integrations
---
 e2e/knative/files/display.groovy                   |  20 ++++
 e2e/knative/kamelet_test.go                        |  60 ++++++++++++
 e2e/support/test_support.go                        |  90 ++++++++++++++++-
 pkg/cmd/reset.go                                   |  16 +--
 .../kameletbinding/{initialize.go => common.go}    | 101 ++-----------------
 pkg/controller/kameletbinding/initialize.go        | 108 +--------------------
 .../kameletbinding/kamelet_binding_controller.go   |  10 ++
 pkg/controller/kameletbinding/monitor.go           |  39 +++++++-
 8 files changed, 233 insertions(+), 211 deletions(-)

diff --git a/e2e/knative/files/display.groovy b/e2e/knative/files/display.groovy
new file mode 100644
index 0000000..43a595f
--- /dev/null
+++ b/e2e/knative/files/display.groovy
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+from('knative:channel/messages')
+    .convertBodyTo(String.class)
+    .to('log:info?showAll=false')
diff --git a/e2e/knative/kamelet_test.go b/e2e/knative/kamelet_test.go
new file mode 100644
index 0000000..cfc90f3
--- /dev/null
+++ b/e2e/knative/kamelet_test.go
@@ -0,0 +1,60 @@
+// +build integration
+
+// To enable compilation of this file in Goland, go to "Settings -> Go -> Vendoring & Build Tags -> Custom Tags" and add "integration"
+
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package knative
+
+import (
+	"testing"
+
+	. "github.com/apache/camel-k/e2e/support"
+	camelv1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+	. "github.com/onsi/gomega"
+	v1 "k8s.io/api/core/v1"
+	messaging "knative.dev/eventing/pkg/apis/messaging/v1beta1"
+)
+
+// Test that kamelet binding can be changed and changes propagated to integrations
+func TestKameletChange(t *testing.T) {
+
+	WithNewTestNamespace(t, func(ns string) {
+		RegisterTestingT(t)
+
+		Expect(Kamel("install", "-n", ns).Execute()).Should(BeNil())
+		Expect(CreateTimerKamelet(ns, "timer-source")()).Should(BeNil())
+		Expect(CreateKnativeChannelv1Beta1(ns, "messages")()).Should(BeNil())
+		Expect(Kamel("run", "-n", ns, "files/display.groovy", "-w").Execute()).Should(BeNil())
+		ref := v1.ObjectReference{
+			Kind:       "InMemoryChannel",
+			Name:       "messages",
+			APIVersion: messaging.SchemeGroupVersion.String(),
+		}
+		Expect(BindKameletTo(ns, "timer-binding", "timer-source", ref, map[string]string{"message": "message is Hello"})()).Should(BeNil())
+		Eventually(IntegrationPodPhase(ns, "timer-binding"), TestTimeoutMedium).Should(Equal(v1.PodRunning))
+		Eventually(IntegrationCondition(ns, "timer-binding", camelv1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(v1.ConditionTrue))
+		Eventually(IntegrationLogs(ns, "display"), TestTimeoutShort).Should(ContainSubstring("message is Hello"))
+
+		Expect(BindKameletTo(ns, "timer-binding", "timer-source", ref, map[string]string{"message": "message is Hi"})()).Should(BeNil())
+		Eventually(IntegrationPodPhase(ns, "timer-binding"), TestTimeoutMedium).Should(Equal(v1.PodRunning))
+		Eventually(IntegrationCondition(ns, "timer-binding", camelv1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(v1.ConditionTrue))
+		Eventually(IntegrationLogs(ns, "display"), TestTimeoutShort).Should(ContainSubstring("message is Hi"))
+	})
+
+}
diff --git a/e2e/support/test_support.go b/e2e/support/test_support.go
index c070c87..e572602 100644
--- a/e2e/support/test_support.go
+++ b/e2e/support/test_support.go
@@ -23,6 +23,7 @@ package support
 
 import (
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
@@ -33,9 +34,10 @@ import (
 	"testing"
 	"time"
 
+	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+	"github.com/apache/camel-k/pkg/util/kubernetes"
 	"github.com/google/uuid"
 	"github.com/onsi/gomega"
-
 	"github.com/spf13/cobra"
 	appsv1 "k8s.io/api/apps/v1"
 	"k8s.io/api/batch/v1beta1"
@@ -891,6 +893,92 @@ func CreateKnativeChannelv1Beta1(ns string, name string) func() error {
 }
 
 /*
+	Kamelets
+*/
+
+func CreateTimerKamelet(ns string, name string) func() error {
+	return func() error {
+		kamelet := v1alpha1.Kamelet{
+			ObjectMeta: metav1.ObjectMeta{
+				Namespace: ns,
+				Name:      name,
+			},
+			Spec: v1alpha1.KameletSpec{
+				Definition: v1alpha1.JSONSchemaProps{
+					Properties: map[string]v1alpha1.JSONSchemaProps{
+						"message": {
+							Type: "string",
+						},
+					},
+				},
+				Flow: asFlow(map[string]interface{}{
+					"from": map[string]interface{}{
+						"uri": "timer:tick",
+						"steps": []map[string]interface{}{
+							{
+								"set-body": map[string]interface{}{
+									"constant": "{{message}}",
+								},
+							},
+							{
+								"to": "kamelet:sink",
+							},
+						},
+					},
+				}),
+			},
+		}
+		return TestClient.Create(TestContext, &kamelet)
+	}
+}
+
+func BindKameletTo(ns, name, from string, to corev1.ObjectReference, properties map[string]string) func() error {
+	return func() error {
+		kb := v1alpha1.KameletBinding{
+			ObjectMeta: metav1.ObjectMeta{
+				Namespace: ns,
+				Name:      name,
+			},
+			Spec: v1alpha1.KameletBindingSpec{
+				Source: v1alpha1.Endpoint{
+					Ref: &corev1.ObjectReference{
+						Kind:       "Kamelet",
+						APIVersion: v1alpha1.SchemeGroupVersion.String(),
+						Name:       from,
+					},
+					Properties: asEndpointProperties(properties),
+				},
+				Sink: v1alpha1.Endpoint{
+					Ref:        &to,
+					Properties: asEndpointProperties(map[string]string{}),
+				},
+			},
+		}
+		return kubernetes.ReplaceResource(TestContext, TestClient, &kb)
+	}
+}
+
+func asFlow(source map[string]interface{}) *v1.Flow {
+	bytes, err := json.Marshal(source)
+	if err != nil {
+		panic(err)
+	}
+	return &v1.Flow{
+		RawMessage: bytes,
+	}
+}
+
+func asEndpointProperties(props map[string]string) v1alpha1.EndpointProperties {
+	bytes, err := json.Marshal(props)
+	if err != nil {
+		panic(err)
+	}
+	return v1alpha1.EndpointProperties{
+		RawMessage: bytes,
+	}
+}
+
+/*
 	Namespace testing functions
 */
 
diff --git a/pkg/cmd/reset.go b/pkg/cmd/reset.go
index f6fa6f6..5831d85 100644
--- a/pkg/cmd/reset.go
+++ b/pkg/cmd/reset.go
@@ -61,6 +61,14 @@ func (o *resetCmdOptions) reset(_ *cobra.Command, _ []string) {
 	}
 
 	var n int
+	if !o.SkipKameletBindings {
+		if n, err = o.deleteAllKameletBindings(c); err != nil {
+			fmt.Print(err)
+			return
+		}
+		fmt.Printf("%d kamelet bindings deleted from namespace %s\n", n, o.Namespace)
+	}
+
 	if !o.SkipIntegrations {
 		if n, err = o.deleteAllIntegrations(c); err != nil {
 			fmt.Print(err)
@@ -77,14 +85,6 @@ func (o *resetCmdOptions) reset(_ *cobra.Command, _ []string) {
 		fmt.Printf("%d integration kits deleted from namespace %s\n", n, o.Namespace)
 	}
 
-	if !o.SkipKameletBindings {
-		if n, err = o.deleteAllKameletBindings(c); err != nil {
-			fmt.Print(err)
-			return
-		}
-		fmt.Printf("%d kamelet bindings deleted from namespace %s\n", n, o.Namespace)
-	}
-
 	if err = o.resetIntegrationPlatform(c); err != nil {
 		fmt.Println(err)
 		return
diff --git a/pkg/controller/kameletbinding/initialize.go b/pkg/controller/kameletbinding/common.go
similarity index 51%
copy from pkg/controller/kameletbinding/initialize.go
copy to pkg/controller/kameletbinding/common.go
index 8f794d6..a50b202 100644
--- a/pkg/controller/kameletbinding/initialize.go
+++ b/pkg/controller/kameletbinding/common.go
@@ -20,41 +20,19 @@ package kameletbinding
 import (
 	"context"
 	"encoding/json"
-	"strings"
 
 	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
 	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+	"github.com/apache/camel-k/pkg/client"
 	"github.com/apache/camel-k/pkg/platform"
 	"github.com/apache/camel-k/pkg/util/bindings"
 	"github.com/apache/camel-k/pkg/util/knative"
-	"github.com/apache/camel-k/pkg/util/kubernetes"
-	"github.com/apache/camel-k/pkg/util/patch"
 	"github.com/pkg/errors"
-	corev1 "k8s.io/api/core/v1"
 	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/types"
-	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
-// NewInitializeAction returns a action that initializes the kamelet binding configuration when not provided by the user
-func NewInitializeAction() Action {
-	return &initializeAction{}
-}
-
-type initializeAction struct {
-	baseAction
-}
-
-func (action *initializeAction) Name() string {
-	return "initialize"
-}
-
-func (action *initializeAction) CanHandle(kameletbinding *v1alpha1.KameletBinding) bool {
-	return kameletbinding.Status.Phase == v1alpha1.KameletBindingPhaseNone
-}
-
-func (action *initializeAction) Handle(ctx context.Context, kameletbinding *v1alpha1.KameletBinding) (*v1alpha1.KameletBinding, error) {
+func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding *v1alpha1.KameletBinding) (*v1.Integration, error) {
 	controller := true
 	blockOwnerDeletion := true
 	it := v1.Integration{
@@ -78,7 +56,7 @@ func (action *initializeAction) Handle(ctx context.Context, kameletbinding *v1al
 		it.Spec = *kameletbinding.Spec.Integration.DeepCopy()
 	}
 
-	profile, err := action.determineProfile(ctx, kameletbinding)
+	profile, err := determineProfile(ctx, c, kameletbinding)
 	if err != nil {
 		return nil, err
 	}
@@ -86,7 +64,7 @@ func (action *initializeAction) Handle(ctx context.Context, kameletbinding *v1al
 
 	bindingContext := bindings.BindingContext{
 		Ctx:       ctx,
-		Client:    action.client,
+		Client:    c,
 		Namespace: it.Namespace,
 		Profile:   profile,
 	}
@@ -128,77 +106,14 @@ func (action *initializeAction) Handle(ctx context.Context, kameletbinding *v1al
 	}
 	it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: encodedFlow})
 
-	if err := kubernetes.ReplaceResource(ctx, action.client, &it); err != nil {
-		return nil, errors.Wrap(err, "could not create integration for kamelet binding")
-	}
-
-	// propagate Kamelet icon (best effort)
-	action.propagateIcon(ctx, kameletbinding)
-
-	target := kameletbinding.DeepCopy()
-	target.Status.Phase = v1alpha1.KameletBindingPhaseCreating
-	return target, nil
-}
-
-func (action *initializeAction) propagateIcon(ctx context.Context, binding *v1alpha1.KameletBinding) {
-	icon, err := action.findIcon(ctx, binding)
-	if err != nil {
-		action.L.Errorf(err, "cannot find icon for kamelet binding %q", binding.Name)
-		return
-	}
-	if icon == "" {
-		return
-	}
-	// compute patch
-	clone := binding.DeepCopy()
-	clone.Annotations = make(map[string]string)
-	for k, v := range binding.Annotations {
-		clone.Annotations[k] = v
-	}
-	if _, ok := clone.Annotations[v1alpha1.AnnotationIcon]; !ok {
-		clone.Annotations[v1alpha1.AnnotationIcon] = icon
-	}
-	p, err := patch.PositiveMergePatch(binding, clone)
-	if err != nil {
-		action.L.Errorf(err, "cannot compute patch to update icon for kamelet binding %q", binding.Name)
-		return
-	}
-	if len(p) > 0 {
-		if err := action.client.Patch(ctx, clone, client.RawPatch(types.MergePatchType, p)); err != nil {
-			action.L.Errorf(err, "cannot apply merge patch to update icon for kamelet binding %q", binding.Name)
-			return
-		}
-	}
-}
-
-func (action *initializeAction) findIcon(ctx context.Context, binding *v1alpha1.KameletBinding) (string, error) {
-	var kameletRef *corev1.ObjectReference
-	if binding.Spec.Source.Ref != nil && binding.Spec.Source.Ref.Kind == "Kamelet" && strings.HasPrefix(binding.Spec.Source.Ref.APIVersion, "camel.apache.org/") {
-		kameletRef = binding.Spec.Source.Ref
-	} else if binding.Spec.Sink.Ref != nil && binding.Spec.Sink.Ref.Kind == "Kamelet" && strings.HasPrefix(binding.Spec.Sink.Ref.APIVersion, "camel.apache.org/") {
-		kameletRef = binding.Spec.Sink.Ref
-	}
-
-	if kameletRef == nil {
-		return "", nil
-	}
-
-	key := client.ObjectKey{
-		Namespace: binding.Namespace,
-		Name:      kameletRef.Name,
-	}
-	var kamelet v1alpha1.Kamelet
-	if err := action.client.Get(ctx, key, &kamelet); err != nil {
-		return "", err
-	}
-	return kamelet.Annotations[v1alpha1.AnnotationIcon], nil
+	return &it, nil
 }
 
-func (action *initializeAction) determineProfile(ctx context.Context, binding *v1alpha1.KameletBinding) (v1.TraitProfile, error) {
+func determineProfile(ctx context.Context, c client.Client, binding *v1alpha1.KameletBinding) (v1.TraitProfile, error) {
 	if binding.Spec.Integration != nil && binding.Spec.Integration.Profile != "" {
 		return binding.Spec.Integration.Profile, nil
 	}
-	pl, err := platform.GetCurrentPlatform(ctx, action.client, binding.Namespace)
+	pl, err := platform.GetCurrentPlatform(ctx, c, binding.Namespace)
 	if err != nil && !k8serrors.IsNotFound(err) {
 		return "", errors.Wrap(err, "error while retrieving the integration platform")
 	}
@@ -210,7 +125,7 @@ func (action *initializeAction) determineProfile(ctx context.Context, binding *v
 			return pl.Spec.Profile, nil
 		}
 	}
-	if knative.IsEnabledInNamespace(ctx, action.client, binding.Namespace) {
+	if knative.IsEnabledInNamespace(ctx, c, binding.Namespace) {
 		return v1.TraitProfileKnative, nil
 	}
 	if pl != nil {
diff --git a/pkg/controller/kameletbinding/initialize.go b/pkg/controller/kameletbinding/initialize.go
index 8f794d6..cc9c38f 100644
--- a/pkg/controller/kameletbinding/initialize.go
+++ b/pkg/controller/kameletbinding/initialize.go
@@ -19,20 +19,13 @@ package kameletbinding
 
 import (
 	"context"
-	"encoding/json"
 	"strings"
 
-	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
 	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
-	"github.com/apache/camel-k/pkg/platform"
-	"github.com/apache/camel-k/pkg/util/bindings"
-	"github.com/apache/camel-k/pkg/util/knative"
 	"github.com/apache/camel-k/pkg/util/kubernetes"
 	"github.com/apache/camel-k/pkg/util/patch"
 	"github.com/pkg/errors"
 	corev1 "k8s.io/api/core/v1"
-	k8serrors "k8s.io/apimachinery/pkg/api/errors"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
@@ -55,80 +48,12 @@ func (action *initializeAction) CanHandle(kameletbinding *v1alpha1.KameletBindin
 }
 
 func (action *initializeAction) Handle(ctx context.Context, kameletbinding *v1alpha1.KameletBinding) (*v1alpha1.KameletBinding, error) {
-	controller := true
-	blockOwnerDeletion := true
-	it := v1.Integration{
-		ObjectMeta: metav1.ObjectMeta{
-			Namespace: kameletbinding.Namespace,
-			Name:      kameletbinding.Name,
-			OwnerReferences: []metav1.OwnerReference{
-				{
-					APIVersion:         kameletbinding.APIVersion,
-					Kind:               kameletbinding.Kind,
-					Name:               kameletbinding.Name,
-					UID:                kameletbinding.UID,
-					Controller:         &controller,
-					BlockOwnerDeletion: &blockOwnerDeletion,
-				},
-			},
-		},
-	}
-	// start from the integration spec defined in the binding
-	if kameletbinding.Spec.Integration != nil {
-		it.Spec = *kameletbinding.Spec.Integration.DeepCopy()
-	}
-
-	profile, err := action.determineProfile(ctx, kameletbinding)
-	if err != nil {
-		return nil, err
-	}
-	it.Spec.Profile = profile
-
-	bindingContext := bindings.BindingContext{
-		Ctx:       ctx,
-		Client:    action.client,
-		Namespace: it.Namespace,
-		Profile:   profile,
-	}
-
-	from, err := bindings.Translate(bindingContext, v1alpha1.EndpointTypeSource, kameletbinding.Spec.Source)
-	if err != nil {
-		return nil, errors.Wrap(err, "could not determine source URI")
-	}
-	to, err := bindings.Translate(bindingContext, v1alpha1.EndpointTypeSink, kameletbinding.Spec.Sink)
-	if err != nil {
-		return nil, errors.Wrap(err, "could not determine sink URI")
-	}
-
-	if len(from.Traits) > 0 || len(to.Traits) > 0 {
-		if it.Spec.Traits == nil {
-			it.Spec.Traits = make(map[string]v1.TraitSpec)
-		}
-		for k, v := range from.Traits {
-			it.Spec.Traits[k] = v
-		}
-		for k, v := range to.Traits {
-			it.Spec.Traits[k] = v
-		}
-	}
-
-	flow := map[string]interface{}{
-		"from": map[string]interface{}{
-			"uri": from.URI,
-			"steps": []map[string]interface{}{
-				{
-					"to": to.URI,
-				},
-			},
-		},
-	}
-	encodedFlow, err := json.Marshal(flow)
+	it, err := createIntegrationFor(ctx, action.client, kameletbinding)
 	if err != nil {
 		return nil, err
 	}
-	it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: encodedFlow})
 
-	if err := kubernetes.ReplaceResource(ctx, action.client, &it); err != nil {
+	if err := kubernetes.ReplaceResource(ctx, action.client, it); err != nil {
 		return nil, errors.Wrap(err, "could not create integration for kamelet binding")
 	}
 
@@ -193,32 +118,3 @@ func (action *initializeAction) findIcon(ctx context.Context, binding *v1alpha1.
 	}
 	return kamelet.Annotations[v1alpha1.AnnotationIcon], nil
 }
-
-func (action *initializeAction) determineProfile(ctx context.Context, binding *v1alpha1.KameletBinding) (v1.TraitProfile, error) {
-	if binding.Spec.Integration != nil && binding.Spec.Integration.Profile != "" {
-		return binding.Spec.Integration.Profile, nil
-	}
-	pl, err := platform.GetCurrentPlatform(ctx, action.client, binding.Namespace)
-	if err != nil && !k8serrors.IsNotFound(err) {
-		return "", errors.Wrap(err, "error while retrieving the integration platform")
-	}
-	if pl != nil {
-		if pl.Status.Profile != "" {
-			return pl.Status.Profile, nil
-		}
-		if pl.Spec.Profile != "" {
-			return pl.Spec.Profile, nil
-		}
-	}
-	if knative.IsEnabledInNamespace(ctx, action.client, binding.Namespace) {
-		return v1.TraitProfileKnative, nil
-	}
-	if pl != nil {
-		// Determine profile from cluster type
-		plProfile := platform.GetProfile(pl)
-		if plProfile != "" {
-			return plProfile, nil
-		}
-	}
-	return v1.DefaultTraitProfile, nil
-}
diff --git a/pkg/controller/kameletbinding/kamelet_binding_controller.go b/pkg/controller/kameletbinding/kamelet_binding_controller.go
index 459c69c..dbebfc3 100644
--- a/pkg/controller/kameletbinding/kamelet_binding_controller.go
+++ b/pkg/controller/kameletbinding/kamelet_binding_controller.go
@@ -21,6 +21,7 @@ import (
 	"context"
 	"time"
 
+	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
 	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
 	"github.com/apache/camel-k/pkg/client"
 	camelevent "github.com/apache/camel-k/pkg/event"
@@ -84,6 +85,15 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
 		return err
 	}
 
+	// Watch Integration to propagate changes downstream
+	err = c.Watch(&source.Kind{Type: &v1.Integration{}}, &handler.EnqueueRequestForOwner{
+		OwnerType:    &v1alpha1.KameletBinding{},
+		IsController: false,
+	})
+	if err != nil {
+		return err
+	}
+
 	return nil
 }
 
diff --git a/pkg/controller/kameletbinding/monitor.go b/pkg/controller/kameletbinding/monitor.go
index 9980dc5..9cd768a 100644
--- a/pkg/controller/kameletbinding/monitor.go
+++ b/pkg/controller/kameletbinding/monitor.go
@@ -19,12 +19,14 @@ package kameletbinding
 
 import (
 	"context"
+
 	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
 	"github.com/pkg/errors"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/equality"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	"sigs.k8s.io/controller-runtime/pkg/client"
-
-	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
 )
 
 // NewMonitorAction returns an action that monitors the kamelet binding after it's fully initialized
@@ -52,10 +54,41 @@ func (action *monitorAction) Handle(ctx context.Context, kameletbinding *v1alpha
 		Name:      kameletbinding.Name,
 	}
 	it := v1.Integration{}
-	if err := action.client.Get(ctx, key, &it); err != nil {
+	if err := action.client.Get(ctx, key, &it); err != nil && k8serrors.IsNotFound(err) {
+		target := kameletbinding.DeepCopy()
+		// Rebuild the integration
+		target.Status.Phase = v1alpha1.KameletBindingPhaseNone
+		target.Status.SetCondition(
+			v1alpha1.KameletBindingConditionReady,
+			corev1.ConditionFalse,
+			"",
+			"",
+		)
+		return target, nil
+	} else if err != nil {
 		return nil, errors.Wrapf(err, "could not load integration for KameletBinding %q", kameletbinding.Name)
 	}
 
+	// Check if the integration needs to be changed
+	expected, err := createIntegrationFor(ctx, action.client, kameletbinding)
+	if err != nil {
+		return nil, err
+	}
+
+	if !equality.Semantic.DeepDerivative(expected.Spec, it.Spec) {
+		// KameletBinding has changed and needs rebuild
+		target := kameletbinding.DeepCopy()
+		// Rebuild the integration
+		target.Status.Phase = v1alpha1.KameletBindingPhaseNone
+		target.Status.SetCondition(
+			v1alpha1.KameletBindingConditionReady,
+			corev1.ConditionFalse,
+			"",
+			"",
+		)
+		return target, nil
+	}
+
 	// Map integration phases to KameletBinding phases
 	target := kameletbinding.DeepCopy()
 	if it.Status.Phase == v1.IntegrationPhaseRunning {