You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by pc...@apache.org on 2023/04/26 08:51:27 UTC
[camel-k] 11/18: chore(controller): keep KameletBindings reconciliation loop
This is an automated email from the ASF dual-hosted git repository.
pcongiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 167395a876e7702b1e651e4479e1f7c30a5eaa7f
Author: Pasquale Congiusti <pa...@gmail.com>
AuthorDate: Mon Apr 3 15:28:59 2023 +0200
chore(controller): keep KameletBindings reconciliation loop
---
config/crd/bases/camel.apache.org_bindings.yaml | 2 +
config/rbac/operator-role.yaml | 5 +
config/rbac/user-cluster-role.yaml | 5 +
helm/camel-k/crds/crd-binding.yaml | 2 +
pkg/apis/camel/v1/binding_types.go | 2 +-
pkg/cmd/rebuild.go | 92 +------
pkg/cmd/rebuild_test.go | 62 -----
pkg/cmd/reset.go | 25 +-
.../{add_kameletbinding.go => add_binding.go} | 0
pkg/controller/add_kameletbinding.go | 4 +-
.../action.go} | 36 ++-
pkg/controller/kameletbinding/error_handler.go | 109 +++++++++
.../kameletbinding/error_handler_test.go | 103 ++++++++
pkg/controller/kameletbinding/initialize.go | 131 ++++++++++
pkg/controller/kameletbinding/integration.go | 265 +++++++++++++++++++++
.../kameletbinding/kameletbinding_controller.go | 239 +++++++++++++++++++
.../log.go} | 11 +-
pkg/controller/kameletbinding/monitor.go | 218 +++++++++++++++++
pkg/event/manager.go | 32 +++
pkg/resources/resources.go | 12 +-
pkg/trait/util.go | 68 ++++++
pkg/util/bindings/api_support.go | 10 +
pkg/util/bindings/api_v1alpha1.go | 55 +++++
pkg/util/bindings/catalog.go | 48 +++-
pkg/util/bindings/v1alpha1_kamelet.go | 164 +++++++++++++
pkg/util/log/log.go | 12 +
26 files changed, 1537 insertions(+), 175 deletions(-)
diff --git a/config/crd/bases/camel.apache.org_bindings.yaml b/config/crd/bases/camel.apache.org_bindings.yaml
index d2a6833c1..a2694d60a 100644
--- a/config/crd/bases/camel.apache.org_bindings.yaml
+++ b/config/crd/bases/camel.apache.org_bindings.yaml
@@ -33,6 +33,8 @@ spec:
kind: Binding
listKind: BindingList
plural: bindings
+ shortNames:
+ - bd
singular: binding
scope: Namespaced
versions:
diff --git a/config/rbac/operator-role.yaml b/config/rbac/operator-role.yaml
index 0c9dc702d..1a5904312 100644
--- a/config/rbac/operator-role.yaml
+++ b/config/rbac/operator-role.yaml
@@ -31,6 +31,8 @@ rules:
- integrationplatforms
- integrations
- bindings
+ # Deprecated: kameletbindings CR
+ - kameletbindings
- kamelets
verbs:
- create
@@ -56,6 +58,9 @@ rules:
- integrations/status
- bindings/status
- bindings/scale
+ # Deprecated: kameletbindings CR
+ - kameletbindings/status
+ - kameletbindings/scale
- kamelets/status
verbs:
- get
diff --git a/config/rbac/user-cluster-role.yaml b/config/rbac/user-cluster-role.yaml
index 86e19096b..8942be225 100644
--- a/config/rbac/user-cluster-role.yaml
+++ b/config/rbac/user-cluster-role.yaml
@@ -34,6 +34,8 @@ rules:
- integrationplatforms
- integrations
- bindings
+ # Deprecated: kameletbindings CR
+ - kameletbindings
- kamelets
verbs:
- create
@@ -55,6 +57,9 @@ rules:
- integrations/status
- bindings/scale
- bindings/status
+ # Deprecated: kameletbindings CR
+ - kameletbindings/scale
+ - kameletbindings/status
- kamelets/status
verbs:
- get
diff --git a/helm/camel-k/crds/crd-binding.yaml b/helm/camel-k/crds/crd-binding.yaml
index d2a6833c1..a2694d60a 100644
--- a/helm/camel-k/crds/crd-binding.yaml
+++ b/helm/camel-k/crds/crd-binding.yaml
@@ -33,6 +33,8 @@ spec:
kind: Binding
listKind: BindingList
plural: bindings
+ shortNames:
+ - bd
singular: binding
scope: Namespaced
versions:
diff --git a/pkg/apis/camel/v1/binding_types.go b/pkg/apis/camel/v1/binding_types.go
index 550141542..6d22c550d 100644
--- a/pkg/apis/camel/v1/binding_types.go
+++ b/pkg/apis/camel/v1/binding_types.go
@@ -24,7 +24,7 @@ import (
// +genclient
// +kubebuilder:object:root=true
-// +kubebuilder:resource:path=bindings,scope=Namespaced,categories=kamel;camel
+// +kubebuilder:resource:path=bindings,scope=Namespaced,shortName=bd,categories=kamel;camel
// +kubebuilder:subresource:status
// +genclient:method=GetScale,verb=get,subresource=scale,result=k8s.io/api/autoscaling/v1.Scale
// +genclient:method=UpdateScale,verb=update,subresource=scale,input=k8s.io/api/autoscaling/v1.Scale,result=k8s.io/api/autoscaling/v1.Scale
diff --git a/pkg/cmd/rebuild.go b/pkg/cmd/rebuild.go
index f4505bf39..5b30d158d 100644
--- a/pkg/cmd/rebuild.go
+++ b/pkg/cmd/rebuild.go
@@ -22,14 +22,11 @@ import (
"github.com/pkg/errors"
"github.com/spf13/cobra"
- "k8s.io/apimachinery/pkg/labels"
- "k8s.io/apimachinery/pkg/selection"
k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
"github.com/apache/camel-k/v2/pkg/client"
- "github.com/apache/camel-k/v2/pkg/util/kubernetes"
)
func newCmdRebuild(rootCmdOptions *RootCmdOptions) (*cobra.Command, *rebuildCmdOptions) {
@@ -71,76 +68,6 @@ func (o *rebuildCmdOptions) validate(args []string) error {
}
func (o *rebuildCmdOptions) run(cmd *cobra.Command, args []string) error {
- errKlbs := o.rebuildBindingType(cmd, args)
- errIts := o.rebuildIntegrationType(cmd, args)
-
- if errIts != nil && errKlbs != nil {
- return errors.Wrap(errIts, errKlbs.Error())
- }
-
- return nil
-}
-
-func (o *rebuildCmdOptions) rebuildBindingType(cmd *cobra.Command, args []string) error {
- c, err := o.GetCmdClient()
- if err != nil {
- return err
- }
- var bindings []v1.Binding
- if o.RebuildAll {
- if bindings, err = o.listAllBindings(c); err != nil {
- return err
- }
- } else if len(args) > 0 {
- if bindings, err = o.getBindings(c, args); err != nil {
- return err
- }
- }
-
- if err = o.rebuildBindings(c, bindings); err != nil {
- return err
- }
-
- fmt.Fprintln(cmd.OutOrStdout(), len(bindings), "bindings have been rebuilt")
- return nil
-}
-
-func (o *rebuildCmdOptions) listAllBindings(c client.Client) ([]v1.Binding, error) {
- list := v1.NewBindingList()
- if err := c.List(o.Context, &list, k8sclient.InNamespace(o.Namespace)); err != nil {
- return nil, errors.Wrap(err, fmt.Sprintf("could not retrieve bindings from namespace %s", o.Namespace))
- }
- return list.Items, nil
-}
-
-func (o *rebuildCmdOptions) getBindings(c client.Client, names []string) ([]v1.Binding, error) {
- klbs := make([]v1.Binding, 0, len(names))
- for _, n := range names {
- klb := v1.NewBinding(o.Namespace, n)
- key := k8sclient.ObjectKey{
- Name: n,
- Namespace: o.Namespace,
- }
- if err := c.Get(o.Context, key, &klb); err != nil {
- return nil, errors.Wrap(err, fmt.Sprintf("could not find binding %s in namespace %s", klb.Name, o.Namespace))
- }
- klbs = append(klbs, klb)
- }
- return klbs, nil
-}
-
-func (o *rebuildCmdOptions) rebuildBindings(c k8sclient.StatusClient, bindings []v1.Binding) error {
- for _, i := range bindings {
- klb := i
- klb.Status = v1.BindingStatus{}
- if err := c.Status().Update(o.Context, &klb); err != nil {
- return errors.Wrap(err, fmt.Sprintf("could not rebuild binding %s in namespace %s", klb.Name, o.Namespace))
- }
- }
- return nil
-}
-
-func (o *rebuildCmdOptions) rebuildIntegrationType(cmd *cobra.Command, args []string) error {
c, err := o.GetCmdClient()
if err != nil {
return err
@@ -166,19 +93,7 @@ func (o *rebuildCmdOptions) rebuildIntegrationType(cmd *cobra.Command, args []st
func (o *rebuildCmdOptions) listAllIntegrations(c client.Client) ([]v1.Integration, error) {
list := v1.NewIntegrationList()
- // Integrations controlled by Bindings are not included
- excludeItsFromKlbs, err := labels.NewRequirement(kubernetes.CamelCreatorLabelKind, selection.NotEquals, []string{
- "Binding",
- })
- if err != nil {
- return list.Items, err
- }
- if err := c.List(o.Context, &list,
- k8sclient.InNamespace(o.Namespace),
- k8sclient.MatchingLabelsSelector{
- Selector: labels.NewSelector().Add(*excludeItsFromKlbs),
- },
- ); err != nil {
+ if err := c.List(o.Context, &list, k8sclient.InNamespace(o.Namespace)); err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("could not retrieve integrations from namespace %s", o.Namespace))
}
return list.Items, nil
@@ -195,10 +110,7 @@ func (o *rebuildCmdOptions) getIntegrations(c client.Client, names []string) ([]
if err := c.Get(o.Context, key, &it); err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("could not find integration %s in namespace %s", it.Name, o.Namespace))
}
- // Integrations controlled by Bindings are not included
- if it.Labels[kubernetes.CamelCreatorLabelKind] != "Binding" {
- ints = append(ints, it)
- }
+ ints = append(ints, it)
}
return ints, nil
}
diff --git a/pkg/cmd/rebuild_test.go b/pkg/cmd/rebuild_test.go
index 4b7ac33c5..021c1cf6b 100644
--- a/pkg/cmd/rebuild_test.go
+++ b/pkg/cmd/rebuild_test.go
@@ -20,7 +20,6 @@ package cmd
import (
"testing"
- "github.com/apache/camel-k/v2/pkg/util/kubernetes"
"github.com/apache/camel-k/v2/pkg/util/test"
"github.com/spf13/cobra"
"github.com/stretchr/testify/assert"
@@ -65,64 +64,3 @@ func TestRebuildAllFlag(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, true, rebuildCmdOptions.RebuildAll)
}
-
-func TestRebuildAllBindingsAndIntegrations(t *testing.T) {
- defaultIntegration := nominalIntegration("my-it-test")
- defaultKB := nominalBinding("my-kb-test-rebuild")
- itGeneratedByKlb := nominalIntegration("my-kb-test-rebuild")
- itGeneratedByKlb.Labels = map[string]string{
- kubernetes.CamelCreatorLabelKind: "Binding",
- }
-
- _, rebuildCmd, _ := initializeRebuildOptions(t, &defaultIntegration, &defaultKB, &itGeneratedByKlb)
- output, err := test.ExecuteCommand(rebuildCmd, cmdRebuild, "--all")
- assert.Nil(t, err)
- assert.Contains(t, output, "1 bindings have been rebuilt")
- assert.Contains(t, output, "1 integrations have been rebuilt")
-}
-
-func TestRebuildNone(t *testing.T) {
- defaultIntegration := nominalIntegration("my-it-test")
- defaultKB := nominalBinding("my-kb-test-rebuild")
- itGeneratedByKlb := nominalIntegration("my-kb-test-rebuild")
- itGeneratedByKlb.Labels = map[string]string{
- kubernetes.CamelCreatorLabelKind: "Binding",
- }
-
- _, rebuildCmd, _ := initializeRebuildOptions(t, &defaultIntegration, &defaultKB, &itGeneratedByKlb)
- output, err := test.ExecuteCommand(rebuildCmd, cmdRebuild, "my-missing")
- assert.NotNil(t, err)
- assert.NotContains(t, output, "have been rebuilt")
- assert.Contains(t, output, "could not find binding my-missing in namespace default")
- assert.Contains(t, output, "could not find integration my-missing in namespace default")
-}
-
-func TestRebuildBindingOnly(t *testing.T) {
- defaultIntegration := nominalIntegration("my-it-test")
- defaultKB := nominalBinding("my-kb-test-rebuild")
- itGeneratedByKlb := nominalIntegration("my-kb-test-rebuild")
- itGeneratedByKlb.Labels = map[string]string{
- kubernetes.CamelCreatorLabelKind: "Binding",
- }
-
- _, rebuildCmd, _ := initializeRebuildOptions(t, &defaultIntegration, &defaultKB, &itGeneratedByKlb)
- output, err := test.ExecuteCommand(rebuildCmd, cmdRebuild, "my-kb-test-rebuild")
- assert.Nil(t, err)
- assert.Contains(t, output, "1 bindings have been rebuilt")
- assert.NotContains(t, output, "1 integrations have been rebuilt")
-}
-
-func TestRebuildIntegrationOnly(t *testing.T) {
- defaultIntegration := nominalIntegration("my-it-test")
- defaultKB := nominalBinding("my-kb-test-rebuild")
- itGeneratedByKlb := nominalIntegration("my-kb-test-rebuild")
- itGeneratedByKlb.Labels = map[string]string{
- kubernetes.CamelCreatorLabelKind: "Binding",
- }
-
- _, rebuildCmd, _ := initializeRebuildOptions(t, &defaultIntegration, &defaultKB, &itGeneratedByKlb)
- output, err := test.ExecuteCommand(rebuildCmd, cmdRebuild, "my-it-test")
- assert.Nil(t, err)
- assert.NotContains(t, output, "1 bindings have been rebuilt")
- assert.Contains(t, output, "1 integrations have been rebuilt")
-}
diff --git a/pkg/cmd/reset.go b/pkg/cmd/reset.go
index 5a7023638..39a42ebf5 100644
--- a/pkg/cmd/reset.go
+++ b/pkg/cmd/reset.go
@@ -21,6 +21,7 @@ import (
"fmt"
v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+ "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/v2/pkg/client"
"github.com/pkg/errors"
"github.com/spf13/cobra"
@@ -67,6 +68,12 @@ func (o *resetCmdOptions) reset(cmd *cobra.Command, _ []string) {
return
}
fmt.Fprintln(cmd.OutOrStdout(), n, "bindings deleted from namespace", o.Namespace)
+
+ if n, err = o.deleteAllKameletBindings(c); err != nil {
+ fmt.Fprint(cmd.ErrOrStderr(), err)
+ return
+ }
+ fmt.Fprintln(cmd.OutOrStdout(), n, "kameletbindings deleted from namespace", o.Namespace)
}
if !o.SkipIntegrations {
@@ -128,12 +135,26 @@ func (o *resetCmdOptions) deleteAllIntegrationKits(c client.Client) (int, error)
func (o *resetCmdOptions) deleteAllBindings(c client.Client) (int, error) {
list := v1.NewBindingList()
if err := c.List(o.Context, &list, k8sclient.InNamespace(o.Namespace)); err != nil {
- return 0, errors.Wrap(err, fmt.Sprintf("could not retrieveBindings from namespace %s", o.Namespace))
+ return 0, errors.Wrap(err, fmt.Sprintf("could not retrieve Bindings from namespace %s", o.Namespace))
+ }
+ for _, i := range list.Items {
+ klb := i
+ if err := c.Delete(o.Context, &klb); err != nil {
+ return 0, errors.Wrap(err, fmt.Sprintf("could not delete Binding %s from namespace %s", klb.Name, klb.Namespace))
+ }
+ }
+ return len(list.Items), nil
+}
+
+func (o *resetCmdOptions) deleteAllKameletBindings(c client.Client) (int, error) {
+ list := v1alpha1.NewKameletBindingList()
+ if err := c.List(o.Context, &list, k8sclient.InNamespace(o.Namespace)); err != nil {
+ return 0, errors.Wrap(err, fmt.Sprintf("could not retrieve KameletBindings from namespace %s", o.Namespace))
}
for _, i := range list.Items {
klb := i
if err := c.Delete(o.Context, &klb); err != nil {
- return 0, errors.Wrap(err, fmt.Sprintf("could not deleteBinding %s from namespace %s", klb.Name, klb.Namespace))
+ return 0, errors.Wrap(err, fmt.Sprintf("could not delete KameletBinding %s from namespace %s", klb.Name, klb.Namespace))
}
}
return len(list.Items), nil
diff --git a/pkg/controller/add_kameletbinding.go b/pkg/controller/add_binding.go
similarity index 100%
copy from pkg/controller/add_kameletbinding.go
copy to pkg/controller/add_binding.go
diff --git a/pkg/controller/add_kameletbinding.go b/pkg/controller/add_kameletbinding.go
index 9e977f7a1..c517b61b0 100644
--- a/pkg/controller/add_kameletbinding.go
+++ b/pkg/controller/add_kameletbinding.go
@@ -18,9 +18,9 @@ limitations under the License.
package controller
import (
- "github.com/apache/camel-k/v2/pkg/controller/binding"
+ "github.com/apache/camel-k/v2/pkg/controller/kameletbinding"
)
func init() {
- addToManager = append(addToManager, binding.Add)
+ addToManager = append(addToManager, kameletbinding.Add)
}
diff --git a/pkg/controller/add_kameletbinding.go b/pkg/controller/kameletbinding/action.go
similarity index 50%
copy from pkg/controller/add_kameletbinding.go
copy to pkg/controller/kameletbinding/action.go
index 9e977f7a1..4900f70ba 100644
--- a/pkg/controller/add_kameletbinding.go
+++ b/pkg/controller/kameletbinding/action.go
@@ -15,12 +15,40 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package controller
+package kameletbinding
import (
- "github.com/apache/camel-k/v2/pkg/controller/binding"
+ "context"
+
+ "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
+ "github.com/apache/camel-k/v2/pkg/client"
+ "github.com/apache/camel-k/v2/pkg/util/log"
)
-func init() {
- addToManager = append(addToManager, binding.Add)
+// Action --.
+type Action interface {
+ client.Injectable
+ log.Injectable
+
+ // a user friendly name for the action
+ Name() string
+
+ // returns true if the action can handle the binding
+ CanHandle(kb *v1alpha1.KameletBinding) bool
+
+ // executes the handling function
+ Handle(ctx context.Context, kb *v1alpha1.KameletBinding) (*v1alpha1.KameletBinding, error)
+}
+
+type baseAction struct {
+ client client.Client
+ L log.Logger
+}
+
+func (action *baseAction) InjectClient(client client.Client) {
+ action.client = client
+}
+
+func (action *baseAction) InjectLogger(log log.Logger) {
+ action.L = log
}
diff --git a/pkg/controller/kameletbinding/error_handler.go b/pkg/controller/kameletbinding/error_handler.go
new file mode 100644
index 000000000..571934a1a
--- /dev/null
+++ b/pkg/controller/kameletbinding/error_handler.go
@@ -0,0 +1,109 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kameletbinding
+
+import (
+ "encoding/json"
+ "fmt"
+
+ "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
+ "github.com/apache/camel-k/v2/pkg/util/bindings"
+ "github.com/pkg/errors"
+)
+
+func maybeErrorHandler(errHandlConf *v1alpha1.ErrorHandlerSpec, bindingContext bindings.V1alpha1BindingContext) (*bindings.Binding, error) {
+ var errorHandlerBinding *bindings.Binding
+ if errHandlConf != nil {
+ errorHandlerSpec, err := parseErrorHandler(errHandlConf.RawMessage)
+ if err != nil {
+ return nil, errors.Wrap(err, "could not parse error handler")
+ }
+ // We need to get the translated URI from any referenced resource (ie, kamelets)
+ if errorHandlerSpec.Type() == v1alpha1.ErrorHandlerTypeSink {
+ errorHandlerBinding, err = bindings.TranslateV1alpha1(bindingContext, bindings.V1alpha1EndpointContext{Type: v1alpha1.EndpointTypeErrorHandler}, *errorHandlerSpec.Endpoint())
+ if err != nil {
+ return nil, errors.Wrap(err, "could not determine error handler URI")
+ }
+ } else {
+ // Create a new binding otherwise in order to store error handler application properties
+ errorHandlerBinding = &bindings.Binding{
+ ApplicationProperties: make(map[string]string),
+ }
+ }
+
+ err = setErrorHandlerConfiguration(errorHandlerBinding, errorHandlerSpec)
+ if err != nil {
+ return nil, errors.Wrap(err, "could not set integration error handler")
+ }
+
+ return errorHandlerBinding, nil
+ }
+ return nil, nil
+}
+
+func parseErrorHandler(rawMessage v1alpha1.RawMessage) (v1alpha1.ErrorHandler, error) {
+ var properties map[v1alpha1.ErrorHandlerType]v1alpha1.RawMessage
+ err := json.Unmarshal(rawMessage, &properties)
+ if err != nil {
+ return nil, err
+ }
+ if len(properties) > 1 {
+ return nil, errors.Errorf("You must provide just 1 error handler, provided %d", len(properties))
+ }
+
+ for errHandlType, errHandlValue := range properties {
+ var dst v1alpha1.ErrorHandler
+ switch errHandlType {
+ case v1alpha1.ErrorHandlerTypeNone:
+ dst = new(v1alpha1.ErrorHandlerNone)
+ case v1alpha1.ErrorHandlerTypeLog:
+ dst = new(v1alpha1.ErrorHandlerLog)
+ case v1alpha1.ErrorHandlerTypeSink:
+ dst = new(v1alpha1.ErrorHandlerSink)
+ default:
+ return nil, errors.Errorf("Unknown error handler type %s", errHandlType)
+ }
+
+ err := json.Unmarshal(errHandlValue, dst)
+ if err != nil {
+ return nil, err
+ }
+
+ return dst, nil
+ }
+
+ return nil, errors.New("You must provide any supported error handler")
+}
+
+func setErrorHandlerConfiguration(errorHandlerBinding *bindings.Binding, errorHandler v1alpha1.ErrorHandler) error {
+ properties, err := errorHandler.Configuration()
+ if err != nil {
+ return err
+ }
+ // initialize map if not yet initialized
+ if errorHandlerBinding.ApplicationProperties == nil {
+ errorHandlerBinding.ApplicationProperties = make(map[string]string)
+ }
+ for key, value := range properties {
+ errorHandlerBinding.ApplicationProperties[key] = fmt.Sprintf("%v", value)
+ }
+ if errorHandler.Type() == v1alpha1.ErrorHandlerTypeSink && errorHandlerBinding.URI != "" {
+ errorHandlerBinding.ApplicationProperties[fmt.Sprintf("%s.deadLetterUri", v1alpha1.ErrorHandlerAppPropertiesPrefix)] = fmt.Sprintf("%v", errorHandlerBinding.URI)
+ }
+ return nil
+}
diff --git a/pkg/controller/kameletbinding/error_handler_test.go b/pkg/controller/kameletbinding/error_handler_test.go
new file mode 100644
index 000000000..6abbf2187
--- /dev/null
+++ b/pkg/controller/kameletbinding/error_handler_test.go
@@ -0,0 +1,103 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kameletbinding
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestParseErrorHandlerNoneDoesSucceed(t *testing.T) {
+ noErrorHandler, err := parseErrorHandler(
+ []byte(`{"none": null}`),
+ )
+ assert.Nil(t, err)
+ assert.Equal(t, v1alpha1.ErrorHandlerTypeNone, noErrorHandler.Type())
+ parameters, err := noErrorHandler.Configuration()
+ assert.Nil(t, err)
+ assert.Equal(t, "#class:org.apache.camel.builder.NoErrorHandlerBuilder", parameters[v1alpha1.ErrorHandlerAppPropertiesPrefix])
+ assert.Equal(t, v1alpha1.ErrorHandlerRefDefaultName, parameters[v1alpha1.ErrorHandlerRefName])
+}
+
+func TestParseErrorHandlerLogDoesSucceed(t *testing.T) {
+ logErrorHandler, err := parseErrorHandler(
+ []byte(`{"log": null}`),
+ )
+ assert.Nil(t, err)
+ assert.Equal(t, v1alpha1.ErrorHandlerTypeLog, logErrorHandler.Type())
+ parameters, err := logErrorHandler.Configuration()
+ assert.Nil(t, err)
+ assert.Equal(t, "#class:org.apache.camel.builder.DefaultErrorHandlerBuilder", parameters[v1alpha1.ErrorHandlerAppPropertiesPrefix])
+ assert.Equal(t, v1alpha1.ErrorHandlerRefDefaultName, parameters[v1alpha1.ErrorHandlerRefName])
+}
+
+func TestParseErrorHandlerLogWithParametersDoesSucceed(t *testing.T) {
+ logErrorHandler, err := parseErrorHandler(
+ []byte(`{"log": {"parameters": {"param1": "value1", "param2": "value2"}}}`),
+ )
+ assert.Nil(t, err)
+ assert.Equal(t, v1alpha1.ErrorHandlerTypeLog, logErrorHandler.Type())
+ parameters, err := logErrorHandler.Configuration()
+ assert.Nil(t, err)
+ assert.Equal(t, "#class:org.apache.camel.builder.DefaultErrorHandlerBuilder", parameters[v1alpha1.ErrorHandlerAppPropertiesPrefix])
+ assert.Equal(t, "value1", parameters["camel.beans.defaultErrorHandler.param1"])
+ assert.Equal(t, "value2", parameters["camel.beans.defaultErrorHandler.param2"])
+ assert.Equal(t, v1alpha1.ErrorHandlerRefDefaultName, parameters[v1alpha1.ErrorHandlerRefName])
+}
+
+func TestParseErrorHandlerSinkDoesSucceed(t *testing.T) {
+ fmt.Println("Test")
+ sinkErrorHandler, err := parseErrorHandler(
+ []byte(`{"sink": {"endpoint": {"uri": "someUri"}}}`),
+ )
+ assert.Nil(t, err)
+ assert.NotNil(t, sinkErrorHandler)
+ assert.Equal(t, v1alpha1.ErrorHandlerTypeSink, sinkErrorHandler.Type())
+ assert.Equal(t, "someUri", *sinkErrorHandler.Endpoint().URI)
+ parameters, err := sinkErrorHandler.Configuration()
+ assert.Nil(t, err)
+ assert.Equal(t, "#class:org.apache.camel.builder.DeadLetterChannelBuilder", parameters[v1alpha1.ErrorHandlerAppPropertiesPrefix])
+ assert.Equal(t, v1alpha1.ErrorHandlerRefDefaultName, parameters[v1alpha1.ErrorHandlerRefName])
+}
+
+func TestParseErrorHandlerSinkWithParametersDoesSucceed(t *testing.T) {
+ sinkErrorHandler, err := parseErrorHandler(
+ []byte(`{
+ "sink": {
+ "endpoint": {
+ "uri": "someUri"
+ },
+ "parameters":
+ {"param1": "value1", "param2": "value2"}
+ }
+ }`),
+ )
+ assert.Nil(t, err)
+ assert.NotNil(t, sinkErrorHandler)
+ assert.Equal(t, v1alpha1.ErrorHandlerTypeSink, sinkErrorHandler.Type())
+ assert.Equal(t, "someUri", *sinkErrorHandler.Endpoint().URI)
+ parameters, err := sinkErrorHandler.Configuration()
+ assert.Nil(t, err)
+ assert.Equal(t, "#class:org.apache.camel.builder.DeadLetterChannelBuilder", parameters[v1alpha1.ErrorHandlerAppPropertiesPrefix])
+ assert.Equal(t, v1alpha1.ErrorHandlerRefDefaultName, parameters[v1alpha1.ErrorHandlerRefName])
+ assert.Equal(t, "value1", parameters["camel.beans.defaultErrorHandler.param1"])
+ assert.Equal(t, "value2", parameters["camel.beans.defaultErrorHandler.param2"])
+}
diff --git a/pkg/controller/kameletbinding/initialize.go b/pkg/controller/kameletbinding/initialize.go
new file mode 100644
index 000000000..35afba27f
--- /dev/null
+++ b/pkg/controller/kameletbinding/initialize.go
@@ -0,0 +1,131 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kameletbinding
+
+import (
+ "context"
+ "strings"
+
+ v1alpha1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
+
+ "github.com/apache/camel-k/v2/pkg/kamelet/repository"
+ "github.com/apache/camel-k/v2/pkg/platform"
+ "github.com/apache/camel-k/v2/pkg/util/kubernetes"
+ "github.com/apache/camel-k/v2/pkg/util/patch"
+ "github.com/pkg/errors"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/types"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// NewInitializeAction returns a action that initializes the KameletBinding configuration when not provided by the user.
+func NewInitializeAction() Action {
+ return &initializeAction{}
+}
+
+type initializeAction struct {
+ baseAction
+}
+
+func (action *initializeAction) Name() string {
+ return "initialize"
+}
+
+func (action *initializeAction) CanHandle(binding *v1alpha1.KameletBinding) bool {
+ return binding.Status.Phase == v1alpha1.KameletBindingPhaseNone
+}
+
+func (action *initializeAction) Handle(ctx context.Context, binding *v1alpha1.KameletBinding) (*v1alpha1.KameletBinding, error) {
+ it, err := CreateIntegrationFor(ctx, action.client, binding)
+ if err != nil {
+ binding.Status.Phase = v1alpha1.KameletBindingPhaseError
+ binding.Status.SetErrorCondition(v1alpha1.KameletBindingIntegrationConditionError,
+ "Couldn't create an Integration custom resource", err)
+ return binding, err
+ }
+
+ if _, err := kubernetes.ReplaceResource(ctx, action.client, it); err != nil {
+ return nil, errors.Wrap(err, "could not create integration for KameletBinding")
+ }
+
+ // propagate Kamelet icon (best effort)
+ action.propagateIcon(ctx, binding)
+
+ target := binding.DeepCopy()
+ target.Status.Phase = v1alpha1.KameletBindingPhaseCreating
+ return target, nil
+}
+
+func (action *initializeAction) propagateIcon(ctx context.Context, binding *v1alpha1.KameletBinding) {
+ icon, err := action.findIcon(ctx, binding)
+ if err != nil {
+ action.L.Errorf(err, "cannot find icon for KameletBinding %q", binding.Name)
+ return
+ }
+ if icon == "" {
+ return
+ }
+ // compute patch
+ clone := binding.DeepCopy()
+ clone.Annotations = make(map[string]string)
+ for k, v := range binding.Annotations {
+ clone.Annotations[k] = v
+ }
+ if _, ok := clone.Annotations[v1alpha1.AnnotationIcon]; !ok {
+ clone.Annotations[v1alpha1.AnnotationIcon] = icon
+ }
+ p, err := patch.MergePatch(binding, clone)
+ if err != nil {
+ action.L.Errorf(err, "cannot compute patch to update icon for KameletBinding %q", binding.Name)
+ return
+ }
+ if len(p) > 0 {
+ if err := action.client.Patch(ctx, clone, client.RawPatch(types.MergePatchType, p)); err != nil {
+ action.L.Errorf(err, "cannot apply merge patch to update icon for KameletBinding %q", binding.Name)
+ return
+ }
+ }
+}
+
+func (action *initializeAction) findIcon(ctx context.Context, binding *v1alpha1.KameletBinding) (string, error) {
+ var kameletRef *corev1.ObjectReference
+ if binding.Spec.Source.Ref != nil && binding.Spec.Source.Ref.Kind == "Kamelet" && strings.HasPrefix(binding.Spec.Source.Ref.APIVersion, "camel.apache.org/") {
+ kameletRef = binding.Spec.Source.Ref
+ } else if binding.Spec.Sink.Ref != nil && binding.Spec.Sink.Ref.Kind == "Kamelet" && strings.HasPrefix(binding.Spec.Sink.Ref.APIVersion, "camel.apache.org/") {
+ kameletRef = binding.Spec.Sink.Ref
+ }
+
+ if kameletRef == nil {
+ return "", nil
+ }
+
+ repo, err := repository.New(ctx, action.client, binding.Namespace, platform.GetOperatorNamespace())
+ if err != nil {
+ return "", err
+ }
+
+ kamelet, err := repo.Get(ctx, kameletRef.Name)
+ if err != nil {
+ return "", err
+ }
+ if kamelet == nil {
+ return "", nil
+ }
+
+ return kamelet.Annotations[v1alpha1.AnnotationIcon], nil
+}
diff --git a/pkg/controller/kameletbinding/integration.go b/pkg/controller/kameletbinding/integration.go
new file mode 100644
index 000000000..1d4fdc796
--- /dev/null
+++ b/pkg/controller/kameletbinding/integration.go
@@ -0,0 +1,265 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kameletbinding
+
+import (
+ "context"
+ "encoding/json"
+ "sort"
+
+ "github.com/pkg/errors"
+
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+ v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+ "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
+ "github.com/apache/camel-k/v2/pkg/client"
+ "github.com/apache/camel-k/v2/pkg/platform"
+ "github.com/apache/camel-k/v2/pkg/util"
+ "github.com/apache/camel-k/v2/pkg/util/bindings"
+ "github.com/apache/camel-k/v2/pkg/util/knative"
+ "github.com/apache/camel-k/v2/pkg/util/kubernetes"
+ "github.com/apache/camel-k/v2/pkg/util/property"
+)
+
+var (
+ endpointTypeSourceContext = bindings.V1alpha1EndpointContext{Type: v1alpha1.EndpointTypeSource}
+ endpointTypeSinkContext = bindings.V1alpha1EndpointContext{Type: v1alpha1.EndpointTypeSink}
+)
+
+func CreateIntegrationFor(ctx context.Context, c client.Client, binding *v1alpha1.KameletBinding) (*v1.Integration, error) {
+ controller := true
+ blockOwnerDeletion := true
+ annotations := util.CopyMap(binding.Annotations)
+ // avoid propagating the icon to the integration as it's heavyweight and not needed
+ delete(annotations, v1alpha1.AnnotationIcon)
+
+ it := v1.Integration{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: binding.Namespace,
+ Name: binding.Name,
+ Annotations: annotations,
+ Labels: util.CopyMap(binding.Labels),
+ OwnerReferences: []metav1.OwnerReference{
+ {
+ APIVersion: binding.APIVersion,
+ Kind: binding.Kind,
+ Name: binding.Name,
+ UID: binding.UID,
+ Controller: &controller,
+ BlockOwnerDeletion: &blockOwnerDeletion,
+ },
+ },
+ },
+ }
+
+ // creator labels
+ if it.GetLabels() == nil {
+ it.SetLabels(make(map[string]string))
+ }
+ it.GetLabels()[kubernetes.CamelCreatorLabelKind] = binding.Kind
+ it.GetLabels()[kubernetes.CamelCreatorLabelName] = binding.Name
+
+ // start from the integration spec defined in the binding
+ if binding.Spec.Integration != nil {
+ it.Spec = *binding.Spec.Integration.DeepCopy()
+ }
+
+ // Set replicas (or override podspecable value) if present
+ if binding.Spec.Replicas != nil {
+ replicas := *binding.Spec.Replicas
+ it.Spec.Replicas = &replicas
+ }
+
+ profile, err := determineProfile(ctx, c, binding)
+ if err != nil {
+ return nil, err
+ }
+ it.Spec.Profile = profile
+
+ if binding.Spec.ServiceAccountName != "" {
+ it.Spec.ServiceAccountName = binding.Spec.ServiceAccountName
+ }
+
+ bindingContext := bindings.V1alpha1BindingContext{
+ Ctx: ctx,
+ Client: c,
+ Namespace: it.Namespace,
+ Profile: profile,
+ }
+
+ from, err := bindings.TranslateV1alpha1(bindingContext, endpointTypeSourceContext, binding.Spec.Source)
+ if err != nil {
+ return nil, errors.Wrap(err, "could not determine source URI")
+ }
+ to, err := bindings.TranslateV1alpha1(bindingContext, endpointTypeSinkContext, binding.Spec.Sink)
+ if err != nil {
+ return nil, errors.Wrap(err, "could not determine sink URI")
+ }
+ // error handler is optional
+ errorHandler, err := maybeErrorHandler(binding.Spec.ErrorHandler, bindingContext)
+ if err != nil {
+ return nil, errors.Wrap(err, "could not determine error handler")
+ }
+
+ steps := make([]*bindings.Binding, 0, len(binding.Spec.Steps))
+ for idx, step := range binding.Spec.Steps {
+ position := idx
+ stepKameletBinding, err := bindings.TranslateV1alpha1(bindingContext, bindings.V1alpha1EndpointContext{
+ Type: v1alpha1.EndpointTypeAction,
+ Position: &position,
+ }, step)
+ if err != nil {
+ return nil, errors.Wrapf(err, "could not determine URI for step %d", idx)
+ }
+ steps = append(steps, stepKameletBinding)
+ }
+
+ if to.Step == nil && to.URI == "" {
+ return nil, errors.Errorf("illegal step definition for sink step: either Step or URI should be provided")
+ }
+ if from.URI == "" {
+ return nil, errors.Errorf("illegal step definition for source step: URI should be provided")
+ }
+ for index, step := range steps {
+ if step.Step == nil && step.URI == "" {
+ return nil, errors.Errorf("illegal step definition for step %d: either Step or URI should be provided", index)
+ }
+ }
+
+ if err := configureKameletBinding(&it, from); err != nil {
+ return nil, err
+ }
+
+ if err := configureKameletBinding(&it, steps...); err != nil {
+ return nil, err
+ }
+
+ if err := configureKameletBinding(&it, to); err != nil {
+ return nil, err
+ }
+
+ if err := configureKameletBinding(&it, errorHandler); err != nil {
+ return nil, err
+ }
+
+ if it.Spec.Configuration != nil {
+ sort.SliceStable(it.Spec.Configuration, func(i, j int) bool {
+ mi, mj := it.Spec.Configuration[i], it.Spec.Configuration[j]
+ switch {
+ case mi.Type != mj.Type:
+ return mi.Type < mj.Type
+ default:
+ return mi.Value < mj.Value
+ }
+ })
+ }
+
+ dslSteps := make([]map[string]interface{}, 0)
+
+ if from.Step != nil {
+ dslSteps = append(dslSteps, from.AsYamlDSL())
+ }
+
+ for _, step := range steps {
+ dslSteps = append(dslSteps, step.AsYamlDSL())
+ }
+
+ if to.Step != nil {
+ dslSteps = append(dslSteps, to.AsYamlDSL())
+ }
+ dslSteps = append(dslSteps, map[string]interface{}{
+ "to": to.URI,
+ })
+
+ fromWrapper := map[string]interface{}{
+ "uri": from.URI,
+ "steps": dslSteps,
+ }
+
+ flowRoute := map[string]interface{}{
+ "route": map[string]interface{}{
+ "id": "binding",
+ "from": fromWrapper,
+ },
+ }
+ encodedRoute, err := json.Marshal(flowRoute)
+ if err != nil {
+ return nil, err
+ }
+ it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: encodedRoute})
+
+ return &it, nil
+}
+
+func configureKameletBinding(integration *v1.Integration, bindings ...*bindings.Binding) error {
+ for _, b := range bindings {
+ if b == nil {
+ continue
+ }
+ if err := integration.Spec.Traits.Merge(b.Traits); err != nil {
+ return err
+ }
+ for k, v := range b.ApplicationProperties {
+ entry, err := property.EncodePropertyFileEntry(k, v)
+ if err != nil {
+ return err
+ }
+
+ integration.Spec.Configuration = append(integration.Spec.Configuration, v1.ConfigurationSpec{
+ Type: "property",
+ Value: entry,
+ })
+ }
+
+ }
+
+ return nil
+}
+
+func determineProfile(ctx context.Context, c client.Client, binding *v1alpha1.KameletBinding) (v1.TraitProfile, error) {
+ if binding.Spec.Integration != nil && binding.Spec.Integration.Profile != "" {
+ return binding.Spec.Integration.Profile, nil
+ }
+ pl, err := platform.GetForResource(ctx, c, binding)
+ if err != nil && !k8serrors.IsNotFound(err) {
+ return "", errors.Wrap(err, "error while retrieving the integration platform")
+ }
+ if pl != nil {
+ if pl.Status.Profile != "" {
+ return pl.Status.Profile, nil
+ }
+ if pl.Spec.Profile != "" {
+ return pl.Spec.Profile, nil
+ }
+ }
+ if ok, err := knative.IsInstalled(c); err != nil {
+ return "", err
+ } else if ok {
+ return v1.TraitProfileKnative, nil
+ }
+ if pl != nil {
+ // Determine profile from cluster type
+ plProfile := platform.GetProfile(pl)
+ if plProfile != "" {
+ return plProfile, nil
+ }
+ }
+ return v1.DefaultTraitProfile, nil
+}
diff --git a/pkg/controller/kameletbinding/kameletbinding_controller.go b/pkg/controller/kameletbinding/kameletbinding_controller.go
new file mode 100644
index 000000000..0553b4d92
--- /dev/null
+++ b/pkg/controller/kameletbinding/kameletbinding_controller.go
@@ -0,0 +1,239 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kameletbinding
+
+import (
+ "context"
+
+ "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/client-go/tools/record"
+
+ v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+ "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
+ ctrl "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller"
+ "sigs.k8s.io/controller-runtime/pkg/event"
+ "sigs.k8s.io/controller-runtime/pkg/handler"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
+ "sigs.k8s.io/controller-runtime/pkg/source"
+
+ "github.com/apache/camel-k/v2/pkg/client"
+ "github.com/apache/camel-k/v2/pkg/trait"
+
+ camelevent "github.com/apache/camel-k/v2/pkg/event"
+ "github.com/apache/camel-k/v2/pkg/platform"
+ "github.com/apache/camel-k/v2/pkg/util/log"
+ "github.com/apache/camel-k/v2/pkg/util/monitoring"
+)
+
+// Add creates a new KameletBinding Controller and adds it to the Manager. The Manager will set fields on the Controller
+// and Start it when the Manager is Started.
+func Add(ctx context.Context, mgr manager.Manager, c client.Client) error {
+ return add(mgr, newReconciler(mgr, c))
+}
+
+func newReconciler(mgr manager.Manager, c client.Client) reconcile.Reconciler {
+ return monitoring.NewInstrumentedReconciler(
+ &ReconcileKameletBinding{
+ client: c,
+ scheme: mgr.GetScheme(),
+ recorder: mgr.GetEventRecorderFor("camel-k-kamelet-binding-controller"),
+ },
+ schema.GroupVersionKind{
+ Group: v1alpha1.SchemeGroupVersion.Group,
+ Version: v1alpha1.SchemeGroupVersion.Version,
+ Kind: v1alpha1.KameletBindingKind,
+ },
+ )
+}
+
+func add(mgr manager.Manager, r reconcile.Reconciler) error {
+ c, err := controller.New("kamelet-binding-controller", mgr, controller.Options{Reconciler: r})
+ if err != nil {
+ return err
+ }
+
+ // Watch for changes to primary resource KameletBinding
+ err = c.Watch(&source.Kind{Type: &v1alpha1.KameletBinding{}},
+ &handler.EnqueueRequestForObject{},
+ platform.FilteringFuncs{
+ UpdateFunc: func(e event.UpdateEvent) bool {
+ oldKameletBinding, ok := e.ObjectOld.(*v1alpha1.KameletBinding)
+ if !ok {
+ return false
+ }
+ newKameletBinding, ok := e.ObjectNew.(*v1alpha1.KameletBinding)
+ if !ok {
+ return false
+ }
+
+ // If traits have changed, the reconciliation loop must kick in as
+ // traits may have impact
+ sameTraits, err := trait.KameletBindingsHaveSameTraits(oldKameletBinding, newKameletBinding)
+ if err != nil {
+ Log.ForKameletBinding(newKameletBinding).Error(
+ err,
+ "unable to determine if old and new resource have the same traits")
+ }
+ if !sameTraits {
+ return true
+ }
+
+ // Ignore updates to the binding status in which case metadata.Generation
+ // does not change, or except when the binding phase changes as it's used
+ // to transition from one phase to another
+ return oldKameletBinding.Generation != newKameletBinding.Generation ||
+ oldKameletBinding.Status.Phase != newKameletBinding.Status.Phase
+ },
+ DeleteFunc: func(e event.DeleteEvent) bool {
+ // Evaluates to false if the object has been confirmed deleted
+ return !e.DeleteStateUnknown
+ },
+ },
+ )
+ if err != nil {
+ return err
+ }
+
+ // Watch Integration to propagate changes downstream
+ err = c.Watch(&source.Kind{Type: &v1.Integration{}}, &handler.EnqueueRequestForOwner{
+ OwnerType: &v1alpha1.KameletBinding{},
+ IsController: false,
+ })
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+var _ reconcile.Reconciler = &ReconcileKameletBinding{}
+
+// ReconcileKameletBinding reconciles a KameletBinding object.
+type ReconcileKameletBinding struct {
+ // This client, initialized using mgr.Client() above, is a split client
+ // that reads objects from the cache and writes to the API server
+ client client.Client
+ scheme *runtime.Scheme
+ recorder record.EventRecorder
+}
+
+// Reconcile reads that state of the cluster for a KameletBinding object and makes changes based
+// on the state read and what is in the KameletBinding.Spec
+// Note:
+// The Controller will requeue the Request to be processed again if the returned error is non-nil or
+// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
+func (r *ReconcileKameletBinding) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
+ rlog := Log.WithValues("request-namespace", request.Namespace, "request-name", request.Name)
+ rlog.Info("Reconciling KameletBinding")
+
+ // Make sure the operator is allowed to act on namespace
+ if ok, err := platform.IsOperatorAllowedOnNamespace(ctx, r.client, request.Namespace); err != nil {
+ return reconcile.Result{}, err
+ } else if !ok {
+ rlog.Info("Ignoring request because namespace is locked")
+ return reconcile.Result{}, nil
+ }
+
+ // Fetch the KameletBinding instance
+ var instance v1alpha1.KameletBinding
+
+ if err := r.client.Get(ctx, request.NamespacedName, &instance); err != nil {
+ if errors.IsNotFound(err) {
+ // Request object not found, could have been deleted after reconcile request.
+ // Owned objects are automatically garbage collected. For additional cleanup
+ // logic use finalizers.
+
+ // Return and don't requeue
+ return reconcile.Result{}, nil
+ }
+ // Error reading the object - requeue the request.
+ return reconcile.Result{}, err
+ }
+
+ // Only process resources assigned to the operator
+ if !platform.IsOperatorHandlerConsideringLock(ctx, r.client, request.Namespace, &instance) {
+ rlog.Info("Ignoring request because resource is not assigned to current operator")
+ return reconcile.Result{}, nil
+ }
+
+ actions := []Action{
+ NewInitializeAction(),
+ NewMonitorAction(),
+ }
+
+ var err error
+
+ target := instance.DeepCopy()
+ targetLog := rlog.ForKameletBinding(target)
+
+ for _, a := range actions {
+ a.InjectClient(r.client)
+ a.InjectLogger(targetLog)
+
+ if a.CanHandle(target) {
+ targetLog.Infof("Invoking action %s", a.Name())
+
+ target, err = a.Handle(ctx, target)
+ if err != nil {
+ camelevent.NotifyKameletBindingError(ctx, r.client, r.recorder, &instance, target, err)
+ // Update the binding (mostly just to update its phase) if the new instance is returned
+ if target != nil {
+ _ = r.update(ctx, &instance, target, &targetLog)
+ }
+ return reconcile.Result{}, err
+ }
+
+ if target != nil {
+ if err := r.update(ctx, &instance, target, &targetLog); err != nil {
+ camelevent.NotifyKameletBindingError(ctx, r.client, r.recorder, &instance, target, err)
+ return reconcile.Result{}, err
+ }
+ }
+
+ // handle one action at time so the resource
+ // is always at its latest state
+ camelevent.NotifyKameletBindingUpdated(ctx, r.client, r.recorder, &instance, target)
+ break
+ }
+ }
+
+ return reconcile.Result{}, nil
+}
+
+func (r *ReconcileKameletBinding) update(ctx context.Context, base *v1alpha1.KameletBinding, target *v1alpha1.KameletBinding, log *log.Logger) error {
+ target.Status.ObservedGeneration = base.Generation
+
+ if err := r.client.Status().Patch(ctx, target, ctrl.MergeFrom(base)); err != nil {
+ camelevent.NotifyKameletBindingError(ctx, r.client, r.recorder, base, target, err)
+ return err
+ }
+
+ if target.Status.Phase != base.Status.Phase {
+ log.Info(
+ "state transition",
+ "phase-from", base.Status.Phase,
+ "phase-to", target.Status.Phase,
+ )
+ }
+
+ return nil
+}
diff --git a/pkg/controller/add_kameletbinding.go b/pkg/controller/kameletbinding/log.go
similarity index 83%
copy from pkg/controller/add_kameletbinding.go
copy to pkg/controller/kameletbinding/log.go
index 9e977f7a1..a7a70a117 100644
--- a/pkg/controller/add_kameletbinding.go
+++ b/pkg/controller/kameletbinding/log.go
@@ -15,12 +15,9 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package controller
+package kameletbinding
-import (
- "github.com/apache/camel-k/v2/pkg/controller/binding"
-)
+import "github.com/apache/camel-k/v2/pkg/util/log"
-func init() {
- addToManager = append(addToManager, binding.Add)
-}
+// Log --.
+var Log = log.Log.WithName("controller").WithName("kameletbinding")
diff --git a/pkg/controller/kameletbinding/monitor.go b/pkg/controller/kameletbinding/monitor.go
new file mode 100644
index 000000000..26cbea1f0
--- /dev/null
+++ b/pkg/controller/kameletbinding/monitor.go
@@ -0,0 +1,218 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kameletbinding
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/pkg/errors"
+
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/equality"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+
+ "sigs.k8s.io/controller-runtime/pkg/client"
+
+ v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+ "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
+ "github.com/apache/camel-k/v2/pkg/trait"
+)
+
+// NewMonitorAction returns an action that monitors the Binding after it's fully initialized.
+func NewMonitorAction() Action {
+ return &monitorAction{}
+}
+
+type monitorAction struct {
+ baseAction
+}
+
+func (action *monitorAction) Name() string {
+ return "monitor"
+}
+
+func (action *monitorAction) CanHandle(binding *v1alpha1.KameletBinding) bool {
+ return binding.Status.Phase == v1alpha1.KameletBindingPhaseCreating ||
+ (binding.Status.Phase == v1alpha1.KameletBindingPhaseError &&
+ binding.Status.GetCondition(v1alpha1.KameletBindingIntegrationConditionError) == nil) ||
+ binding.Status.Phase == v1alpha1.KameletBindingPhaseReady
+}
+
+func (action *monitorAction) Handle(ctx context.Context, binding *v1alpha1.KameletBinding) (*v1alpha1.KameletBinding, error) {
+ key := client.ObjectKey{
+ Namespace: binding.Namespace,
+ Name: binding.Name,
+ }
+ it := v1.Integration{}
+ if err := action.client.Get(ctx, key, &it); err != nil && k8serrors.IsNotFound(err) {
+ target := binding.DeepCopy()
+ // Rebuild the integration
+ target.Status.Phase = v1alpha1.KameletBindingPhaseNone
+ target.Status.SetCondition(
+ v1alpha1.KameletBindingConditionReady,
+ corev1.ConditionFalse,
+ "",
+ "",
+ )
+ return target, nil
+ } else if err != nil {
+ return nil, errors.Wrapf(err, "could not load integration for Binding %q", binding.Name)
+ }
+
+ operatorIDChanged := v1.GetOperatorIDAnnotation(binding) != "" &&
+ (v1.GetOperatorIDAnnotation(binding) != v1.GetOperatorIDAnnotation(&it))
+
+ sameTraits, err := trait.IntegrationAndKameletBindingSameTraits(&it, binding)
+ if err != nil {
+ return nil, err
+ }
+
+ // Check if the integration needs to be changed
+ expected, err := CreateIntegrationFor(ctx, action.client, binding)
+ if err != nil {
+ binding.Status.Phase = v1alpha1.KameletBindingPhaseError
+ binding.Status.SetErrorCondition(v1alpha1.KameletBindingIntegrationConditionError,
+ "Couldn't create an Integration custom resource", err)
+ return binding, err
+ }
+
+ semanticEquality := equality.Semantic.DeepDerivative(expected.Spec, it.Spec)
+
+ if !semanticEquality || operatorIDChanged || !sameTraits {
+ action.L.Info(
+ "Binding needs a rebuild",
+ "semantic-equality", !semanticEquality,
+ "operatorid-changed", operatorIDChanged,
+ "traits-changed", !sameTraits)
+
+ // Binding has changed and needs rebuild
+ target := binding.DeepCopy()
+ // Rebuild the integration
+ target.Status.Phase = v1alpha1.KameletBindingPhaseNone
+ target.Status.SetCondition(
+ v1alpha1.KameletBindingConditionReady,
+ corev1.ConditionFalse,
+ "",
+ "",
+ )
+ return target, nil
+ }
+
+ // Map integration phase and conditions to Binding
+ target := binding.DeepCopy()
+
+ switch it.Status.Phase {
+
+ case v1.IntegrationPhaseRunning:
+ target.Status.Phase = v1alpha1.KameletBindingPhaseReady
+ setKameletBindingReadyCondition(target, &it)
+
+ case v1.IntegrationPhaseError:
+ target.Status.Phase = v1alpha1.KameletBindingPhaseError
+ setKameletBindingReadyCondition(target, &it)
+
+ default:
+ target.Status.Phase = v1alpha1.KameletBindingPhaseCreating
+
+ c := v1alpha1.KameletBindingCondition{
+ Type: v1alpha1.KameletBindingConditionReady,
+ Status: corev1.ConditionFalse,
+ Reason: string(target.Status.Phase),
+ Message: fmt.Sprintf("Integration %q is in %q phase", it.GetName(), target.Status.Phase),
+ }
+
+ if condition := it.Status.GetCondition(v1.IntegrationConditionReady); condition != nil {
+ if condition.Pods != nil {
+ c.Pods = make([]v1.PodCondition, 0, len(condition.Pods))
+ c.Pods = append(c.Pods, condition.Pods...)
+ }
+ }
+
+ target.Status.SetConditions(c)
+ }
+
+ // Mirror status replicas and selector
+ target.Status.Replicas = it.Status.Replicas
+ target.Status.Selector = it.Status.Selector
+
+ return target, nil
+}
+
+func setBindingReadyCondition(kb *v1.Binding, it *v1.Integration) {
+ if condition := it.Status.GetCondition(v1.IntegrationConditionReady); condition != nil {
+ message := condition.Message
+ if message == "" {
+ message = fmt.Sprintf("Integration %q readiness condition is %q", it.GetName(), condition.Status)
+ }
+
+ c := v1.BindingCondition{
+ Type: v1.BindingConditionReady,
+ Status: condition.Status,
+ Reason: condition.Reason,
+ Message: message,
+ }
+
+ if condition.Pods != nil {
+ c.Pods = make([]v1.PodCondition, 0, len(condition.Pods))
+ c.Pods = append(c.Pods, condition.Pods...)
+ }
+
+ kb.Status.SetConditions(c)
+
+ } else {
+ kb.Status.SetCondition(
+ v1.BindingConditionReady,
+ corev1.ConditionUnknown,
+ "",
+ fmt.Sprintf("Integration %q does not have a readiness condition", it.GetName()),
+ )
+ }
+}
+
+// Deprecated
+func setKameletBindingReadyCondition(kb *v1alpha1.KameletBinding, it *v1.Integration) {
+ if condition := it.Status.GetCondition(v1.IntegrationConditionReady); condition != nil {
+ message := condition.Message
+ if message == "" {
+ message = fmt.Sprintf("Integration %q readiness condition is %q", it.GetName(), condition.Status)
+ }
+
+ c := v1alpha1.KameletBindingCondition{
+ Type: v1alpha1.KameletBindingConditionReady,
+ Status: condition.Status,
+ Reason: condition.Reason,
+ Message: message,
+ }
+
+ if condition.Pods != nil {
+ c.Pods = make([]v1.PodCondition, 0, len(condition.Pods))
+ c.Pods = append(c.Pods, condition.Pods...)
+ }
+
+ kb.Status.SetConditions(c)
+
+ } else {
+ kb.Status.SetCondition(
+ v1alpha1.KameletBindingConditionReady,
+ corev1.ConditionUnknown,
+ "",
+ fmt.Sprintf("Integration %q does not have a readiness condition", it.GetName()),
+ )
+ }
+}
diff --git a/pkg/event/manager.go b/pkg/event/manager.go
index 0a019d161..55c788589 100644
--- a/pkg/event/manager.go
+++ b/pkg/event/manager.go
@@ -28,6 +28,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime/pkg/client"
v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+ "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/v2/pkg/client"
"github.com/apache/camel-k/v2/pkg/util/kubernetes"
"github.com/apache/camel-k/v2/pkg/util/log"
@@ -254,6 +255,37 @@ func NotifyBindingError(ctx context.Context, c client.Client, recorder record.Ev
recorder.Eventf(k, corev1.EventTypeWarning, ReasonKameletError, "Cannot reconcile Binding %s: %v", k.Name, err)
}
+// NotifyKameletBindingUpdated automatically generates events when a KameletBinding changes.
+// Deprecated
+func NotifyKameletBindingUpdated(ctx context.Context, c client.Client, recorder record.EventRecorder, old, newResource *v1alpha1.KameletBinding) {
+ if newResource == nil {
+ return
+ }
+ oldPhase := ""
+ var oldConditions []v1.ResourceCondition
+ if old != nil {
+ oldPhase = string(old.Status.Phase)
+ oldConditions = old.Status.GetConditions()
+ }
+ if newResource.Status.Phase != v1alpha1.KameletBindingPhaseNone {
+ notifyIfConditionUpdated(recorder, newResource, oldConditions, newResource.Status.GetConditions(), "KameletBinding", newResource.Name, ReasonBindingConditionChanged)
+ }
+ notifyIfPhaseUpdated(ctx, c, recorder, newResource, oldPhase, string(newResource.Status.Phase), "KameletBinding", newResource.Name, ReasonBindingPhaseUpdated, "")
+}
+
+// NotifyKameletBindingError automatically generates error events when the binding reconcile cycle phase has an error.
+// Deprecated
+func NotifyKameletBindingError(ctx context.Context, c client.Client, recorder record.EventRecorder, old, newResource *v1alpha1.KameletBinding, err error) {
+ k := old
+ if newResource != nil {
+ k = newResource
+ }
+ if k == nil {
+ return
+ }
+ recorder.Eventf(k, corev1.EventTypeWarning, ReasonKameletError, "Cannot reconcile KameletBinding %s: %v", k.Name, err)
+}
+
// NotifyBuildUpdated automatically generates events when a build changes.
func NotifyBuildUpdated(ctx context.Context, c client.Client, recorder record.EventRecorder, old, newResource *v1.Build) {
if newResource == nil {
diff --git a/pkg/resources/resources.go b/pkg/resources/resources.go
index ce7e3ddd5..c2d833616 100644
--- a/pkg/resources/resources.go
+++ b/pkg/resources/resources.go
@@ -117,9 +117,9 @@ var assets = func() http.FileSystem {
"/crd/bases/camel.apache.org_bindings.yaml": &vfsgen۰CompressedFileInfo{
name: "camel.apache.org_bindings.yaml",
modTime: time.Time{},
- uncompressedSize: 572366,
+ uncompressedSize: 572391,
- compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xfd\x79\x73\x1b\x37\xd6\x28\x8c\xff\xef\x4f\x81\x92\x53\x57\xd2\x13\x91\xb2\x33\x4b\xcd\xf8\x37\x75\x53\x1a\x59\x76\xf4\x8b\x2d\xb3\x2c\x25\xb9\x29\x27\x4f\x02\x76\x83\x24\xae\xba\x81\x7e\x00\x34\x25\xe6\xf5\xfb\xdd\xdf\xc2\x01\xd0\x0b\x37\xe1\x34\x25\x8d\x3c\xd3\x98\xaa\x8c\x49\xb1\x4f\x63\x3b\xfb\xf6\x9c\x0c\xee\x6f\x3c\x7b\x4e\xde\xf1\x84\x09\xcd\x52\x62\x24\x31\x33\x46\x4e\x0a\x9a\xcc\x18\xb9\x94\x13\x73\x43\x [...]
+ compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\xfd\x79\x73\x1b\x37\xd6\x28\x8c\xff\xef\x4f\x81\x92\x53\x57\xd2\x13\x91\xb2\x33\x4b\xcd\xf8\x37\x75\x53\x1a\x59\x76\xf4\x8b\x2d\xb3\x2c\x25\xb9\x29\x27\x4f\x02\x76\x83\x24\xae\xba\x81\x7e\x00\x34\x25\xe6\xf5\xfb\xdd\xdf\xc2\x01\xd0\x0b\x37\xe1\x34\x25\x8d\x3c\xd3\x98\xaa\x8c\x49\xb1\x4f\x63\x3b\xfb\xf6\x9c\x0c\xee\x6f\x3c\x7b\x4e\xde\xf1\x84\x09\xcd\x52\x62\x24\x31\x33\x46\x4e\x0a\x9a\xcc\x18\xb9\x94\x13\x73\x43\x [...]
},
"/crd/bases/camel.apache.org_builds.yaml": &vfsgen۰CompressedFileInfo{
name: "camel.apache.org_builds.yaml",
@@ -460,9 +460,9 @@ var assets = func() http.FileSystem {
"/rbac/operator-role.yaml": &vfsgen۰CompressedFileInfo{
name: "operator-role.yaml",
modTime: time.Time{},
- uncompressedSize: 3021,
+ uncompressedSize: 3164,
- compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x56\x41\x8f\xdb\x36\x13\xbd\xeb\x57\x0c\xac\x4b\x02\xec\xda\xdf\xd7\x53\xe1\x9e\xdc\x64\xb7\x35\x1a\xd8\xc0\xda\x69\x90\xe3\x88\x1a\xcb\x53\x53\x1c\x76\x48\xad\xd7\xfd\xf5\x05\x65\x29\xf6\xae\xec\xa0\x68\x02\x6c\x7d\x31\x35\x1c\xbd\x79\xf3\xe6\x51\x60\x0e\xb7\xdf\xef\x97\xe5\xf0\x81\x0d\xb9\x40\x25\x44\x81\xb8\x25\x98\x79\x34\x5b\x82\x95\x6c\xe2\x1e\x95\xe0\x5e\x1a\x57\x62\x64\x71\xf0\x66\xb6\xba\x7f\x0b\x8d\x2b\x [...]
+ compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x56\x4d\x8f\xe2\x46\x10\xbd\xfb\x57\x94\xf0\x65\x57\x1a\x20\xc9\x29\x22\x27\x32\x1f\x09\xca\x0a\xa4\x31\x9b\xd5\x1e\xcb\xed\xc2\x54\x68\x77\x75\xba\xdb\xc3\x90\x5f\x1f\xb5\xb1\x07\x0f\x86\x55\xb4\xbb\xd2\x84\x0b\xed\xaa\xa2\xea\xd5\x7b\xaf\x91\x53\x18\x7f\xbf\x4f\x92\xc2\x07\x56\x64\x3c\x15\x10\x04\xc2\x96\x60\x6e\x51\x6d\x09\x32\xd9\x84\x3d\x3a\x82\x07\xa9\x4d\x81\x81\xc5\xc0\xbb\x79\xf6\xf0\x1e\x6a\x53\x90\x03\x [...]
},
"/rbac/patch-role-to-clusterrole.yaml": &vfsgen۰CompressedFileInfo{
name: "patch-role-to-clusterrole.yaml",
@@ -481,9 +481,9 @@ var assets = func() http.FileSystem {
"/rbac/user-cluster-role.yaml": &vfsgen۰CompressedFileInfo{
name: "user-cluster-role.yaml",
modTime: time.Time{},
- uncompressedSize: 1804,
+ uncompressedSize: 1947,
- compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xac\x54\xc1\x6e\xe3\x36\x10\xbd\xeb\x2b\x1e\xa4\xcb\x2e\x10\xcb\x6d\x4f\x85\x7b\x72\xb3\x49\x6b\x74\x61\x03\x91\xb7\x8b\x3d\x8e\xc5\xb1\x34\x30\x45\xaa\x24\x15\x6d\xfa\xf5\x05\x29\x2b\x76\x9a\xcd\xa1\xc5\xfa\x62\x6a\x66\xf4\xe6\xbd\x37\x43\x15\x58\x7c\xbf\x5f\x56\xe0\xa3\xd4\x6c\x3c\x2b\x04\x8b\xd0\x32\xd6\x3d\xd5\x2d\xa3\xb2\xc7\x30\x92\x63\xdc\xdb\xc1\x28\x0a\x62\x0d\xde\xad\xab\xfb\xf7\x18\x8c\x62\x07\x6b\x18\xd6\x [...]
+ compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xac\x54\x4d\x8f\xdb\x36\x10\xbd\xeb\x57\x3c\x48\x97\x04\x58\xdb\x6d\x4f\x85\x7b\x72\xf7\xa3\x35\x1a\xd8\xc0\xca\x69\x90\xe3\x58\x1c\x4b\x84\x29\x52\x25\x47\xab\x6c\x7f\x7d\x41\xda\x5a\x6b\xe3\xec\xa1\x41\x7c\x31\xf5\x66\x34\xf3\xde\xbc\xa1\x0a\xcc\x7e\xdc\x2f\x2b\xf0\x41\x57\x6c\x03\x2b\x88\x83\x34\x8c\x55\x47\x55\xc3\x28\xdd\x41\x06\xf2\x8c\x07\xd7\x5b\x45\xa2\x9d\xc5\xbb\x55\xf9\xf0\x1e\xbd\x55\xec\xe1\x2c\xc3\x79\x [...]
},
"/samples": &vfsgen۰DirInfo{
name: "samples",
diff --git a/pkg/trait/util.go b/pkg/trait/util.go
index c1b26a1d3..fa7d03229 100644
--- a/pkg/trait/util.go
+++ b/pkg/trait/util.go
@@ -34,6 +34,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime/pkg/client"
v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+ "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/v2/pkg/client"
"github.com/apache/camel-k/v2/pkg/metadata"
"github.com/apache/camel-k/v2/pkg/util"
@@ -387,6 +388,21 @@ func BindingsHaveSameTraits(i1 *v1.Binding, i2 *v1.Binding) (bool, error) {
return Equals(c1, c2), nil
}
+// KameletBindingsHaveSameTraits return if traits are the same.
+// Deprecated
+func KameletBindingsHaveSameTraits(i1 *v1alpha1.KameletBinding, i2 *v1alpha1.KameletBinding) (bool, error) {
+ c1, err := NewTraitsOptionsForKameletBinding(i1)
+ if err != nil {
+ return false, err
+ }
+ c2, err := NewTraitsOptionsForKameletBinding(i2)
+ if err != nil {
+ return false, err
+ }
+
+ return Equals(c1, c2), nil
+}
+
// IntegrationAndBindingSameTraits return if traits are the same.
// The comparison is done for the subset of traits defines on the binding as during the trait processing,
// some traits may be added to the Integration i.e. knative configuration in case of sink binding.
@@ -410,6 +426,30 @@ func IntegrationAndBindingSameTraits(i1 *v1.Integration, i2 *v1.Binding) (bool,
return Equals(klbOpts, toCompare), nil
}
+// IntegrationAndBindingSameTraits return if traits are the same.
+// The comparison is done for the subset of traits defines on the binding as during the trait processing,
+// some traits may be added to the Integration i.e. knative configuration in case of sink binding.
+// Deprecated
+func IntegrationAndKameletBindingSameTraits(i1 *v1.Integration, i2 *v1alpha1.KameletBinding) (bool, error) {
+ itOpts, err := NewTraitsOptionsForIntegration(i1)
+ if err != nil {
+ return false, err
+ }
+ klbOpts, err := NewTraitsOptionsForKameletBinding(i2)
+ if err != nil {
+ return false, err
+ }
+
+ toCompare := make(Options)
+ for k := range klbOpts {
+ if v, ok := itOpts[k]; ok {
+ toCompare[k] = v
+ }
+ }
+
+ return Equals(klbOpts, toCompare), nil
+}
+
// IntegrationAndKitHaveSameTraits return if traits are the same.
func IntegrationAndKitHaveSameTraits(i1 *v1.Integration, i2 *v1.IntegrationKit) (bool, error) {
itOpts, err := NewTraitsOptionsForIntegration(i1)
@@ -505,6 +545,34 @@ func NewTraitsOptionsForBinding(i *v1.Binding) (Options, error) {
return m1, nil
}
+// Deprecated
+func NewTraitsOptionsForKameletBinding(i *v1alpha1.KameletBinding) (Options, error) {
+ if i.Spec.Integration != nil {
+ m1, err := ToTraitMap(i.Spec.Integration.Traits)
+ if err != nil {
+ return nil, err
+ }
+
+ m2, err := FromAnnotations(&i.ObjectMeta)
+ if err != nil {
+ return nil, err
+ }
+
+ for k, v := range m2 {
+ m1[k] = v
+ }
+
+ return m1, nil
+ }
+
+ m1, err := FromAnnotations(&i.ObjectMeta)
+ if err != nil {
+ return nil, err
+ }
+
+ return m1, nil
+}
+
func FromAnnotations(meta *metav1.ObjectMeta) (Options, error) {
options := make(Options)
diff --git a/pkg/util/bindings/api_support.go b/pkg/util/bindings/api_support.go
index e103b0d84..c06b5a123 100644
--- a/pkg/util/bindings/api_support.go
+++ b/pkg/util/bindings/api_support.go
@@ -40,3 +40,13 @@ func (c EndpointContext) GenerateID() string {
}
return id
}
+
+// GenerateID generates an identifier based on the context type and its optional position.
+// Deprecated
+func (c V1alpha1EndpointContext) GenerateID() string {
+ id := string(c.Type)
+ if c.Position != nil {
+ id = fmt.Sprintf("%s-%d", id, *c.Position)
+ }
+ return id
+}
diff --git a/pkg/util/bindings/api_v1alpha1.go b/pkg/util/bindings/api_v1alpha1.go
new file mode 100644
index 000000000..7639a2f91
--- /dev/null
+++ b/pkg/util/bindings/api_v1alpha1.go
@@ -0,0 +1,55 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package bindings provides APIs to transform Kubernetes objects into Camel URIs equivalents
+package bindings
+
+import (
+ "context"
+
+ v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+ v1alpha1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
+ "github.com/apache/camel-k/v2/pkg/client"
+)
+
+// V1alpha1BindingProvider maps a Binding endpoint into Camel K resources.
+// Deprecated
+type V1alpha1BindingProvider interface {
+ // ID returns the name of the binding provider
+ ID() string
+ // Translate does the actual mapping
+ Translate(ctx V1alpha1BindingContext, endpointContext V1alpha1EndpointContext, endpoint v1alpha1.Endpoint) (*Binding, error)
+ // Order returns the relative order of execution of the binding provider
+ Order() int
+}
+
+// V1alpha1BindingContext --
+// nolint: containedctx
+// Deprecated
+type V1alpha1BindingContext struct {
+ Ctx context.Context
+ Client client.Client
+ Namespace string
+ Profile v1.TraitProfile
+}
+
+// V1alpha1EndpointContext --
+// Deprecated
+type V1alpha1EndpointContext struct {
+ Type v1alpha1.EndpointType
+ Position *int
+}
diff --git a/pkg/util/bindings/catalog.go b/pkg/util/bindings/catalog.go
index 6f7eb6c56..ce3199fed 100644
--- a/pkg/util/bindings/catalog.go
+++ b/pkg/util/bindings/catalog.go
@@ -22,10 +22,12 @@ import (
"sort"
v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+ "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
)
var bindingProviders []BindingProvider
+// RegisterBindingProvider --
func RegisterBindingProvider(bp BindingProvider) {
bindingProviders = append(bindingProviders, bp)
sort.Slice(bindingProviders, func(i, j int) bool {
@@ -36,6 +38,21 @@ func RegisterBindingProvider(bp BindingProvider) {
})
}
+// Deprecated
+var v1alpha1BindingProviders []V1alpha1BindingProvider
+
+// V1alpha1RegisterBindingProvider --
+// Deprecated
+func V1alpha1RegisterBindingProvider(bp V1alpha1BindingProvider) {
+ v1alpha1BindingProviders = append(v1alpha1BindingProviders, bp)
+ sort.Slice(v1alpha1BindingProviders, func(i, j int) bool {
+ bi := v1alpha1BindingProviders[i]
+ bj := v1alpha1BindingProviders[j]
+ return (bi.Order() < bj.Order()) ||
+ (bi.Order() == bj.Order() && bi.ID() < bj.ID())
+ })
+}
+
// Translate execute all chained binding providers, returning the first success or the first error.
func Translate(ctx BindingContext, endpointCtx EndpointContext, endpoint v1.Endpoint) (*Binding, error) {
if err := validateEndpoint(ctx, endpoint); err != nil {
@@ -58,7 +75,36 @@ func validateEndpoint(ctx BindingContext, e v1.Endpoint) error {
return errors.New("cannot use both ref and URI to specify an endpoint: only one of them should be used")
}
if e.Ref != nil && e.Ref.Namespace != "" && e.Ref.Namespace != ctx.Namespace {
- return errors.New("cross-namespace references are not allowed inBinding")
+ return errors.New("cross-namespace references are not allowed in Binding")
+ }
+ return nil
+}
+
+// TranslateV1alpha1 execute all chained binding providers, returning the first success or the first error.
+// Deprecated
+func TranslateV1alpha1(ctx V1alpha1BindingContext, endpointCtx V1alpha1EndpointContext, endpoint v1alpha1.Endpoint) (*Binding, error) {
+ if err := validateEndpointV1alpha1(ctx, endpoint); err != nil {
+ return nil, err
+ }
+
+ for _, bp := range v1alpha1BindingProviders {
+ b, err := bp.Translate(ctx, endpointCtx, endpoint)
+ if b != nil || err != nil {
+ return b, err
+ }
+ }
+ return nil, nil
+}
+
+// Deprecated
+func validateEndpointV1alpha1(ctx V1alpha1BindingContext, e v1alpha1.Endpoint) error {
+ if e.Ref == nil && e.URI == nil {
+ return errors.New("no ref or URI specified in endpoint")
+ } else if e.Ref != nil && e.URI != nil {
+ return errors.New("cannot use both ref and URI to specify an endpoint: only one of them should be used")
+ }
+ if e.Ref != nil && e.Ref.Namespace != "" && e.Ref.Namespace != ctx.Namespace {
+ return errors.New("cross-namespace references are not allowed in KameletBinding")
}
return nil
}
diff --git a/pkg/util/bindings/v1alpha1_kamelet.go b/pkg/util/bindings/v1alpha1_kamelet.go
new file mode 100644
index 000000000..682db9317
--- /dev/null
+++ b/pkg/util/bindings/v1alpha1_kamelet.go
@@ -0,0 +1,164 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package bindings
+
+import (
+ "fmt"
+ "net/url"
+
+ v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+ v1alpha1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
+
+ "k8s.io/apimachinery/pkg/runtime/schema"
+)
+
+// V1alpha1BindingConverter converts a reference to a Kamelet into a Camel URI.
+// Deprecated
+type V1alpha1BindingConverter struct{}
+
+func (k V1alpha1BindingConverter) ID() string {
+ return "kamelet"
+}
+
+func (k V1alpha1BindingConverter) Translate(ctx V1alpha1BindingContext, endpointCtx V1alpha1EndpointContext, e v1alpha1.Endpoint) (*Binding, error) {
+ if e.Ref == nil {
+ // works only on refs
+ return nil, nil
+ }
+ gv, err := schema.ParseGroupVersion(e.Ref.APIVersion)
+ if err != nil {
+ return nil, err
+ }
+ // it translates only Kamelet refs
+ if e.Ref.Kind == v1.KameletKind && gv.Group == v1.SchemeGroupVersion.Group {
+ kameletName := url.PathEscape(e.Ref.Name)
+
+ props, err := e.Properties.GetPropertyMap()
+ if err != nil {
+ return nil, err
+ }
+
+ id, idPresent := props[v1.KameletIDProperty]
+ if idPresent {
+ delete(props, v1.KameletIDProperty)
+ } else {
+ id = endpointCtx.GenerateID()
+ }
+
+ binding := Binding{}
+ binding.ApplicationProperties = make(map[string]string)
+ for k, v := range props {
+ propKey := fmt.Sprintf("camel.kamelet.%s.%s.%s", kameletName, id, k)
+ binding.ApplicationProperties[propKey] = v
+ }
+
+ switch endpointCtx.Type {
+ case v1alpha1.EndpointTypeAction:
+ steps := make([]map[string]interface{}, 0)
+
+ if in, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotIn); in != nil {
+ steps = append(steps, in)
+ for k, v := range applicationProperties {
+ binding.ApplicationProperties[k] = v
+ }
+ }
+
+ steps = append(steps, map[string]interface{}{
+ "kamelet": map[string]interface{}{
+ "name": fmt.Sprintf("%s/%s", kameletName, url.PathEscape(id)),
+ },
+ })
+
+ if out, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotOut); out != nil {
+ steps = append(steps, out)
+ for k, v := range applicationProperties {
+ binding.ApplicationProperties[k] = v
+ }
+ }
+
+ if len(steps) > 1 {
+ binding.Step = map[string]interface{}{
+ "pipeline": map[string]interface{}{
+ "id": fmt.Sprintf("%s-pipeline", id),
+ "steps": steps,
+ },
+ }
+ } else {
+ binding.Step = steps[0]
+ }
+ case v1alpha1.EndpointTypeSource:
+ if out, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotOut); out != nil {
+ binding.Step = out
+ for k, v := range applicationProperties {
+ binding.ApplicationProperties[k] = v
+ }
+ }
+
+ binding.URI = fmt.Sprintf("kamelet:%s/%s", kameletName, url.PathEscape(id))
+ case v1alpha1.EndpointTypeSink:
+ if in, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotIn); in != nil {
+ binding.Step = in
+ for k, v := range applicationProperties {
+ binding.ApplicationProperties[k] = v
+ }
+ }
+
+ binding.URI = fmt.Sprintf("kamelet:%s/%s", kameletName, url.PathEscape(id))
+ default:
+ binding.URI = fmt.Sprintf("kamelet:%s/%s", kameletName, url.PathEscape(id))
+ }
+
+ return &binding, nil
+ }
+ return nil, nil
+}
+
+func (k V1alpha1BindingConverter) DataTypeStep(e v1alpha1.Endpoint, id string, typeSlot v1alpha1.TypeSlot) (map[string]interface{}, map[string]string) {
+ if e.DataTypes == nil {
+ return nil, nil
+ }
+
+ if inDataType, ok := e.DataTypes[typeSlot]; ok {
+ scheme := "camel"
+ if inDataType.Scheme != "" {
+ scheme = inDataType.Scheme
+ }
+
+ props := make(map[string]string, 2)
+ props[fmt.Sprintf("camel.kamelet.%s.%s-%s.scheme", datTypeActionKamelet, id, typeSlot)] = scheme
+ props[fmt.Sprintf("camel.kamelet.%s.%s-%s.format", datTypeActionKamelet, id, typeSlot)] = inDataType.Format
+
+ stepDsl := map[string]interface{}{
+ "kamelet": map[string]interface{}{
+ "name": fmt.Sprintf("%s/%s-%s", datTypeActionKamelet, url.PathEscape(id), typeSlot),
+ },
+ }
+
+ return stepDsl, props
+ }
+
+ return nil, nil
+}
+
+func (k V1alpha1BindingConverter) Order() int {
+ return OrderStandard
+}
+
+func init() {
+ V1alpha1RegisterBindingProvider(V1alpha1BindingConverter{})
+}
diff --git a/pkg/util/log/log.go b/pkg/util/log/log.go
index 3bf028560..c3ef3ae11 100644
--- a/pkg/util/log/log.go
+++ b/pkg/util/log/log.go
@@ -21,6 +21,7 @@ import (
"fmt"
v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+ "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
"github.com/go-logr/logr"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)
@@ -148,6 +149,17 @@ func (l Logger) ForBinding(target *v1.Binding) Logger {
)
}
+// ForKameletBinding --.
+// Deprecated: use ForBinding instead
+func (l Logger) ForKameletBinding(target *v1alpha1.KameletBinding) Logger {
+ return l.WithValues(
+ "api-version", target.APIVersion,
+ "kind", target.Kind,
+ "ns", target.Namespace,
+ "name", target.Name,
+ )
+}
+
// ForCatalog --.
func (l Logger) ForCatalog(target *v1.CamelCatalog) Logger {
return l.WithValues(