You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2020/08/07 12:21:31 UTC
[camel-k] 01/07: Fix #1548: initial work on sinkbinding
This is an automated email from the ASF dual-hosted git repository.
nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit d07ea17312f0134e3bf12bbd9a34c9a360888cd3
Author: Nicola Ferraro <ni...@gmail.com>
AuthorDate: Fri Jul 24 17:38:07 2020 +0200
Fix #1548: initial work on sinkbinding
---
...yaml => camel-catalog-1.5.0-SNAPSHOT-main.yaml} | 6 +-
...l => camel-catalog-1.5.0-SNAPSHOT-quarkus.yaml} | 6 +-
...el-k.v1.1.0-snapshot.clusterserviceversion.yaml | 1 +
deploy/operator-role-knative.yaml | 1 +
deploy/resources.go | 24 +--
examples/knative/messages-channel.yaml | 2 +-
examples/knative/words-channel.yaml | 2 +-
go.sum | 2 +
helm/camel-k/templates/operator-role.yaml | 1 +
pkg/apis/addtoscheme_knative_eventing.go | 2 +
pkg/trait/knative.go | 178 +++++++++++++++------
pkg/util/defaults/defaults.go | 2 +-
pkg/util/knative/knative.go | 39 ++++-
pkg/util/kubernetes/collection.go | 34 ++++
script/Makefile | 4 +-
15 files changed, 235 insertions(+), 69 deletions(-)
diff --git a/deploy/camel-catalog-1.4.1-main.yaml b/deploy/camel-catalog-1.5.0-SNAPSHOT-main.yaml
similarity index 99%
rename from deploy/camel-catalog-1.4.1-main.yaml
rename to deploy/camel-catalog-1.5.0-SNAPSHOT-main.yaml
index c7f9361..1ce39cb 100644
--- a/deploy/camel-catalog-1.4.1-main.yaml
+++ b/deploy/camel-catalog-1.5.0-SNAPSHOT-main.yaml
@@ -18,16 +18,16 @@
apiVersion: camel.apache.org/v1
kind: CamelCatalog
metadata:
- name: camel-catalog-1.4.1-main
+ name: camel-catalog-1.5.0-snapshot-main
labels:
app: camel-k
camel.apache.org/catalog.version: 3.4.0
camel.apache.org/catalog.loader.version: 3.4.0
- camel.apache.org/runtime.version: 1.4.1
+ camel.apache.org/runtime.version: 1.5.0-SNAPSHOT
camel.apache.org/runtime.provider: main
spec:
runtime:
- version: 1.4.1
+ version: 1.5.0-SNAPSHOT
provider: main
applicationClass: org.apache.camel.k.main.Application
metadata:
diff --git a/deploy/camel-catalog-1.4.1-quarkus.yaml b/deploy/camel-catalog-1.5.0-SNAPSHOT-quarkus.yaml
similarity index 99%
rename from deploy/camel-catalog-1.4.1-quarkus.yaml
rename to deploy/camel-catalog-1.5.0-SNAPSHOT-quarkus.yaml
index e8cde54..113288e 100644
--- a/deploy/camel-catalog-1.4.1-quarkus.yaml
+++ b/deploy/camel-catalog-1.5.0-SNAPSHOT-quarkus.yaml
@@ -18,16 +18,16 @@
apiVersion: camel.apache.org/v1
kind: CamelCatalog
metadata:
- name: camel-catalog-1.4.1-quarkus
+ name: camel-catalog-1.5.0-snapshot-quarkus
labels:
app: camel-k
camel.apache.org/catalog.version: 3.4.0
camel.apache.org/catalog.loader.version: 3.4.0
- camel.apache.org/runtime.version: 1.4.1
+ camel.apache.org/runtime.version: 1.5.0-SNAPSHOT
camel.apache.org/runtime.provider: quarkus
spec:
runtime:
- version: 1.4.1
+ version: 1.5.0-SNAPSHOT
provider: quarkus
applicationClass: io.quarkus.runner.GeneratedMain
metadata:
diff --git a/deploy/olm-catalog/camel-k-dev/1.1.0-snapshot/camel-k.v1.1.0-snapshot.clusterserviceversion.yaml b/deploy/olm-catalog/camel-k-dev/1.1.0-snapshot/camel-k.v1.1.0-snapshot.clusterserviceversion.yaml
index 4cc81c8..d761fee 100644
--- a/deploy/olm-catalog/camel-k-dev/1.1.0-snapshot/camel-k.v1.1.0-snapshot.clusterserviceversion.yaml
+++ b/deploy/olm-catalog/camel-k-dev/1.1.0-snapshot/camel-k.v1.1.0-snapshot.clusterserviceversion.yaml
@@ -411,6 +411,7 @@ spec:
- apiGroups:
- eventing.knative.dev
- messaging.knative.dev
+ - sources.knative.dev
resources:
- '*'
verbs:
diff --git a/deploy/operator-role-knative.yaml b/deploy/operator-role-knative.yaml
index 47ed6c6..36c6a44 100644
--- a/deploy/operator-role-knative.yaml
+++ b/deploy/operator-role-knative.yaml
@@ -38,6 +38,7 @@ rules:
- apiGroups:
- eventing.knative.dev
- messaging.knative.dev
+ - sources.knative.dev
resources:
- "*"
verbs:
diff --git a/deploy/resources.go b/deploy/resources.go
index c43e536..e1e547b 100644
--- a/deploy/resources.go
+++ b/deploy/resources.go
@@ -88,19 +88,19 @@ var assets = func() http.FileSystem {
compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xac\x53\x3d\x6f\xdb\x30\x14\xdc\xf9\x2b\x0e\xd6\x92\x00\xfe\x68\x3b\xba\x93\x9a\xd8\xa8\xd0\xc0\x06\x22\xa7\x41\xc6\x67\xf1\x59\x7a\x08\x45\xaa\x24\x15\xc5\xff\xbe\xa0\x6c\x37\x09\xba\x86\x9b\xa0\xd3\x7d\xf0\x4e\x19\x66\x9f\x77\x54\x86\x3b\xa9\xd8\x06\xd6\x88\x0e\xb1\x61\xe4\x1d\x55\x0d\xa3\x74\x87\x38\x90\x67\xac\x5d\x6f\x35\x45\x71\x16\x57\x79\xb9\xbe\x46\x6f\x35\x7b\x38\xcb\x70\x1e\xad\xf3\xac\x32\x54\xce\x46\x2f\x [...]
},
- "/camel-catalog-1.4.1-main.yaml": &vfsgen۰CompressedFileInfo{
- name: "camel-catalog-1.4.1-main.yaml",
+ "/camel-catalog-1.5.0-SNAPSHOT-main.yaml": &vfsgen۰CompressedFileInfo{
+ name: "camel-catalog-1.5.0-SNAPSHOT-main.yaml",
modTime: time.Time{},
- uncompressedSize: 89151,
+ uncompressedSize: 89178,
- compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\xbd\x5b\x77\xdb\x38\xb2\x2f\xfe\x9e\x4f\xc1\x35\x79\xd9\x7b\xfd\x47\x35\xdd\xce\xec\xe9\xff\xee\xf3\x64\xcb\x71\x62\xc7\x76\xdc\x91\x27\xc9\xf4\x4b\x2f\x88\x84\x24\x48\x24\x41\x03\xd0\xc5\xfe\xf4\x67\xe1\xc2\xab\x20\x48\x24\x0c\xaf\xe3\x07\x93\x22\xaa\x7e\xc5\x22\x48\x5c\x0a\x85\xaa\xf7\xd1\xe8\xf5\xfe\xde\xbd\x8f\x6e\x49\x8c\x73\x8e\x93\x48\xd0\x48\x2c\x70\x74\x5e\xa0\x78\x81\xa3\x09\x9d\x89\x2d\x62\x38\xba\xa2\x [...]
+ compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x7d\x5b\x77\xdb\x38\xb2\xee\x7b\x7e\x05\xd7\xe4\x65\xef\x75\x86\x35\x3d\xce\xcc\xf4\xd9\x7d\x9e\x6c\x39\x4e\xec\xd8\x8e\x3b\xf2\x24\x99\x7e\xe9\x05\x91\x90\x04\x8b\x24\x68\x00\x92\x65\xff\xfa\xb3\x70\xe1\x55\x10\x24\x12\x86\xd7\xf6\x83\x49\x11\x55\x5f\xb1\x08\x12\x97\x42\xa1\xea\x7d\x14\xbf\xde\xdf\xbb\xf7\xd1\x35\x49\x70\xc1\x71\x1a\x09\x1a\x89\x25\x8e\x4e\x4b\x94\x2c\x71\x34\xa5\x73\xf1\x84\x18\x8e\x2e\xe8\xba\x [...]
},
- "/camel-catalog-1.4.1-quarkus.yaml": &vfsgen۰CompressedFileInfo{
- name: "camel-catalog-1.4.1-quarkus.yaml",
+ "/camel-catalog-1.5.0-SNAPSHOT-quarkus.yaml": &vfsgen۰CompressedFileInfo{
+ name: "camel-catalog-1.5.0-SNAPSHOT-quarkus.yaml",
modTime: time.Time{},
- uncompressedSize: 49007,
+ uncompressedSize: 49034,
- compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xc4\x5d\xdf\x77\xdb\xa8\xf2\x7f\xcf\x5f\xa1\xd3\xbc\xdc\x7b\xce\x8a\xdd\x4d\xf7\xee\x43\xbf\x4f\x49\xda\x74\x93\x36\x69\x1a\xe7\xb6\xdd\x7d\xe9\xc1\x12\xb6\x89\x25\x50\x00\xd9\x4e\xfe\xfa\xef\x01\x21\x4b\xb2\x95\xd1\x8f\x80\x6f\x1e\x22\x59\x0c\x9f\x81\x01\xc1\x68\x98\x81\xe3\x20\x74\xf7\x77\x74\x1c\x7c\xa6\x11\x61\x92\xc4\x81\xe2\x81\x5a\x90\xe0\x34\xc3\xd1\x82\x04\x13\x3e\x53\x6b\x2c\x48\x70\xc1\x73\x16\x63\x45\x39\x [...]
+ compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xc4\x7d\x5f\x77\xdb\x2c\xf2\xff\x7d\x5e\x85\x4e\x73\xb3\x7b\xce\x23\x9e\x6e\xba\xfb\x5c\xf4\x77\x95\xa4\x4d\x9b\xb4\x49\xd3\x38\xdb\x76\x9f\x9b\x1e\x2c\x61\x9b\x58\x02\x05\x90\xed\xe4\xd5\xff\x0e\x08\x59\x92\xad\x8c\xfe\x04\xfc\xcd\x45\x24\x8b\xe1\x33\x30\x20\x18\x0d\x33\x70\x1c\x84\xee\xfe\x8e\x8e\x83\xaf\x34\x22\x4c\x92\x38\x50\x3c\x50\x0b\x12\x9c\x66\x38\x5a\x90\x60\xc2\x67\x6a\x8d\x05\x09\x2e\x78\xce\x62\xac\x28\x [...]
},
"/cr-example.yaml": &vfsgen۰CompressedFileInfo{
name: "cr-example.yaml",
@@ -189,9 +189,9 @@ var assets = func() http.FileSystem {
"/operator-role-knative.yaml": &vfsgen۰CompressedFileInfo{
name: "operator-role-knative.yaml",
modTime: time.Time{},
- uncompressedSize: 1423,
+ uncompressedSize: 1447,
- compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xbc\x53\xc1\x8e\xdb\x36\x10\xbd\xf3\x2b\x1e\xac\x4b\x52\xac\xe5\xa6\xa7\xc2\x3d\xb9\xc9\x6e\x2b\x34\xb0\x81\x95\xd3\x20\xc7\x31\x35\x96\x06\xa6\x48\x75\x48\x59\xd9\x7e\x7d\x21\xda\x6e\x76\xb1\x87\x5e\x82\xf2\xe2\x31\xf5\xf4\xe6\xbd\x79\xa3\x02\xcb\xef\x77\x4c\x81\x8f\x62\xd9\x47\x6e\x90\x02\x52\xc7\xd8\x0c\x64\x3b\x46\x1d\x8e\x69\x22\x65\x3c\x84\xd1\x37\x94\x24\x78\xbc\xd9\xd4\x0f\x6f\x31\xfa\x86\x15\xc1\x33\x82\xa2\x [...]
+ compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xbc\x53\xc1\x8e\xdb\x36\x10\xbd\xf3\x2b\x1e\xac\x4b\x52\xac\xe5\xa6\xa7\xc2\x3d\xb9\x9b\xdd\x56\x68\x60\x03\x2b\xa7\x41\x8e\x63\x6a\x2c\x0d\x4c\x91\xea\x90\xb2\xb2\xfd\xfa\x42\xb2\xdc\x78\xb1\x45\x4f\x41\x78\xd1\x88\x7c\x7a\xf3\x1e\xdf\x28\xc3\xf2\xdb\x2d\x93\xe1\x83\x58\xf6\x91\x2b\xa4\x80\xd4\x30\x36\x1d\xd9\x86\x51\x86\x63\x1a\x48\x19\x8f\xa1\xf7\x15\x25\x09\x1e\x6f\x36\xe5\xe3\x5b\xf4\xbe\x62\x45\xf0\x8c\xa0\x68\x [...]
},
"/operator-role-kubernetes.yaml": &vfsgen۰CompressedFileInfo{
name: "operator-role-kubernetes.yaml",
@@ -372,8 +372,8 @@ var assets = func() http.FileSystem {
fs["/builder-role-kubernetes.yaml"].(os.FileInfo),
fs["/builder-role-openshift.yaml"].(os.FileInfo),
fs["/builder-service-account.yaml"].(os.FileInfo),
- fs["/camel-catalog-1.4.1-main.yaml"].(os.FileInfo),
- fs["/camel-catalog-1.4.1-quarkus.yaml"].(os.FileInfo),
+ fs["/camel-catalog-1.5.0-SNAPSHOT-main.yaml"].(os.FileInfo),
+ fs["/camel-catalog-1.5.0-SNAPSHOT-quarkus.yaml"].(os.FileInfo),
fs["/cr-example.yaml"].(os.FileInfo),
fs["/crd-build.yaml"].(os.FileInfo),
fs["/crd-camel-catalog.yaml"].(os.FileInfo),
diff --git a/examples/knative/messages-channel.yaml b/examples/knative/messages-channel.yaml
index fe44738..abd3483 100644
--- a/examples/knative/messages-channel.yaml
+++ b/examples/knative/messages-channel.yaml
@@ -15,7 +15,7 @@
# limitations under the License.
# ---------------------------------------------------------------------------
-apiVersion: messaging.knative.dev/v1alpha1
+apiVersion: messaging.knative.dev/v1beta1
kind: InMemoryChannel
metadata:
name: messages
diff --git a/examples/knative/words-channel.yaml b/examples/knative/words-channel.yaml
index 94e0ec1..33e5004 100644
--- a/examples/knative/words-channel.yaml
+++ b/examples/knative/words-channel.yaml
@@ -15,7 +15,7 @@
# limitations under the License.
# ---------------------------------------------------------------------------
-apiVersion: messaging.knative.dev/v1alpha1
+apiVersion: messaging.knative.dev/v1beta1
kind: InMemoryChannel
metadata:
name: words
diff --git a/go.sum b/go.sum
index e28b52d..c5bf1b1 100644
--- a/go.sum
+++ b/go.sum
@@ -1106,7 +1106,9 @@ github.com/radovskyb/watcher v1.0.6/go.mod h1:78okwvY5wPdzcb1UYnip1pvrZNIVEIh/Cm
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M=
+github.com/robfig/cron v0.0.0-20170526150127-736158dc09e1 h1:NZInwlJPD/G44mJDgBEMFvBfbv/QQKCrpo+az/QXn8c=
github.com/robfig/cron v0.0.0-20170526150127-736158dc09e1/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
+github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
diff --git a/helm/camel-k/templates/operator-role.yaml b/helm/camel-k/templates/operator-role.yaml
index 46948b2..d4f7ef4 100644
--- a/helm/camel-k/templates/operator-role.yaml
+++ b/helm/camel-k/templates/operator-role.yaml
@@ -203,6 +203,7 @@ rules:
- apiGroups:
- eventing.knative.dev
- messaging.knative.dev
+ - sources.knative.dev
resources:
- "*"
verbs:
diff --git a/pkg/apis/addtoscheme_knative_eventing.go b/pkg/apis/addtoscheme_knative_eventing.go
index ef0486b..de1ef1e 100644
--- a/pkg/apis/addtoscheme_knative_eventing.go
+++ b/pkg/apis/addtoscheme_knative_eventing.go
@@ -22,6 +22,7 @@ import (
eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1"
messagingv1alpha1 "knative.dev/eventing/pkg/apis/messaging/v1alpha1"
messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1"
+ sourcesv1alpha1 "knative.dev/eventing/pkg/apis/sources/v1alpha1"
)
func init() {
@@ -30,4 +31,5 @@ func init() {
AddToSchemes = append(AddToSchemes, eventingv1beta1.AddToScheme)
AddToSchemes = append(AddToSchemes, messagingv1alpha1.AddToScheme)
AddToSchemes = append(AddToSchemes, messagingv1beta1.AddToScheme)
+ AddToSchemes = append(AddToSchemes, sourcesv1alpha1.AddToScheme)
}
diff --git a/pkg/trait/knative.go b/pkg/trait/knative.go
index daf4d73..2aee1e2 100644
--- a/pkg/trait/knative.go
+++ b/pkg/trait/knative.go
@@ -18,23 +18,23 @@ limitations under the License.
package trait
import (
+ "fmt"
"net/url"
+ "reflect"
"strings"
- "github.com/pkg/errors"
-
- corev1 "k8s.io/api/core/v1"
- k8serrors "k8s.io/apimachinery/pkg/api/errors"
-
- eventing "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
- serving "knative.dev/serving/pkg/apis/serving/v1"
-
v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
knativeapi "github.com/apache/camel-k/pkg/apis/camel/v1/knative"
"github.com/apache/camel-k/pkg/metadata"
"github.com/apache/camel-k/pkg/util"
"github.com/apache/camel-k/pkg/util/envvar"
knativeutil "github.com/apache/camel-k/pkg/util/knative"
+ "github.com/pkg/errors"
+ corev1 "k8s.io/api/core/v1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/runtime"
+ eventing "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
+ serving "knative.dev/serving/pkg/apis/serving/v1"
)
// The Knative trait automatically discovers addresses of Knative resources and inject them into the
@@ -73,6 +73,10 @@ type knativeTrait struct {
FilterSourceChannels *bool `property:"filter-source-channels" json:"filterSourceChannels,omitempty"`
// Enables Knative CamelSource pre 0.15 compatibility fixes (will be removed in future versions).
CamelSourceCompat *bool `property:"camel-source-compat" json:"camelSourceCompat,omitempty"`
+ // Allows binding the integration to a sink via a Knative SinkBinding resource.
+ // This can be used when the integration targets a single sink.
+ // It's disabled by default.
+ SinkBinding *bool `property:"sink-binding" json:"sinkBinding,omitempty"`
// Enable automatic discovery of all trait properties.
Auto *bool `property:"auto" json:"auto,omitempty"`
}
@@ -201,6 +205,10 @@ func (t *knativeTrait) Apply(e *Environment) error {
}
}
+ if t.SinkBinding != nil && *t.SinkBinding {
+ util.StringSliceUniqueAdd(&e.Integration.Status.Dependencies, "mvn:org.apache.camel.k/camel-k-runtime-knative")
+ }
+
if len(t.ChannelSources) > 0 || len(t.EndpointSources) > 0 || len(t.EventSources) > 0 {
util.StringSliceUniqueAdd(&e.Integration.Status.Capabilities, v1.CapabilityPlatformHTTP)
}
@@ -225,6 +233,9 @@ func (t *knativeTrait) Apply(e *Environment) error {
if err := t.configureEvents(e, &env); err != nil {
return err
}
+ if err := t.configureSinkBinding(e, &env); err != nil {
+ return err
+ }
conf, err := env.Serialize()
if err != nil {
@@ -268,19 +279,21 @@ func (t *knativeTrait) configureChannels(e *Environment, env *knativeapi.CamelEn
return err
}
- // Sinks
- err = t.ifServiceMissingDo(e, env, t.ChannelSinks, knativeapi.CamelServiceTypeChannel, knativeapi.CamelEndpointKindSink,
- func(ref *corev1.ObjectReference, loc *url.URL, serviceURI string) error {
- svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink,
- knativeapi.CamelServiceTypeChannel, *loc, ref.APIVersion, ref.Kind)
- if err != nil {
- return err
- }
- env.Services = append(env.Services, svc)
- return nil
- })
- if err != nil {
- return err
+ if t.SinkBinding == nil || !*t.SinkBinding {
+ // Sinks
+ err = t.ifServiceMissingDo(e, env, t.ChannelSinks, knativeapi.CamelServiceTypeChannel, knativeapi.CamelEndpointKindSink,
+ func(ref *corev1.ObjectReference, loc *url.URL, serviceURI string) error {
+ svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink,
+ knativeapi.CamelServiceTypeChannel, *loc, ref.APIVersion, ref.Kind)
+ if err != nil {
+ return err
+ }
+ env.Services = append(env.Services, svc)
+ return nil
+ })
+ if err != nil {
+ return err
+ }
}
return nil
@@ -319,18 +332,20 @@ func (t *knativeTrait) configureEndpoints(e *Environment, env *knativeapi.CamelE
}
// Sinks
- err := t.ifServiceMissingDo(e, env, t.EndpointSinks, knativeapi.CamelServiceTypeEndpoint, knativeapi.CamelEndpointKindSink,
- func(ref *corev1.ObjectReference, loc *url.URL, serviceURI string) error {
- svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink,
- knativeapi.CamelServiceTypeEndpoint, *loc, ref.APIVersion, ref.Kind)
- if err != nil {
- return err
- }
- env.Services = append(env.Services, svc)
- return nil
- })
- if err != nil {
- return err
+ if t.SinkBinding == nil || !*t.SinkBinding {
+ err := t.ifServiceMissingDo(e, env, t.EndpointSinks, knativeapi.CamelServiceTypeEndpoint, knativeapi.CamelEndpointKindSink,
+ func(ref *corev1.ObjectReference, loc *url.URL, serviceURI string) error {
+ svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink,
+ knativeapi.CamelServiceTypeEndpoint, *loc, ref.APIVersion, ref.Kind)
+ if err != nil {
+ return err
+ }
+ env.Services = append(env.Services, svc)
+ return nil
+ })
+ if err != nil {
+ return err
+ }
}
return nil
@@ -366,23 +381,96 @@ func (t *knativeTrait) configureEvents(e *Environment, env *knativeapi.CamelEnvi
}
// Sinks
- err = t.ifServiceMissingDo(e, env, t.EventSinks, knativeapi.CamelServiceTypeEvent, knativeapi.CamelEndpointKindSink,
- func(ref *corev1.ObjectReference, loc *url.URL, serviceURI string) error {
- svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink,
- knativeapi.CamelServiceTypeEvent, *loc, ref.APIVersion, ref.Kind)
- if err != nil {
- return err
- }
- env.Services = append(env.Services, svc)
- return nil
- })
- if err != nil {
- return err
+ if t.SinkBinding == nil || !*t.SinkBinding {
+ err = t.ifServiceMissingDo(e, env, t.EventSinks, knativeapi.CamelServiceTypeEvent, knativeapi.CamelEndpointKindSink,
+ func(ref *corev1.ObjectReference, loc *url.URL, serviceURI string) error {
+ svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink,
+ knativeapi.CamelServiceTypeEvent, *loc, ref.APIVersion, ref.Kind)
+ if err != nil {
+ return err
+ }
+ env.Services = append(env.Services, svc)
+ return nil
+ })
+ if err != nil {
+ return err
+ }
}
return nil
}
+func (t *knativeTrait) configureSinkBinding(e *Environment, env *knativeapi.CamelEnvironment) error {
+ if t.SinkBinding == nil || !*t.SinkBinding {
+ return nil
+ }
+ var serviceType knativeapi.CamelServiceType
+ services := t.extractServices(t.ChannelSinks, knativeapi.CamelServiceTypeChannel)
+ if len(services) > 0 {
+ serviceType = knativeapi.CamelServiceTypeChannel
+ }
+ services = append(services, t.extractServices(t.EndpointSinks, knativeapi.CamelServiceTypeEndpoint)...)
+ if len(serviceType) == 0 && len(services) > 0 {
+ serviceType = knativeapi.CamelServiceTypeEndpoint
+ }
+ services = append(services, t.extractServices(t.EventSinks, knativeapi.CamelServiceTypeEvent)...)
+ if len(serviceType) == 0 && len(services) > 0 {
+ serviceType = knativeapi.CamelServiceTypeEvent
+ }
+
+ if len(services) != 1 {
+ return fmt.Errorf("sinkbinding can only be used with a single sink: found %d sinks", len(services))
+ }
+
+ err := t.withServiceDo(false, e, env, services, serviceType, knativeapi.CamelEndpointKindSink, func(ref *corev1.ObjectReference, url *url.URL, serviceURI string) error {
+ util.StringSliceUniqueAdd(&e.Interceptors, "knative-sink-binding")
+ e.ApplicationProperties["loader.interceptor.knative-sink-binding.name"] = ref.Name
+ e.ApplicationProperties["loader.interceptor.knative-sink-binding.type"] = string(serviceType)
+ e.ApplicationProperties["loader.interceptor.knative-sink-binding.kind"] = ref.Kind
+ e.ApplicationProperties["loader.interceptor.knative-sink-binding.api-version"] = ref.APIVersion
+
+ if e.IntegrationInPhase(v1.IntegrationPhaseDeploying) {
+ e.PostStepProcessors = append(e.PostStepProcessors, func(e *Environment) error {
+ sinkBindingInjected := false
+ e.Resources.Visit(func(object runtime.Object) {
+ gvk := object.GetObjectKind().GroupVersionKind()
+ if gvk.Kind == "SinkBinding" && strings.Contains(gvk.Group, "knative") {
+ sinkBindingInjected = true
+ }
+ })
+ if sinkBindingInjected {
+ return nil
+ }
+
+ controller := e.Resources.GetController(func(object runtime.Object) bool {
+ return true
+ })
+ if controller != nil && !reflect.ValueOf(controller).IsNil() {
+ gvk := controller.GetObjectKind().GroupVersionKind()
+ av, k := gvk.ToAPIVersionAndKind()
+ source := corev1.ObjectReference{
+ Kind: k,
+ Namespace: e.Integration.Namespace,
+ Name: e.Integration.Name,
+ APIVersion: av,
+ }
+ target := corev1.ObjectReference{
+ Kind: ref.Kind,
+ Namespace: e.Integration.Namespace,
+ Name: ref.Name,
+ APIVersion: ref.APIVersion,
+ }
+ e.Resources.Add(knativeutil.CreateSinkBinding(source, target))
+ }
+ return nil
+ })
+ }
+ return nil
+ })
+
+ return err
+}
+
func (t *knativeTrait) createTrigger(e *Environment, ref *corev1.ObjectReference, eventType string) {
// TODO extend to additional filters too, to filter them at source and not at destination
found := e.Resources.HasKnativeTrigger(func(trigger *eventing.Trigger) bool {
diff --git a/pkg/util/defaults/defaults.go b/pkg/util/defaults/defaults.go
index a2b2a84..0b1e481 100644
--- a/pkg/util/defaults/defaults.go
+++ b/pkg/util/defaults/defaults.go
@@ -26,7 +26,7 @@ const (
Version = "1.1.0-SNAPSHOT"
// DefaultRuntimeVersion --
- DefaultRuntimeVersion = "1.4.1"
+ DefaultRuntimeVersion = "1.5.0-SNAPSHOT"
// BuildahVersion --
BuildahVersion = "1.14.0"
diff --git a/pkg/util/knative/knative.go b/pkg/util/knative/knative.go
index e39eb54..5df1247 100644
--- a/pkg/util/knative/knative.go
+++ b/pkg/util/knative/knative.go
@@ -31,9 +31,12 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
eventing "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
- messaging "knative.dev/eventing/pkg/apis/messaging/v1alpha1"
+ messaging "knative.dev/eventing/pkg/apis/messaging/v1beta1"
+ sourcesv1alpha1 "knative.dev/eventing/pkg/apis/sources/v1alpha1"
"knative.dev/pkg/apis/duck"
duckv1 "knative.dev/pkg/apis/duck/v1"
+ duckv1alpha1 "knative.dev/pkg/apis/duck/v1alpha1"
+ "knative.dev/pkg/tracker"
serving "knative.dev/serving/pkg/apis/serving/v1"
controller "sigs.k8s.io/controller-runtime/pkg/client"
)
@@ -98,6 +101,40 @@ func CreateTrigger(brokerReference corev1.ObjectReference, serviceName string, e
return &subs
}
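+// CreateSinkBinding builds a Knative SinkBinding, named and namespaced after the source, with the source as subject and the target as sink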
+func CreateSinkBinding(source corev1.ObjectReference, target corev1.ObjectReference) runtime.Object {
+ binding := sourcesv1alpha1.SinkBinding{
+ TypeMeta: metav1.TypeMeta{
+ APIVersion: sourcesv1alpha1.SchemeGroupVersion.String(),
+ Kind: "SinkBinding",
+ },
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: source.Namespace,
+ Name: source.Name,
+ },
+ Spec: sourcesv1alpha1.SinkBindingSpec{
+ BindingSpec: duckv1alpha1.BindingSpec{
+ Subject: tracker.Reference{
+ APIVersion: source.APIVersion,
+ Kind: source.Kind,
+ Name: source.Name,
+ },
+ },
+ SourceSpec: duckv1.SourceSpec{
+ Sink: duckv1.Destination{
+ Ref: &duckv1.KReference{
+ APIVersion: target.APIVersion,
+ Kind: target.Kind,
+ Name: target.Name,
+ },
+ },
+ },
+ },
+ }
+
+ return &binding
+}
+
// GetAddressableReference looks up the resource among all given types and returns an object reference to it
func GetAddressableReference(ctx context.Context, c client.Client,
possibleReferences []corev1.ObjectReference, namespace string, name string) (*corev1.ObjectReference, error) {
diff --git a/pkg/util/kubernetes/collection.go b/pkg/util/kubernetes/collection.go
index bbfb5d2..e04ddbf 100644
--- a/pkg/util/kubernetes/collection.go
+++ b/pkg/util/kubernetes/collection.go
@@ -254,6 +254,17 @@ func (c *Collection) GetRoute(filter func(*routev1.Route) bool) *routev1.Route {
return retValue
}
+// GetCronJob returns a CronJob that matches the given function
+func (c *Collection) GetCronJob(filter func(job *v1beta1.CronJob) bool) *v1beta1.CronJob {
+ var retValue *v1beta1.CronJob
+ c.VisitCronJob(func(re *v1beta1.CronJob) {
+ if filter(re) {
+ retValue = re
+ }
+ })
+ return retValue
+}
+
// VisitCronJob executes the visitor function on all CronJob resources
func (c *Collection) VisitCronJob(visitor func(*v1beta1.CronJob)) {
c.Visit(func(res runtime.Object) {
@@ -357,6 +368,29 @@ func (c *Collection) VisitContainer(visitor func(container *corev1.Container)) {
})
}
+// GetController returns the controller associated with the integration (e.g. Deployment, Knative Service or CronJob)
+func (c *Collection) GetController(filter func(object runtime.Object) bool) runtime.Object {
+ d := c.GetDeployment(func(deployment *appsv1.Deployment) bool {
+ return filter(deployment)
+ })
+ if d != nil {
+ return d
+ }
+ svc := c.GetKnativeService(func(service *serving.Service) bool {
+ return filter(service)
+ })
+ if svc != nil {
+ return svc
+ }
+ cj := c.GetCronJob(func(job *v1beta1.CronJob) bool {
+ return filter(job)
+ })
+ if cj != nil {
+ return cj
+ }
+ return nil
+}
+
// VisitPodSpec executes the visitor function on all PodSpec inside deployments or other resources
func (c *Collection) VisitPodSpec(visitor func(container *corev1.PodSpec)) {
c.VisitDeployment(func(d *appsv1.Deployment) {
diff --git a/script/Makefile b/script/Makefile
index fabc398..56f433c 100644
--- a/script/Makefile
+++ b/script/Makefile
@@ -16,7 +16,7 @@
VERSIONFILE := pkg/util/defaults/defaults.go
VERSION := 1.1.0-SNAPSHOT
LAST_RELEASED_VERSION := 1.0.1
-RUNTIME_VERSION := 1.4.1
+RUNTIME_VERSION := 1.5.0-SNAPSHOT
BUILDAH_VERSION := 1.14.0
KANIKO_VERSION := 0.17.1
BASE_IMAGE := adoptopenjdk/openjdk11:slim
@@ -30,7 +30,7 @@ LINT_DEADLINE := 10m
# Used to push pre-release artifacts
STAGING_IMAGE_NAME := docker.io/camelk/camel-k
-STAGING_RUNTIME_REPO := https://repository.apache.org/content/repositories/orgapachecamel-1235
+STAGING_RUNTIME_REPO :=
# When packaging artifacts into the docker image, you can "copy" them from local maven
# or "download" them from Apache Snapshots and Maven Central