You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by pc...@apache.org on 2023/04/26 08:51:33 UTC
[camel-k] 17/18: fix(kamelets): register v1alpha1 binding providers
This is an automated email from the ASF dual-hosted git repository.
pcongiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 247a8296f10356bf3f7b78c890272bfa6db860cd
Author: Pasquale Congiusti <pa...@gmail.com>
AuthorDate: Mon Apr 10 14:46:22 2023 +0200
fix(kamelets): register v1alpha1 binding providers
---
addons/register_strimzi.go | 1 +
addons/strimzi/strimzi.go | 103 +++++++++++++
config/crd/bases/camel.apache.org_pipes.yaml | 25 +++-
.../properties-binding.yaml | 2 +-
e2e/yaks/common/knative-sinkbinding/source.yaml | 2 +-
.../camel-k/crds/crd-pipe.yaml | 25 +++-
pkg/cmd/reset.go | 2 +-
pkg/util/bindings/camel_uri.go | 42 ++++++
pkg/util/bindings/catalog.go | 2 +-
pkg/util/bindings/kamelet.go | 146 ++++++++++++++++++
pkg/util/bindings/knative_ref.go | 82 +++++++++++
pkg/util/bindings/knative_uri.go | 77 ++++++++++
pkg/util/bindings/v1alpha1_kamelet.go | 164 ---------------------
script/gen_crd.sh | 2 +-
14 files changed, 492 insertions(+), 183 deletions(-)
diff --git a/addons/register_strimzi.go b/addons/register_strimzi.go
index 4fe8707cf..3d9922f9d 100644
--- a/addons/register_strimzi.go
+++ b/addons/register_strimzi.go
@@ -24,4 +24,5 @@ import (
func init() {
bindings.RegisterBindingProvider(strimzi.BindingProvider{})
+ bindings.V1alpha1RegisterBindingProvider(strimzi.V1alpha1BindingProvider{})
}
diff --git a/addons/strimzi/strimzi.go b/addons/strimzi/strimzi.go
index 4207f191a..e7558da80 100644
--- a/addons/strimzi/strimzi.go
+++ b/addons/strimzi/strimzi.go
@@ -25,6 +25,7 @@ import (
"github.com/apache/camel-k/v2/addons/strimzi/duck/v1beta2"
camelv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+ camelv1alpha1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/v2/pkg/util/bindings"
"github.com/apache/camel-k/v2/pkg/util/uri"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -119,6 +120,108 @@ func (s BindingProvider) getBootstrapServers(ctx bindings.BindingContext, cluste
return "", fmt.Errorf("cluster %q has no listeners of type %q", clusterName, v1beta2.StrimziListenerTypePlain)
}
+// Order --.
func (s BindingProvider) Order() int {
return bindings.OrderStandard
}
+
+// V1alpha1BindingProvider allows to connect to a Kafka topic via Binding.
+// Deprecated.
+type V1alpha1BindingProvider struct {
+ Client internalclientset.Interface
+}
+
+// ID --.
+// Deprecated.
+func (s V1alpha1BindingProvider) ID() string {
+ return "strimzi"
+}
+
+// Translate --.
+// Deprecated.
+func (s V1alpha1BindingProvider) Translate(ctx bindings.V1alpha1BindingContext, _ bindings.V1alpha1EndpointContext, endpoint camelv1alpha1.Endpoint) (*bindings.Binding, error) {
+ if endpoint.Ref == nil {
+ // React only on refs
+ return nil, nil
+ }
+ gv, err := schema.ParseGroupVersion(endpoint.Ref.APIVersion)
+ if err != nil {
+ return nil, err
+ }
+
+ if gv.Group != v1beta2.StrimziGroup || endpoint.Ref.Kind != v1beta2.StrimziKindTopic {
+ // Only operates on Strimzi Topics
+ return nil, nil
+ }
+
+ props, err := endpoint.Properties.GetPropertyMap()
+ if err != nil {
+ return nil, err
+ }
+ if props == nil {
+ props = make(map[string]string)
+ }
+
+ if props["brokers"] == "" {
+ // build the client if needed
+ if s.Client == nil {
+ kafkaClient, err := internalclientset.NewForConfig(ctx.Client.GetConfig())
+ if err != nil {
+ return nil, err
+ }
+ s.Client = kafkaClient
+ }
+
+ // look them up
+ topic, err := s.Client.KafkaV1beta2().KafkaTopics(ctx.Namespace).Get(ctx.Ctx, endpoint.Ref.Name, v1.GetOptions{})
+ if err != nil {
+ return nil, err
+ }
+
+ clusterName := topic.Labels[v1beta2.StrimziKafkaClusterLabel]
+ if clusterName == "" {
+ return nil, fmt.Errorf("no %q label defined on topic %s", v1beta2.StrimziKafkaClusterLabel, endpoint.Ref.Name)
+ }
+
+ bootstrapServers, err := s.getBootstrapServers(ctx, clusterName)
+ if err != nil {
+ return nil, err
+ }
+
+ props["brokers"] = bootstrapServers
+ }
+
+ kafkaURI := fmt.Sprintf("kafka:%s", endpoint.Ref.Name)
+ kafkaURI = uri.AppendParameters(kafkaURI, props)
+
+ return &bindings.Binding{
+ URI: kafkaURI,
+ }, nil
+}
+
+// getBootstrapServers --.
+// Deprecated.
+func (s V1alpha1BindingProvider) getBootstrapServers(ctx bindings.V1alpha1BindingContext, clusterName string) (string, error) {
+ cluster, err := s.Client.KafkaV1beta2().Kafkas(ctx.Namespace).Get(ctx.Ctx, clusterName, v1.GetOptions{})
+ if err != nil {
+ return "", err
+ }
+
+ for _, l := range cluster.Status.Listeners {
+ if l.Type == v1beta2.StrimziListenerTypePlain {
+ if l.BootstrapServers == "" {
+ return "", fmt.Errorf("cluster %q has no bootstrap servers in %q listener", clusterName, v1beta2.StrimziListenerTypePlain)
+ }
+
+ return l.BootstrapServers, nil
+ }
+ }
+
+ return "", fmt.Errorf("cluster %q has no listeners of type %q", clusterName, v1beta2.StrimziListenerTypePlain)
+}
+
+// Order --.
+// Deprecated.
+func (s V1alpha1BindingProvider) Order() int {
+ return bindings.OrderStandard
+}
diff --git a/config/crd/bases/camel.apache.org_pipes.yaml b/config/crd/bases/camel.apache.org_pipes.yaml
index e9db2996e..a9fb01df8 100644
--- a/config/crd/bases/camel.apache.org_pipes.yaml
+++ b/config/crd/bases/camel.apache.org_pipes.yaml
@@ -1,11 +1,28 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
----
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.6.1
creationTimestamp: null
+ labels:
+ app: camel-k
name: pipes.camel.apache.org
spec:
group: camel.apache.org
@@ -8512,9 +8529,3 @@ spec:
specReplicasPath: .spec.replicas
statusReplicasPath: .status.replicas
status: {}
-status:
- acceptedNames:
- kind: ""
- plural: ""
- conditions: []
- storedVersions: []
diff --git a/e2e/yaks/common/kamelet-binding-property-encoding/properties-binding.yaml b/e2e/yaks/common/kamelet-binding-property-encoding/properties-binding.yaml
index aa1823da0..f7f9f069e 100644
--- a/e2e/yaks/common/kamelet-binding-property-encoding/properties-binding.yaml
+++ b/e2e/yaks/common/kamelet-binding-property-encoding/properties-binding.yaml
@@ -16,7 +16,7 @@
# ---------------------------------------------------------------------------
kind: KameletBinding
-apiVersion: camel.apache.org/v1
+apiVersion: camel.apache.org/v1alpha1
metadata:
name: properties-binding
spec:
diff --git a/e2e/yaks/common/knative-sinkbinding/source.yaml b/e2e/yaks/common/knative-sinkbinding/source.yaml
index f37427147..9ec23fa81 100644
--- a/e2e/yaks/common/knative-sinkbinding/source.yaml
+++ b/e2e/yaks/common/knative-sinkbinding/source.yaml
@@ -21,7 +21,7 @@
period: "1000"
steps:
- set-body:
- constant: "Hello SinkKameletBinding !!!"
+ constant: "Hello SinkBinding !!!"
- transform:
simple: "${body.toUpperCase()}"
- to: "log:info"
diff --git a/config/crd/bases/camel.apache.org_pipes.yaml b/helm/camel-k/crds/crd-pipe.yaml
similarity index 99%
copy from config/crd/bases/camel.apache.org_pipes.yaml
copy to helm/camel-k/crds/crd-pipe.yaml
index e9db2996e..a9fb01df8 100644
--- a/config/crd/bases/camel.apache.org_pipes.yaml
+++ b/helm/camel-k/crds/crd-pipe.yaml
@@ -1,11 +1,28 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
----
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.6.1
creationTimestamp: null
+ labels:
+ app: camel-k
name: pipes.camel.apache.org
spec:
group: camel.apache.org
@@ -8512,9 +8529,3 @@ spec:
specReplicasPath: .spec.replicas
statusReplicasPath: .status.replicas
status: {}
-status:
- acceptedNames:
- kind: ""
- plural: ""
- conditions: []
- storedVersions: []
diff --git a/pkg/cmd/reset.go b/pkg/cmd/reset.go
index 35c9f78a3..8f08dd0ad 100644
--- a/pkg/cmd/reset.go
+++ b/pkg/cmd/reset.go
@@ -68,7 +68,7 @@ func (o *resetCmdOptions) reset(cmd *cobra.Command, _ []string) {
fmt.Fprint(cmd.ErrOrStderr(), err)
return
}
- fmt.Fprintln(cmd.OutOrStdout(), n, "bindings deleted from namespace", o.Namespace)
+ fmt.Fprintln(cmd.OutOrStdout(), n, "pipes deleted from namespace", o.Namespace)
if n, err = o.deleteAllKameletBindings(c); err != nil {
fmt.Fprint(cmd.ErrOrStderr(), err)
diff --git a/pkg/util/bindings/camel_uri.go b/pkg/util/bindings/camel_uri.go
index 9c7cb56bb..8dad742ad 100644
--- a/pkg/util/bindings/camel_uri.go
+++ b/pkg/util/bindings/camel_uri.go
@@ -19,6 +19,7 @@ package bindings
import (
v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+ v1alpha1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/v2/pkg/util/uri"
)
@@ -26,10 +27,12 @@ import (
// It's used as fallback if the URI scheme is not known by other providers.
type CamelURIBindingProvider struct{}
+// ID --.
func (k CamelURIBindingProvider) ID() string {
return "camel-uri"
}
+// Translate --.
func (k CamelURIBindingProvider) Translate(ctx BindingContext, endpointCtx EndpointContext, e v1.Endpoint) (*Binding, error) {
if e.URI == nil {
// works only on uris
@@ -48,11 +51,50 @@ func (k CamelURIBindingProvider) Translate(ctx BindingContext, endpointCtx Endpo
}, nil
}
+// Order --.
func (k CamelURIBindingProvider) Order() int {
// Using it as fallback
return OrderLast
}
+// V1alpha1CamelURIBindingProvider --.
+// Deprecated .
+type V1alpha1CamelURIBindingProvider struct{}
+
+// ID --.
+// Deprecated .
+func (k V1alpha1CamelURIBindingProvider) ID() string {
+ return "camel-uri"
+}
+
+// Translate --.
+// Deprecated .
+func (k V1alpha1CamelURIBindingProvider) Translate(ctx V1alpha1BindingContext, endpointCtx V1alpha1EndpointContext, e v1alpha1.Endpoint) (*Binding, error) {
+ if e.URI == nil {
+ // works only on uris
+ return nil, nil
+ }
+
+ endpointURI := *e.URI
+ props, err := e.Properties.GetPropertyMap()
+ if err != nil {
+ return nil, err
+ }
+ endpointURI = uri.AppendParameters(endpointURI, props)
+
+ return &Binding{
+ URI: endpointURI,
+ }, nil
+}
+
+// Order --
+// Deprecated .
+func (k V1alpha1CamelURIBindingProvider) Order() int {
+ // Using it as fallback
+ return OrderLast
+}
+
func init() {
RegisterBindingProvider(CamelURIBindingProvider{})
+ V1alpha1RegisterBindingProvider(V1alpha1CamelURIBindingProvider{})
}
diff --git a/pkg/util/bindings/catalog.go b/pkg/util/bindings/catalog.go
index a2119bb4b..7cd2552b0 100644
--- a/pkg/util/bindings/catalog.go
+++ b/pkg/util/bindings/catalog.go
@@ -75,7 +75,7 @@ func validateEndpoint(ctx BindingContext, e v1.Endpoint) error {
return errors.New("cannot use both ref and URI to specify an endpoint: only one of them should be used")
}
if e.Ref != nil && e.Ref.Namespace != "" && e.Ref.Namespace != ctx.Namespace {
- return errors.New("cross-namespace references are not allowed in Binding")
+ return errors.New("cross-namespace references are not allowed in Pipe")
}
return nil
}
diff --git a/pkg/util/bindings/kamelet.go b/pkg/util/bindings/kamelet.go
index 71a2c52ee..b25164bee 100644
--- a/pkg/util/bindings/kamelet.go
+++ b/pkg/util/bindings/kamelet.go
@@ -22,6 +22,7 @@ import (
"net/url"
v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+ v1alpha1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
"k8s.io/apimachinery/pkg/runtime/schema"
)
@@ -33,10 +34,12 @@ const (
// BindingConverter converts a reference to a Kamelet into a Camel URI.
type BindingConverter struct{}
+// ID --.
func (k BindingConverter) ID() string {
return "kamelet"
}
+// Translate --.
func (k BindingConverter) Translate(ctx BindingContext, endpointCtx EndpointContext, e v1.Endpoint) (*Binding, error) {
if e.Ref == nil {
// works only on refs
@@ -130,6 +133,7 @@ func (k BindingConverter) Translate(ctx BindingContext, endpointCtx EndpointCont
return nil, nil
}
+// DataTypeStep --.
func (k BindingConverter) DataTypeStep(e v1.Endpoint, id string, typeSlot v1.TypeSlot) (map[string]interface{}, map[string]string) {
if e.DataTypes == nil {
return nil, nil
@@ -157,10 +161,152 @@ func (k BindingConverter) DataTypeStep(e v1.Endpoint, id string, typeSlot v1.Typ
return nil, nil
}
+// Order --.
func (k BindingConverter) Order() int {
return OrderStandard
}
+// V1alpha1BindingConverter converts a reference to a Kamelet into a Camel URI.
+// Deprecated.
+type V1alpha1BindingConverter struct{}
+
+// ID -- .
+// Deprecated.
+func (k V1alpha1BindingConverter) ID() string {
+ return "kamelet"
+}
+
+// Translate -- .
+// Deprecated.
+func (k V1alpha1BindingConverter) Translate(ctx V1alpha1BindingContext, endpointCtx V1alpha1EndpointContext, e v1alpha1.Endpoint) (*Binding, error) {
+ if e.Ref == nil {
+ // works only on refs
+ return nil, nil
+ }
+ gv, err := schema.ParseGroupVersion(e.Ref.APIVersion)
+ if err != nil {
+ return nil, err
+ }
+ // it translates only Kamelet refs
+ if e.Ref.Kind == v1.KameletKind && gv.Group == v1.SchemeGroupVersion.Group {
+ kameletName := url.PathEscape(e.Ref.Name)
+
+ props, err := e.Properties.GetPropertyMap()
+ if err != nil {
+ return nil, err
+ }
+
+ id, idPresent := props[v1.KameletIDProperty]
+ if idPresent {
+ delete(props, v1.KameletIDProperty)
+ } else {
+ id = endpointCtx.GenerateID()
+ }
+
+ binding := Binding{}
+ binding.ApplicationProperties = make(map[string]string)
+ for k, v := range props {
+ propKey := fmt.Sprintf("camel.kamelet.%s.%s.%s", kameletName, id, k)
+ binding.ApplicationProperties[propKey] = v
+ }
+
+ switch endpointCtx.Type {
+ case v1alpha1.EndpointTypeAction:
+ steps := make([]map[string]interface{}, 0)
+
+ if in, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotIn); in != nil {
+ steps = append(steps, in)
+ for k, v := range applicationProperties {
+ binding.ApplicationProperties[k] = v
+ }
+ }
+
+ steps = append(steps, map[string]interface{}{
+ "kamelet": map[string]interface{}{
+ "name": fmt.Sprintf("%s/%s", kameletName, url.PathEscape(id)),
+ },
+ })
+
+ if out, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotOut); out != nil {
+ steps = append(steps, out)
+ for k, v := range applicationProperties {
+ binding.ApplicationProperties[k] = v
+ }
+ }
+
+ if len(steps) > 1 {
+ binding.Step = map[string]interface{}{
+ "pipeline": map[string]interface{}{
+ "id": fmt.Sprintf("%s-pipeline", id),
+ "steps": steps,
+ },
+ }
+ } else {
+ binding.Step = steps[0]
+ }
+ case v1alpha1.EndpointTypeSource:
+ if out, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotOut); out != nil {
+ binding.Step = out
+ for k, v := range applicationProperties {
+ binding.ApplicationProperties[k] = v
+ }
+ }
+
+ binding.URI = fmt.Sprintf("kamelet:%s/%s", kameletName, url.PathEscape(id))
+ case v1alpha1.EndpointTypeSink:
+ if in, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotIn); in != nil {
+ binding.Step = in
+ for k, v := range applicationProperties {
+ binding.ApplicationProperties[k] = v
+ }
+ }
+
+ binding.URI = fmt.Sprintf("kamelet:%s/%s", kameletName, url.PathEscape(id))
+ default:
+ binding.URI = fmt.Sprintf("kamelet:%s/%s", kameletName, url.PathEscape(id))
+ }
+
+ return &binding, nil
+ }
+ return nil, nil
+}
+
+// DataTypeStep -- .
+// Deprecated.
+func (k V1alpha1BindingConverter) DataTypeStep(e v1alpha1.Endpoint, id string, typeSlot v1alpha1.TypeSlot) (map[string]interface{}, map[string]string) {
+ if e.DataTypes == nil {
+ return nil, nil
+ }
+
+ if inDataType, ok := e.DataTypes[typeSlot]; ok {
+ scheme := "camel"
+ if inDataType.Scheme != "" {
+ scheme = inDataType.Scheme
+ }
+
+ props := make(map[string]string, 2)
+ props[fmt.Sprintf("camel.kamelet.%s.%s-%s.scheme", datTypeActionKamelet, id, typeSlot)] = scheme
+ props[fmt.Sprintf("camel.kamelet.%s.%s-%s.format", datTypeActionKamelet, id, typeSlot)] = inDataType.Format
+
+ stepDsl := map[string]interface{}{
+ "kamelet": map[string]interface{}{
+ "name": fmt.Sprintf("%s/%s-%s", datTypeActionKamelet, url.PathEscape(id), typeSlot),
+ },
+ }
+
+ return stepDsl, props
+ }
+
+ return nil, nil
+}
+
+// Order -- .
+// Deprecated.
+func (k V1alpha1BindingConverter) Order() int {
+ return OrderStandard
+}
+
func init() {
RegisterBindingProvider(BindingConverter{})
+ V1alpha1RegisterBindingProvider(V1alpha1BindingConverter{})
}
diff --git a/pkg/util/bindings/knative_ref.go b/pkg/util/bindings/knative_ref.go
index f835c0ede..2859692a4 100644
--- a/pkg/util/bindings/knative_ref.go
+++ b/pkg/util/bindings/knative_ref.go
@@ -24,6 +24,7 @@ import (
v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
knativeapis "github.com/apache/camel-k/v2/pkg/apis/camel/v1/knative"
+ v1alpha1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/v2/pkg/util/knative"
"github.com/apache/camel-k/v2/pkg/util/uri"
@@ -33,10 +34,12 @@ import (
// It's used as fallback if no other providers can decode the object reference.
type KnativeRefBindingProvider struct{}
+// ID --.
func (k KnativeRefBindingProvider) ID() string {
return "knative-ref"
}
+// Translate --.
func (k KnativeRefBindingProvider) Translate(ctx BindingContext, endpointCtx EndpointContext, e v1.Endpoint) (*Binding, error) {
if e.Ref == nil {
// works only on refs
@@ -94,11 +97,90 @@ func (k KnativeRefBindingProvider) Translate(ctx BindingContext, endpointCtx End
}, nil
}
+// Order --.
func (k KnativeRefBindingProvider) Order() int {
// Executes as last, as it can be used as fallback for all unknown object references
return OrderLast
}
+// V1alpha1KnativeRefBindingProvider converts a reference to a Kubernetes object into a Camel URI.
+// It's used as fallback if no other providers can decode the object reference.
+// Deprecated.
+type V1alpha1KnativeRefBindingProvider struct{}
+
+// ID --.
+// Deprecated.
+func (k V1alpha1KnativeRefBindingProvider) ID() string {
+ return "knative-ref"
+}
+
+// Translate --.
+// Deprecated.
+func (k V1alpha1KnativeRefBindingProvider) Translate(ctx V1alpha1BindingContext, endpointCtx V1alpha1EndpointContext, e v1alpha1.Endpoint) (*Binding, error) {
+ if e.Ref == nil {
+ // works only on refs
+ return nil, nil
+ }
+
+ serviceType, err := knative.GetServiceType(*e.Ref)
+ if err != nil {
+ return nil, err
+ }
+
+ if serviceType == nil {
+ endpointType := knativeapis.CamelServiceTypeEndpoint
+ serviceType = &endpointType
+ }
+
+ props, err := e.Properties.GetPropertyMap()
+ if err != nil {
+ return nil, err
+ }
+ if props == nil {
+ props = make(map[string]string)
+ }
+ if props["apiVersion"] == "" {
+ props["apiVersion"] = e.Ref.APIVersion
+ }
+ if props["kind"] == "" {
+ props["kind"] = e.Ref.Kind
+ }
+
+ var serviceURI string
+ if *serviceType == knativeapis.CamelServiceTypeEvent {
+ if props["name"] == "" {
+ props["name"] = e.Ref.Name
+ }
+ if eventType, ok := props["type"]; ok {
+ // consume prop
+ delete(props, "type")
+ serviceURI = fmt.Sprintf("knative:%s/%s", *serviceType, eventType)
+ } else {
+ if endpointCtx.Type == v1alpha1.EndpointTypeSink || endpointCtx.Type == v1alpha1.EndpointTypeAction {
+ // Allowing no event type, but it can fail. See https://github.com/apache/camel-k/v2-runtime/issues/536
+ serviceURI = fmt.Sprintf("knative:%s", *serviceType)
+ } else {
+ return nil, errors.New(`property "type" must be provided when reading from the Broker`)
+ }
+ }
+ } else {
+ serviceURI = fmt.Sprintf("knative:%s/%s", *serviceType, url.PathEscape(e.Ref.Name))
+ }
+
+ serviceURI = uri.AppendParameters(serviceURI, props)
+ return &Binding{
+ URI: serviceURI,
+ }, nil
+}
+
+// Order --.
+// Deprecated.
+func (k V1alpha1KnativeRefBindingProvider) Order() int {
+ // Executes as last, as it can be used as fallback for all unknown object references
+ return OrderLast
+}
+
func init() {
RegisterBindingProvider(KnativeRefBindingProvider{})
+ V1alpha1RegisterBindingProvider(V1alpha1KnativeRefBindingProvider{})
}
diff --git a/pkg/util/bindings/knative_uri.go b/pkg/util/bindings/knative_uri.go
index 0621d2b8b..728dad28e 100644
--- a/pkg/util/bindings/knative_uri.go
+++ b/pkg/util/bindings/knative_uri.go
@@ -26,6 +26,7 @@ import (
v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
knativeapis "github.com/apache/camel-k/v2/pkg/apis/camel/v1/knative"
traitv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait"
+ v1alpha1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/v2/pkg/util/uri"
)
@@ -33,10 +34,12 @@ import (
// KnativeURIBindingProvider converts a HTTP/HTTPS URI into a Camel Knative endpoint (to call it via CloudEvents).
type KnativeURIBindingProvider struct{}
+// ID --.
func (k KnativeURIBindingProvider) ID() string {
return "knative-uri"
}
+// Translate --.
func (k KnativeURIBindingProvider) Translate(ctx BindingContext, endpointCtx EndpointContext, e v1.Endpoint) (*Binding, error) {
if e.URI == nil {
// works only on uris
@@ -92,10 +95,84 @@ func (k KnativeURIBindingProvider) Translate(ctx BindingContext, endpointCtx End
}, nil
}
+// Order --.
func (k KnativeURIBindingProvider) Order() int {
return OrderStandard
}
+// V1alpha1KnativeURIBindingProvider converts a HTTP/HTTPS URI into a Camel Knative endpoint (to call it via CloudEvents).
+type V1alpha1KnativeURIBindingProvider struct{}
+
+// ID --.
+// Deprecated.
+func (k V1alpha1KnativeURIBindingProvider) ID() string {
+ return "knative-uri"
+}
+
+// Translate --.
+// Deprecated.
+func (k V1alpha1KnativeURIBindingProvider) Translate(ctx V1alpha1BindingContext, endpointCtx V1alpha1EndpointContext, e v1alpha1.Endpoint) (*Binding, error) {
+ if e.URI == nil {
+ // works only on uris
+ return nil, nil
+ }
+ if ctx.Profile != v1.TraitProfileKnative {
+ // use cloudevent binding only in Knative trait profile
+ return nil, nil
+ }
+ if !strings.HasPrefix(*e.URI, "http:") && !strings.HasPrefix(*e.URI, "https:") {
+ // only translates http/https uri to Knative calls
+ return nil, nil
+ }
+ if endpointCtx.Type == v1alpha1.EndpointTypeSource {
+ // HTTP/HTTPS uri are translated to Knative endpoints only when used as sinks
+ return nil, nil
+ }
+
+ originalURI, err := url.Parse(*e.URI)
+ if err != nil {
+ return nil, err
+ }
+ env := knativeapis.NewCamelEnvironment()
+ svc, err := knativeapis.BuildCamelServiceDefinition("sink",
+ knativeapis.CamelEndpointKindSink,
+ knativeapis.CamelServiceTypeEndpoint,
+ *originalURI, "", "")
+ if err != nil {
+ return nil, err
+ }
+ env.Services = append(env.Services, svc)
+ config, err := env.Serialize()
+ if err != nil {
+ return nil, err
+ }
+
+ // Rewrite URI to match the service definition
+ serviceURI := "knative:endpoint/sink"
+ props, err := e.Properties.GetPropertyMap()
+ if err != nil {
+ return nil, err
+ }
+ serviceURI = uri.AppendParameters(serviceURI, props)
+
+ return &Binding{
+ URI: serviceURI,
+ Traits: v1.Traits{
+ Knative: &traitv1.KnativeTrait{
+ Configuration: config,
+ SinkBinding: pointer.Bool(false),
+ },
+ },
+ }, nil
+}
+
+// Order --.
+// Deprecated.
+func (k V1alpha1KnativeURIBindingProvider) Order() int {
+ return OrderStandard
+}
+
func init() {
RegisterBindingProvider(KnativeURIBindingProvider{})
+ V1alpha1RegisterBindingProvider(V1alpha1KnativeURIBindingProvider{})
}
diff --git a/pkg/util/bindings/v1alpha1_kamelet.go b/pkg/util/bindings/v1alpha1_kamelet.go
deleted file mode 100644
index 30734fc7b..000000000
--- a/pkg/util/bindings/v1alpha1_kamelet.go
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package bindings
-
-import (
- "fmt"
- "net/url"
-
- v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
- v1alpha1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
-
- "k8s.io/apimachinery/pkg/runtime/schema"
-)
-
-// V1alpha1BindingConverter converts a reference to a Kamelet into a Camel URI.
-// Deprecated.
-type V1alpha1BindingConverter struct{}
-
-func (k V1alpha1BindingConverter) ID() string {
- return "kamelet"
-}
-
-func (k V1alpha1BindingConverter) Translate(ctx V1alpha1BindingContext, endpointCtx V1alpha1EndpointContext, e v1alpha1.Endpoint) (*Binding, error) {
- if e.Ref == nil {
- // works only on refs
- return nil, nil
- }
- gv, err := schema.ParseGroupVersion(e.Ref.APIVersion)
- if err != nil {
- return nil, err
- }
- // it translates only Kamelet refs
- if e.Ref.Kind == v1.KameletKind && gv.Group == v1.SchemeGroupVersion.Group {
- kameletName := url.PathEscape(e.Ref.Name)
-
- props, err := e.Properties.GetPropertyMap()
- if err != nil {
- return nil, err
- }
-
- id, idPresent := props[v1.KameletIDProperty]
- if idPresent {
- delete(props, v1.KameletIDProperty)
- } else {
- id = endpointCtx.GenerateID()
- }
-
- binding := Binding{}
- binding.ApplicationProperties = make(map[string]string)
- for k, v := range props {
- propKey := fmt.Sprintf("camel.kamelet.%s.%s.%s", kameletName, id, k)
- binding.ApplicationProperties[propKey] = v
- }
-
- switch endpointCtx.Type {
- case v1alpha1.EndpointTypeAction:
- steps := make([]map[string]interface{}, 0)
-
- if in, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotIn); in != nil {
- steps = append(steps, in)
- for k, v := range applicationProperties {
- binding.ApplicationProperties[k] = v
- }
- }
-
- steps = append(steps, map[string]interface{}{
- "kamelet": map[string]interface{}{
- "name": fmt.Sprintf("%s/%s", kameletName, url.PathEscape(id)),
- },
- })
-
- if out, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotOut); out != nil {
- steps = append(steps, out)
- for k, v := range applicationProperties {
- binding.ApplicationProperties[k] = v
- }
- }
-
- if len(steps) > 1 {
- binding.Step = map[string]interface{}{
- "pipeline": map[string]interface{}{
- "id": fmt.Sprintf("%s-pipeline", id),
- "steps": steps,
- },
- }
- } else {
- binding.Step = steps[0]
- }
- case v1alpha1.EndpointTypeSource:
- if out, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotOut); out != nil {
- binding.Step = out
- for k, v := range applicationProperties {
- binding.ApplicationProperties[k] = v
- }
- }
-
- binding.URI = fmt.Sprintf("kamelet:%s/%s", kameletName, url.PathEscape(id))
- case v1alpha1.EndpointTypeSink:
- if in, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotIn); in != nil {
- binding.Step = in
- for k, v := range applicationProperties {
- binding.ApplicationProperties[k] = v
- }
- }
-
- binding.URI = fmt.Sprintf("kamelet:%s/%s", kameletName, url.PathEscape(id))
- default:
- binding.URI = fmt.Sprintf("kamelet:%s/%s", kameletName, url.PathEscape(id))
- }
-
- return &binding, nil
- }
- return nil, nil
-}
-
-func (k V1alpha1BindingConverter) DataTypeStep(e v1alpha1.Endpoint, id string, typeSlot v1alpha1.TypeSlot) (map[string]interface{}, map[string]string) {
- if e.DataTypes == nil {
- return nil, nil
- }
-
- if inDataType, ok := e.DataTypes[typeSlot]; ok {
- scheme := "camel"
- if inDataType.Scheme != "" {
- scheme = inDataType.Scheme
- }
-
- props := make(map[string]string, 2)
- props[fmt.Sprintf("camel.kamelet.%s.%s-%s.scheme", datTypeActionKamelet, id, typeSlot)] = scheme
- props[fmt.Sprintf("camel.kamelet.%s.%s-%s.format", datTypeActionKamelet, id, typeSlot)] = inDataType.Format
-
- stepDsl := map[string]interface{}{
- "kamelet": map[string]interface{}{
- "name": fmt.Sprintf("%s/%s-%s", datTypeActionKamelet, url.PathEscape(id), typeSlot),
- },
- }
-
- return stepDsl, props
- }
-
- return nil, nil
-}
-
-func (k V1alpha1BindingConverter) Order() int {
- return OrderStandard
-}
-
-func init() {
- V1alpha1RegisterBindingProvider(V1alpha1BindingConverter{})
-}
diff --git a/script/gen_crd.sh b/script/gen_crd.sh
index aa2c70787..0cbffbaae 100755
--- a/script/gen_crd.sh
+++ b/script/gen_crd.sh
@@ -69,4 +69,4 @@ deploy_crd integration-kit integrationkits
deploy_crd integration-platform integrationplatforms
deploy_crd kamelet kamelets
deploy_crd kamelet-binding kameletbindings
-deploy_crd binding bindings
\ No newline at end of file
+deploy_crd pipe pipes
\ No newline at end of file