You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2019/05/23 08:59:39 UTC
[camel-k] 06/07: Fix #668: allow to subscribe to multiple channels
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit ab773ded9e75f5794ef02a6c1e676de6c7963ff4
Author: nferraro <ni...@gmail.com>
AuthorDate: Wed May 22 13:05:28 2019 +0200
Fix #668: allow to subscribe to multiple channels
---
deploy/resources.go | 46 ++++++++++++++++----------------
pkg/apis/camel/v1alpha1/knative/types.go | 16 ++++++-----
pkg/trait/knative.go | 36 ++++++++++++++++++-------
3 files changed, 58 insertions(+), 40 deletions(-)
diff --git a/deploy/resources.go b/deploy/resources.go
index 882550a..c6878fa 100644
--- a/deploy/resources.go
+++ b/deploy/resources.go
@@ -8352,29 +8352,6 @@ spec:
passive: false
`
- Resources["cr-example.yaml"] =
- `
-apiVersion: camel.apache.org/v1alpha1
-kind: Integration
-metadata:
- name: example
-spec:
- source:
- content: |-
- // This is Camel K Groovy example route
-
- rnd = new Random()
-
- from('timer:groovy?period=1s')
- .routeId('groovy')
- .setBody()
- .constant('Hello Camel K!')
- .process {
- it.in.headers['RandomValue'] = rnd.nextInt()
- }
- .to('log:info?showHeaders=true')
- name: routes.groovy
-`
Resources["crd-build.yaml"] =
`
apiVersion: apiextensions.k8s.io/v1beta1
@@ -8541,6 +8518,29 @@ spec:
JSONPath: .status.context
`
+ Resources["cr-example.yaml"] =
+ `
+apiVersion: camel.apache.org/v1alpha1
+kind: Integration
+metadata:
+ name: example
+spec:
+ source:
+ content: |-
+ // This is Camel K Groovy example route
+
+ rnd = new Random()
+
+ from('timer:groovy?period=1s')
+ .routeId('groovy')
+ .setBody()
+ .constant('Hello Camel K!')
+ .process {
+ it.in.headers['RandomValue'] = rnd.nextInt()
+ }
+ .to('log:info?showHeaders=true')
+ name: routes.groovy
+`
Resources["operator-deployment.yaml"] =
`
apiVersion: apps/v1
diff --git a/pkg/apis/camel/v1alpha1/knative/types.go b/pkg/apis/camel/v1alpha1/knative/types.go
index 5c80e43..833931b 100644
--- a/pkg/apis/camel/v1alpha1/knative/types.go
+++ b/pkg/apis/camel/v1alpha1/knative/types.go
@@ -60,11 +60,13 @@ const (
// Meta Options
const (
- CamelMetaServicePath = "service.path"
- CamelMetaServiceID = "service.id"
- CamelMetaServiceName = "service.name"
- CamelMetaServiceHost = "service.host"
- CamelMetaServicePort = "service.port"
- CamelMetaServiceZone = "service.zone"
- CamelMetaServiceProtocol = "service.protocol"
+ CamelMetaServicePath = "service.path"
+ CamelMetaServiceID = "service.id"
+ CamelMetaServiceName = "service.name"
+ CamelMetaServiceHost = "service.host"
+ CamelMetaServicePort = "service.port"
+ CamelMetaServiceZone = "service.zone"
+ CamelMetaServiceProtocol = "service.protocol"
+ CamelMetaFilterHeaderName = "filter.header.name"
+ CamelMetaFilterHeaderValue = "filter.header.value"
)
diff --git a/pkg/trait/knative.go b/pkg/trait/knative.go
index 7436f67..edad18f 100644
--- a/pkg/trait/knative.go
+++ b/pkg/trait/knative.go
@@ -34,15 +34,20 @@ import (
)
type knativeTrait struct {
- BaseTrait `property:",squash"`
- Configuration string `property:"configuration"`
- ChannelSources string `property:"channel-sources"`
- ChannelSinks string `property:"channel-sinks"`
- EndpointSources string `property:"endpoint-sources"`
- EndpointSinks string `property:"endpoint-sinks"`
- Auto *bool `property:"auto"`
+ BaseTrait `property:",squash"`
+ Configuration string `property:"configuration"`
+ ChannelSources string `property:"channel-sources"`
+ ChannelSinks string `property:"channel-sinks"`
+ EndpointSources string `property:"endpoint-sources"`
+ EndpointSinks string `property:"endpoint-sinks"`
+ FilterSourceChannels *bool `property:"filter-source-channels"`
+ Auto *bool `property:"auto"`
}
+const (
+ knativeHistoryHeader = "ce-knativehistory"
+)
+
func newKnativeTrait() *knativeTrait {
t := &knativeTrait{
BaseTrait: newBaseTrait("knative"),
@@ -101,6 +106,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, error) {
t.EndpointSinks = strings.Join(items, ",")
}
+ if t.FilterSourceChannels == nil && len(strings.Split(t.ChannelSources, ",")) > 1 {
+ // Filter channels when the integration subscribes to more than one
+ filter := true
+ t.FilterSourceChannels = &filter
+ }
}
return true, nil
@@ -169,15 +179,21 @@ func (t *knativeTrait) configureChannels(e *Environment, env *knativeapi.CamelEn
if env.ContainsService(ch, knativeapi.CamelServiceTypeChannel) {
continue
}
+ meta := map[string]string{
+ knativeapi.CamelMetaServicePath: "/",
+ }
+ if t.FilterSourceChannels != nil && *t.FilterSourceChannels {
+ fullName := ch + "." + e.Integration.Namespace + ".channels.cluster.local"
+ meta[knativeapi.CamelMetaFilterHeaderName] = knativeHistoryHeader
+ meta[knativeapi.CamelMetaFilterHeaderValue] = fullName
+ }
svc := knativeapi.CamelServiceDefinition{
Name: ch,
Host: "0.0.0.0",
Port: 8080,
Protocol: knativeapi.CamelProtocolHTTP,
ServiceType: knativeapi.CamelServiceTypeChannel,
- Metadata: map[string]string{
- knativeapi.CamelMetaServicePath: "/",
- },
+ Metadata: meta,
}
env.Services = append(env.Services, svc)
}