You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2020/10/28 14:05:37 UTC
[camel-k] 03/05: Fix #1785: propagate klb changes to integrations
This is an automated email from the ASF dual-hosted git repository.
nferraro pushed a commit to branch release-1.2.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 5ec2dd4db12dffab14da857ab7df3d2450299e1d
Author: nicolaferraro <ni...@gmail.com>
AuthorDate: Mon Oct 26 15:11:41 2020 +0100
Fix #1785: propagate klb changes to integrations
---
e2e/knative/files/display.groovy | 20 ++++
e2e/knative/kamelet_test.go | 60 ++++++++++++
e2e/support/test_support.go | 90 ++++++++++++++++-
pkg/cmd/reset.go | 16 +--
.../kameletbinding/{initialize.go => common.go} | 101 ++-----------------
pkg/controller/kameletbinding/initialize.go | 108 +--------------------
.../kameletbinding/kamelet_binding_controller.go | 10 ++
pkg/controller/kameletbinding/monitor.go | 39 +++++++-
8 files changed, 233 insertions(+), 211 deletions(-)
diff --git a/e2e/knative/files/display.groovy b/e2e/knative/files/display.groovy
new file mode 100644
index 0000000..43a595f
--- /dev/null
+++ b/e2e/knative/files/display.groovy
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+from('knative:channel/messages')
+ .convertBodyTo(String.class)
+ .to('log:info?showAll=false')
diff --git a/e2e/knative/kamelet_test.go b/e2e/knative/kamelet_test.go
new file mode 100644
index 0000000..cfc90f3
--- /dev/null
+++ b/e2e/knative/kamelet_test.go
@@ -0,0 +1,60 @@
+// +build integration
+
+// To enable compilation of this file in Goland, go to "Settings -> Go -> Vendoring & Build Tags -> Custom Tags" and add "integration"
+
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package knative
+
+import (
+ "testing"
+
+ . "github.com/apache/camel-k/e2e/support"
+ camelv1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+ . "github.com/onsi/gomega"
+ v1 "k8s.io/api/core/v1"
+ messaging "knative.dev/eventing/pkg/apis/messaging/v1beta1"
+)
+
+// Test that kamelet binding can be changed and changes propagated to integrations
+func TestKameletChange(t *testing.T) {
+
+ WithNewTestNamespace(t, func(ns string) {
+ RegisterTestingT(t)
+
+ Expect(Kamel("install", "-n", ns).Execute()).Should(BeNil())
+ Expect(CreateTimerKamelet(ns, "timer-source")()).Should(BeNil())
+ Expect(CreateKnativeChannelv1Beta1(ns, "messages")()).Should(BeNil())
+ Expect(Kamel("run", "-n", ns, "files/display.groovy", "-w").Execute()).Should(BeNil())
+ ref := v1.ObjectReference{
+ Kind: "InMemoryChannel",
+ Name: "messages",
+ APIVersion: messaging.SchemeGroupVersion.String(),
+ }
+ Expect(BindKameletTo(ns, "timer-binding", "timer-source", ref, map[string]string{"message": "message is Hello"})()).Should(BeNil())
+ Eventually(IntegrationPodPhase(ns, "timer-binding"), TestTimeoutMedium).Should(Equal(v1.PodRunning))
+ Eventually(IntegrationCondition(ns, "timer-binding", camelv1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(v1.ConditionTrue))
+ Eventually(IntegrationLogs(ns, "display"), TestTimeoutShort).Should(ContainSubstring("message is Hello"))
+
+ Expect(BindKameletTo(ns, "timer-binding", "timer-source", ref, map[string]string{"message": "message is Hi"})()).Should(BeNil())
+ Eventually(IntegrationPodPhase(ns, "timer-binding"), TestTimeoutMedium).Should(Equal(v1.PodRunning))
+ Eventually(IntegrationCondition(ns, "timer-binding", camelv1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(v1.ConditionTrue))
+ Eventually(IntegrationLogs(ns, "display"), TestTimeoutShort).Should(ContainSubstring("message is Hi"))
+ })
+
+}
diff --git a/e2e/support/test_support.go b/e2e/support/test_support.go
index c070c87..e572602 100644
--- a/e2e/support/test_support.go
+++ b/e2e/support/test_support.go
@@ -23,6 +23,7 @@ package support
import (
"context"
+ "encoding/json"
"errors"
"fmt"
"io"
@@ -33,9 +34,10 @@ import (
"testing"
"time"
+ "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+ "github.com/apache/camel-k/pkg/util/kubernetes"
"github.com/google/uuid"
"github.com/onsi/gomega"
-
"github.com/spf13/cobra"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/api/batch/v1beta1"
@@ -891,6 +893,92 @@ func CreateKnativeChannelv1Beta1(ns string, name string) func() error {
}
/*
+ Kamelets
+*/
+
+func CreateTimerKamelet(ns string, name string) func() error {
+ return func() error {
+ kamelet := v1alpha1.Kamelet{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: ns,
+ Name: name,
+ },
+ Spec: v1alpha1.KameletSpec{
+ Definition: v1alpha1.JSONSchemaProps{
+ Properties: map[string]v1alpha1.JSONSchemaProps{
+ "message": {
+ Type: "string",
+ },
+ },
+ },
+ Flow: asFlow(map[string]interface{}{
+ "from": map[string]interface{}{
+ "uri": "timer:tick",
+ "steps": []map[string]interface{}{
+ {
+ "set-body": map[string]interface{}{
+ "constant": "{{message}}",
+ },
+ },
+ {
+ "to": "kamelet:sink",
+ },
+ },
+ },
+ }),
+ },
+ }
+ return TestClient.Create(TestContext, &kamelet)
+ }
+}
+
+func BindKameletTo(ns, name, from string, to corev1.ObjectReference, properties map[string]string) func() error {
+ return func() error {
+ kb := v1alpha1.KameletBinding{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: ns,
+ Name: name,
+ },
+ Spec: v1alpha1.KameletBindingSpec{
+ Source: v1alpha1.Endpoint{
+ Ref: &corev1.ObjectReference{
+ Kind: "Kamelet",
+ APIVersion: v1alpha1.SchemeGroupVersion.String(),
+ Name: from,
+ },
+ Properties: asEndpointProperties(properties),
+ },
+ Sink: v1alpha1.Endpoint{
+ Ref: &to,
+ Properties: asEndpointProperties(map[string]string{}),
+ },
+ },
+ }
+ return kubernetes.ReplaceResource(TestContext, TestClient, &kb)
+ }
+}
+
+func asFlow(source map[string]interface{}) *v1.Flow {
+ bytes, err := json.Marshal(source)
+ if err != nil {
+ panic(err)
+ }
+ return &v1.Flow{
+ RawMessage: bytes,
+ }
+}
+
+func asEndpointProperties(props map[string]string) v1alpha1.EndpointProperties {
+ bytes, err := json.Marshal(props)
+ if err != nil {
+ panic(err)
+ }
+ return v1alpha1.EndpointProperties{
+ RawMessage: bytes,
+ }
+}
+
+/*
Namespace testing functions
*/
diff --git a/pkg/cmd/reset.go b/pkg/cmd/reset.go
index f6fa6f6..5831d85 100644
--- a/pkg/cmd/reset.go
+++ b/pkg/cmd/reset.go
@@ -61,6 +61,14 @@ func (o *resetCmdOptions) reset(_ *cobra.Command, _ []string) {
}
var n int
+ if !o.SkipKameletBindings {
+ if n, err = o.deleteAllKameletBindings(c); err != nil {
+ fmt.Print(err)
+ return
+ }
+ fmt.Printf("%d kamelet bindings deleted from namespace %s\n", n, o.Namespace)
+ }
+
if !o.SkipIntegrations {
if n, err = o.deleteAllIntegrations(c); err != nil {
fmt.Print(err)
@@ -77,14 +85,6 @@ func (o *resetCmdOptions) reset(_ *cobra.Command, _ []string) {
fmt.Printf("%d integration kits deleted from namespace %s\n", n, o.Namespace)
}
- if !o.SkipKameletBindings {
- if n, err = o.deleteAllKameletBindings(c); err != nil {
- fmt.Print(err)
- return
- }
- fmt.Printf("%d kamelet bindings deleted from namespace %s\n", n, o.Namespace)
- }
-
if err = o.resetIntegrationPlatform(c); err != nil {
fmt.Println(err)
return
diff --git a/pkg/controller/kameletbinding/initialize.go b/pkg/controller/kameletbinding/common.go
similarity index 51%
copy from pkg/controller/kameletbinding/initialize.go
copy to pkg/controller/kameletbinding/common.go
index 8f794d6..a50b202 100644
--- a/pkg/controller/kameletbinding/initialize.go
+++ b/pkg/controller/kameletbinding/common.go
@@ -20,41 +20,19 @@ package kameletbinding
import (
"context"
"encoding/json"
- "strings"
v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+ "github.com/apache/camel-k/pkg/client"
"github.com/apache/camel-k/pkg/platform"
"github.com/apache/camel-k/pkg/util/bindings"
"github.com/apache/camel-k/pkg/util/knative"
- "github.com/apache/camel-k/pkg/util/kubernetes"
- "github.com/apache/camel-k/pkg/util/patch"
"github.com/pkg/errors"
- corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/types"
- "sigs.k8s.io/controller-runtime/pkg/client"
)
-// NewInitializeAction returns a action that initializes the kamelet binding configuration when not provided by the user
-func NewInitializeAction() Action {
- return &initializeAction{}
-}
-
-type initializeAction struct {
- baseAction
-}
-
-func (action *initializeAction) Name() string {
- return "initialize"
-}
-
-func (action *initializeAction) CanHandle(kameletbinding *v1alpha1.KameletBinding) bool {
- return kameletbinding.Status.Phase == v1alpha1.KameletBindingPhaseNone
-}
-
-func (action *initializeAction) Handle(ctx context.Context, kameletbinding *v1alpha1.KameletBinding) (*v1alpha1.KameletBinding, error) {
+func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding *v1alpha1.KameletBinding) (*v1.Integration, error) {
controller := true
blockOwnerDeletion := true
it := v1.Integration{
@@ -78,7 +56,7 @@ func (action *initializeAction) Handle(ctx context.Context, kameletbinding *v1al
it.Spec = *kameletbinding.Spec.Integration.DeepCopy()
}
- profile, err := action.determineProfile(ctx, kameletbinding)
+ profile, err := determineProfile(ctx, c, kameletbinding)
if err != nil {
return nil, err
}
@@ -86,7 +64,7 @@ func (action *initializeAction) Handle(ctx context.Context, kameletbinding *v1al
bindingContext := bindings.BindingContext{
Ctx: ctx,
- Client: action.client,
+ Client: c,
Namespace: it.Namespace,
Profile: profile,
}
@@ -128,77 +106,14 @@ func (action *initializeAction) Handle(ctx context.Context, kameletbinding *v1al
}
it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: encodedFlow})
- if err := kubernetes.ReplaceResource(ctx, action.client, &it); err != nil {
- return nil, errors.Wrap(err, "could not create integration for kamelet binding")
- }
-
- // propagate Kamelet icon (best effort)
- action.propagateIcon(ctx, kameletbinding)
-
- target := kameletbinding.DeepCopy()
- target.Status.Phase = v1alpha1.KameletBindingPhaseCreating
- return target, nil
-}
-
-func (action *initializeAction) propagateIcon(ctx context.Context, binding *v1alpha1.KameletBinding) {
- icon, err := action.findIcon(ctx, binding)
- if err != nil {
- action.L.Errorf(err, "cannot find icon for kamelet binding %q", binding.Name)
- return
- }
- if icon == "" {
- return
- }
- // compute patch
- clone := binding.DeepCopy()
- clone.Annotations = make(map[string]string)
- for k, v := range binding.Annotations {
- clone.Annotations[k] = v
- }
- if _, ok := clone.Annotations[v1alpha1.AnnotationIcon]; !ok {
- clone.Annotations[v1alpha1.AnnotationIcon] = icon
- }
- p, err := patch.PositiveMergePatch(binding, clone)
- if err != nil {
- action.L.Errorf(err, "cannot compute patch to update icon for kamelet binding %q", binding.Name)
- return
- }
- if len(p) > 0 {
- if err := action.client.Patch(ctx, clone, client.RawPatch(types.MergePatchType, p)); err != nil {
- action.L.Errorf(err, "cannot apply merge patch to update icon for kamelet binding %q", binding.Name)
- return
- }
- }
-}
-
-func (action *initializeAction) findIcon(ctx context.Context, binding *v1alpha1.KameletBinding) (string, error) {
- var kameletRef *corev1.ObjectReference
- if binding.Spec.Source.Ref != nil && binding.Spec.Source.Ref.Kind == "Kamelet" && strings.HasPrefix(binding.Spec.Source.Ref.APIVersion, "camel.apache.org/") {
- kameletRef = binding.Spec.Source.Ref
- } else if binding.Spec.Sink.Ref != nil && binding.Spec.Sink.Ref.Kind == "Kamelet" && strings.HasPrefix(binding.Spec.Sink.Ref.APIVersion, "camel.apache.org/") {
- kameletRef = binding.Spec.Sink.Ref
- }
-
- if kameletRef == nil {
- return "", nil
- }
-
- key := client.ObjectKey{
- Namespace: binding.Namespace,
- Name: kameletRef.Name,
- }
- var kamelet v1alpha1.Kamelet
- if err := action.client.Get(ctx, key, &kamelet); err != nil {
- return "", err
- }
- return kamelet.Annotations[v1alpha1.AnnotationIcon], nil
+ return &it, nil
}
-func (action *initializeAction) determineProfile(ctx context.Context, binding *v1alpha1.KameletBinding) (v1.TraitProfile, error) {
+func determineProfile(ctx context.Context, c client.Client, binding *v1alpha1.KameletBinding) (v1.TraitProfile, error) {
if binding.Spec.Integration != nil && binding.Spec.Integration.Profile != "" {
return binding.Spec.Integration.Profile, nil
}
- pl, err := platform.GetCurrentPlatform(ctx, action.client, binding.Namespace)
+ pl, err := platform.GetCurrentPlatform(ctx, c, binding.Namespace)
if err != nil && !k8serrors.IsNotFound(err) {
return "", errors.Wrap(err, "error while retrieving the integration platform")
}
@@ -210,7 +125,7 @@ func (action *initializeAction) determineProfile(ctx context.Context, binding *v
return pl.Spec.Profile, nil
}
}
- if knative.IsEnabledInNamespace(ctx, action.client, binding.Namespace) {
+ if knative.IsEnabledInNamespace(ctx, c, binding.Namespace) {
return v1.TraitProfileKnative, nil
}
if pl != nil {
diff --git a/pkg/controller/kameletbinding/initialize.go b/pkg/controller/kameletbinding/initialize.go
index 8f794d6..cc9c38f 100644
--- a/pkg/controller/kameletbinding/initialize.go
+++ b/pkg/controller/kameletbinding/initialize.go
@@ -19,20 +19,13 @@ package kameletbinding
import (
"context"
- "encoding/json"
"strings"
- v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
- "github.com/apache/camel-k/pkg/platform"
- "github.com/apache/camel-k/pkg/util/bindings"
- "github.com/apache/camel-k/pkg/util/knative"
"github.com/apache/camel-k/pkg/util/kubernetes"
"github.com/apache/camel-k/pkg/util/patch"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
- k8serrors "k8s.io/apimachinery/pkg/api/errors"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
@@ -55,80 +48,12 @@ func (action *initializeAction) CanHandle(kameletbinding *v1alpha1.KameletBindin
}
func (action *initializeAction) Handle(ctx context.Context, kameletbinding *v1alpha1.KameletBinding) (*v1alpha1.KameletBinding, error) {
- controller := true
- blockOwnerDeletion := true
- it := v1.Integration{
- ObjectMeta: metav1.ObjectMeta{
- Namespace: kameletbinding.Namespace,
- Name: kameletbinding.Name,
- OwnerReferences: []metav1.OwnerReference{
- {
- APIVersion: kameletbinding.APIVersion,
- Kind: kameletbinding.Kind,
- Name: kameletbinding.Name,
- UID: kameletbinding.UID,
- Controller: &controller,
- BlockOwnerDeletion: &blockOwnerDeletion,
- },
- },
- },
- }
- // start from the integration spec defined in the binding
- if kameletbinding.Spec.Integration != nil {
- it.Spec = *kameletbinding.Spec.Integration.DeepCopy()
- }
-
- profile, err := action.determineProfile(ctx, kameletbinding)
- if err != nil {
- return nil, err
- }
- it.Spec.Profile = profile
-
- bindingContext := bindings.BindingContext{
- Ctx: ctx,
- Client: action.client,
- Namespace: it.Namespace,
- Profile: profile,
- }
-
- from, err := bindings.Translate(bindingContext, v1alpha1.EndpointTypeSource, kameletbinding.Spec.Source)
- if err != nil {
- return nil, errors.Wrap(err, "could not determine source URI")
- }
- to, err := bindings.Translate(bindingContext, v1alpha1.EndpointTypeSink, kameletbinding.Spec.Sink)
- if err != nil {
- return nil, errors.Wrap(err, "could not determine sink URI")
- }
-
- if len(from.Traits) > 0 || len(to.Traits) > 0 {
- if it.Spec.Traits == nil {
- it.Spec.Traits = make(map[string]v1.TraitSpec)
- }
- for k, v := range from.Traits {
- it.Spec.Traits[k] = v
- }
- for k, v := range to.Traits {
- it.Spec.Traits[k] = v
- }
- }
-
- flow := map[string]interface{}{
- "from": map[string]interface{}{
- "uri": from.URI,
- "steps": []map[string]interface{}{
- {
- "to": to.URI,
- },
- },
- },
- }
- encodedFlow, err := json.Marshal(flow)
+ it, err := createIntegrationFor(ctx, action.client, kameletbinding)
if err != nil {
return nil, err
}
- it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: encodedFlow})
- if err := kubernetes.ReplaceResource(ctx, action.client, &it); err != nil {
+ if err := kubernetes.ReplaceResource(ctx, action.client, it); err != nil {
return nil, errors.Wrap(err, "could not create integration for kamelet binding")
}
@@ -193,32 +118,3 @@ func (action *initializeAction) findIcon(ctx context.Context, binding *v1alpha1.
}
return kamelet.Annotations[v1alpha1.AnnotationIcon], nil
}
-
-func (action *initializeAction) determineProfile(ctx context.Context, binding *v1alpha1.KameletBinding) (v1.TraitProfile, error) {
- if binding.Spec.Integration != nil && binding.Spec.Integration.Profile != "" {
- return binding.Spec.Integration.Profile, nil
- }
- pl, err := platform.GetCurrentPlatform(ctx, action.client, binding.Namespace)
- if err != nil && !k8serrors.IsNotFound(err) {
- return "", errors.Wrap(err, "error while retrieving the integration platform")
- }
- if pl != nil {
- if pl.Status.Profile != "" {
- return pl.Status.Profile, nil
- }
- if pl.Spec.Profile != "" {
- return pl.Spec.Profile, nil
- }
- }
- if knative.IsEnabledInNamespace(ctx, action.client, binding.Namespace) {
- return v1.TraitProfileKnative, nil
- }
- if pl != nil {
- // Determine profile from cluster type
- plProfile := platform.GetProfile(pl)
- if plProfile != "" {
- return plProfile, nil
- }
- }
- return v1.DefaultTraitProfile, nil
-}
diff --git a/pkg/controller/kameletbinding/kamelet_binding_controller.go b/pkg/controller/kameletbinding/kamelet_binding_controller.go
index 459c69c..dbebfc3 100644
--- a/pkg/controller/kameletbinding/kamelet_binding_controller.go
+++ b/pkg/controller/kameletbinding/kamelet_binding_controller.go
@@ -21,6 +21,7 @@ import (
"context"
"time"
+ v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/client"
camelevent "github.com/apache/camel-k/pkg/event"
@@ -84,6 +85,15 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}
+ // Watch Integration to propagate changes downstream
+ err = c.Watch(&source.Kind{Type: &v1.Integration{}}, &handler.EnqueueRequestForOwner{
+ OwnerType: &v1alpha1.KameletBinding{},
+ IsController: false,
+ })
+ if err != nil {
+ return err
+ }
+
return nil
}
diff --git a/pkg/controller/kameletbinding/monitor.go b/pkg/controller/kameletbinding/monitor.go
index 9980dc5..9cd768a 100644
--- a/pkg/controller/kameletbinding/monitor.go
+++ b/pkg/controller/kameletbinding/monitor.go
@@ -19,12 +19,14 @@ package kameletbinding
import (
"context"
+
v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+ "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/equality"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/controller-runtime/pkg/client"
-
- "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
)
// NewMonitorAction returns an action that monitors the kamelet binding after it's fully initialized
@@ -52,10 +54,41 @@ func (action *monitorAction) Handle(ctx context.Context, kameletbinding *v1alpha
Name: kameletbinding.Name,
}
it := v1.Integration{}
- if err := action.client.Get(ctx, key, &it); err != nil {
+ if err := action.client.Get(ctx, key, &it); err != nil && k8serrors.IsNotFound(err) {
+ target := kameletbinding.DeepCopy()
+ // Rebuild the integration
+ target.Status.Phase = v1alpha1.KameletBindingPhaseNone
+ target.Status.SetCondition(
+ v1alpha1.KameletBindingConditionReady,
+ corev1.ConditionFalse,
+ "",
+ "",
+ )
+ return target, nil
+ } else if err != nil {
return nil, errors.Wrapf(err, "could not load integration for KameletBinding %q", kameletbinding.Name)
}
+ // Check if the integration needs to be changed
+ expected, err := createIntegrationFor(ctx, action.client, kameletbinding)
+ if err != nil {
+ return nil, err
+ }
+
+ if !equality.Semantic.DeepDerivative(expected.Spec, it.Spec) {
+ // KameletBinding has changed and needs rebuild
+ target := kameletbinding.DeepCopy()
+ // Rebuild the integration
+ target.Status.Phase = v1alpha1.KameletBindingPhaseNone
+ target.Status.SetCondition(
+ v1alpha1.KameletBindingConditionReady,
+ corev1.ConditionFalse,
+ "",
+ "",
+ )
+ return target, nil
+ }
+
// Map integration phases to KameletBinding phases
target := kameletbinding.DeepCopy()
if it.Status.Phase == v1.IntegrationPhaseRunning {