You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2019/05/23 08:59:39 UTC

[camel-k] 06/07: Fix #668: allow to subscribe to multiple channels

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit ab773ded9e75f5794ef02a6c1e676de6c7963ff4
Author: nferraro <ni...@gmail.com>
AuthorDate: Wed May 22 13:05:28 2019 +0200

    Fix #668: allow to subscribe to multiple channels
---
 deploy/resources.go                      | 46 ++++++++++++++++----------------
 pkg/apis/camel/v1alpha1/knative/types.go | 16 ++++++-----
 pkg/trait/knative.go                     | 36 ++++++++++++++++++-------
 3 files changed, 58 insertions(+), 40 deletions(-)

diff --git a/deploy/resources.go b/deploy/resources.go
index 882550a..c6878fa 100644
--- a/deploy/resources.go
+++ b/deploy/resources.go
@@ -8352,29 +8352,6 @@ spec:
         passive: false
 
 `
-	Resources["cr-example.yaml"] =
-		`
-apiVersion: camel.apache.org/v1alpha1
-kind: Integration
-metadata:
-  name: example
-spec:
-  source:
-    content: |-
-      // This is Camel K Groovy example route
-
-      rnd = new Random()
-
-      from('timer:groovy?period=1s')
-          .routeId('groovy')
-          .setBody()
-              .constant('Hello Camel K!')
-          .process {
-              it.in.headers['RandomValue'] = rnd.nextInt()
-          }
-          .to('log:info?showHeaders=true')
-    name: routes.groovy
-`
 	Resources["crd-build.yaml"] =
 		`
 apiVersion: apiextensions.k8s.io/v1beta1
@@ -8541,6 +8518,29 @@ spec:
       JSONPath: .status.context
 
 `
+	Resources["cr-example.yaml"] =
+		`
+apiVersion: camel.apache.org/v1alpha1
+kind: Integration
+metadata:
+  name: example
+spec:
+  source:
+    content: |-
+      // This is Camel K Groovy example route
+
+      rnd = new Random()
+
+      from('timer:groovy?period=1s')
+          .routeId('groovy')
+          .setBody()
+              .constant('Hello Camel K!')
+          .process {
+              it.in.headers['RandomValue'] = rnd.nextInt()
+          }
+          .to('log:info?showHeaders=true')
+    name: routes.groovy
+`
 	Resources["operator-deployment.yaml"] =
 		`
 apiVersion: apps/v1
diff --git a/pkg/apis/camel/v1alpha1/knative/types.go b/pkg/apis/camel/v1alpha1/knative/types.go
index 5c80e43..833931b 100644
--- a/pkg/apis/camel/v1alpha1/knative/types.go
+++ b/pkg/apis/camel/v1alpha1/knative/types.go
@@ -60,11 +60,13 @@ const (
 
 // Meta Options
 const (
-	CamelMetaServicePath     = "service.path"
-	CamelMetaServiceID       = "service.id"
-	CamelMetaServiceName     = "service.name"
-	CamelMetaServiceHost     = "service.host"
-	CamelMetaServicePort     = "service.port"
-	CamelMetaServiceZone     = "service.zone"
-	CamelMetaServiceProtocol = "service.protocol"
+	CamelMetaServicePath       = "service.path"
+	CamelMetaServiceID         = "service.id"
+	CamelMetaServiceName       = "service.name"
+	CamelMetaServiceHost       = "service.host"
+	CamelMetaServicePort       = "service.port"
+	CamelMetaServiceZone       = "service.zone"
+	CamelMetaServiceProtocol   = "service.protocol"
+	CamelMetaFilterHeaderName  = "filter.header.name"
+	CamelMetaFilterHeaderValue = "filter.header.value"
 )
diff --git a/pkg/trait/knative.go b/pkg/trait/knative.go
index 7436f67..edad18f 100644
--- a/pkg/trait/knative.go
+++ b/pkg/trait/knative.go
@@ -34,15 +34,20 @@ import (
 )
 
 type knativeTrait struct {
-	BaseTrait       `property:",squash"`
-	Configuration   string `property:"configuration"`
-	ChannelSources  string `property:"channel-sources"`
-	ChannelSinks    string `property:"channel-sinks"`
-	EndpointSources string `property:"endpoint-sources"`
-	EndpointSinks   string `property:"endpoint-sinks"`
-	Auto            *bool  `property:"auto"`
+	BaseTrait            `property:",squash"`
+	Configuration        string `property:"configuration"`
+	ChannelSources       string `property:"channel-sources"`
+	ChannelSinks         string `property:"channel-sinks"`
+	EndpointSources      string `property:"endpoint-sources"`
+	EndpointSinks        string `property:"endpoint-sinks"`
+	FilterSourceChannels *bool  `property:"filter-source-channels"`
+	Auto                 *bool  `property:"auto"`
 }
 
+const (
+	knativeHistoryHeader = "ce-knativehistory"
+)
+
 func newKnativeTrait() *knativeTrait {
 	t := &knativeTrait{
 		BaseTrait: newBaseTrait("knative"),
@@ -101,6 +106,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, error) {
 
 			t.EndpointSinks = strings.Join(items, ",")
 		}
+		if t.FilterSourceChannels == nil && len(strings.Split(t.ChannelSources, ",")) > 1 {
+			// Unless explicitly configured, enable source-channel filtering when the integration subscribes to more than one channel
+			filter := true
+			t.FilterSourceChannels = &filter
+		}
 	}
 
 	return true, nil
@@ -169,15 +179,21 @@ func (t *knativeTrait) configureChannels(e *Environment, env *knativeapi.CamelEn
 		if env.ContainsService(ch, knativeapi.CamelServiceTypeChannel) {
 			continue
 		}
+		meta := map[string]string{
+			knativeapi.CamelMetaServicePath: "/",
+		}
+		if t.FilterSourceChannels != nil && *t.FilterSourceChannels {
+			fullName := ch + "." + e.Integration.Namespace + ".channels.cluster.local"
+			meta[knativeapi.CamelMetaFilterHeaderName] = knativeHistoryHeader
+			meta[knativeapi.CamelMetaFilterHeaderValue] = fullName
+		}
 		svc := knativeapi.CamelServiceDefinition{
 			Name:        ch,
 			Host:        "0.0.0.0",
 			Port:        8080,
 			Protocol:    knativeapi.CamelProtocolHTTP,
 			ServiceType: knativeapi.CamelServiceTypeChannel,
-			Metadata: map[string]string{
-				knativeapi.CamelMetaServicePath: "/",
-			},
+			Metadata: meta,
 		}
 		env.Services = append(env.Services, svc)
 	}