You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2018/11/29 13:28:24 UTC

[camel-k] 02/03: Fix #218: allow to push to Knative

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 1445a6f099268a28ffa10782c98ac87e21450bef
Author: nferraro <ni...@gmail.com>
AuthorDate: Mon Nov 26 17:07:58 2018 +0100

    Fix #218: allow to push to Knative
---
 pkg/trait/knative.go                               | 112 +++++++++++++++++----
 .../camel/component/knative/KnativeEndpoint.java   |   4 +-
 2 files changed, 97 insertions(+), 19 deletions(-)

diff --git a/pkg/trait/knative.go b/pkg/trait/knative.go
index e463746..b2cd958 100644
--- a/pkg/trait/knative.go
+++ b/pkg/trait/knative.go
@@ -20,6 +20,8 @@ package trait
 import (
 	"encoding/json"
 	"fmt"
+	"github.com/operator-framework/operator-sdk/pkg/sdk"
+	"github.com/pkg/errors"
 	"strings"
 
 	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
@@ -28,7 +30,6 @@ import (
 	knativeutil "github.com/apache/camel-k/pkg/util/knative"
 	eventing "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
 	serving "github.com/knative/serving/pkg/apis/serving/v1alpha1"
-	"github.com/sirupsen/logrus"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
@@ -36,6 +37,7 @@ import (
 type knativeTrait struct {
 	BaseTrait `property:",squash"`
 	Sources   string `property:"sources"`
+	Sinks     string `property:"sinks"`
 }
 
 func newKnativeTrait() *knativeTrait {
@@ -50,9 +52,13 @@ func (t *knativeTrait) appliesTo(e *Environment) bool {
 
 func (t *knativeTrait) autoconfigure(e *Environment) error {
 	if t.Sources == "" {
-		channels := getSourceChannels(e)
+		channels := t.getSourceChannels(e)
 		t.Sources = strings.Join(channels, ",")
 	}
+	if t.Sinks == "" {
+		channels := t.getSinkChannels(e)
+		t.Sinks = strings.Join(channels, ",")
+	}
 	return nil
 }
 
@@ -60,11 +66,15 @@ func (t *knativeTrait) apply(e *Environment) error {
 	for _, sub := range t.getSubscriptionsFor(e) {
 		e.Resources.Add(sub)
 	}
-	e.Resources.Add(t.getServiceFor(e))
+	svc, err := t.getServiceFor(e)
+	if err != nil {
+		return err
+	}
+	e.Resources.Add(svc)
 	return nil
 }
 
-func (t *knativeTrait) getServiceFor(e *Environment) *serving.Service {
+func (t *knativeTrait) getServiceFor(e *Environment) (*serving.Service, error) {
 	// combine properties of integration with context, integration
 	// properties have the priority
 	properties := CombineConfigurationAsMap("property", e.Context, e.Integration)
@@ -102,7 +112,11 @@ func (t *knativeTrait) getServiceFor(e *Environment) *serving.Service {
 	environment["AB_JOLOKIA_OFF"] = "true"
 
 	// Knative integration
-	environment["CAMEL_KNATIVE_CONFIGURATION"] = t.getConfigurationSerialized(e)
+	conf, err := t.getConfigurationSerialized(e)
+	if err != nil {
+		return nil, err
+	}
+	environment["CAMEL_KNATIVE_CONFIGURATION"] = conf
 
 	labels := map[string]string{
 		"camel.apache.org/integration": e.Integration.Name,
@@ -135,11 +149,11 @@ func (t *knativeTrait) getServiceFor(e *Environment) *serving.Service {
 		},
 	}
 
-	return &svc
+	return &svc, nil
 }
 
 func (t *knativeTrait) getSubscriptionsFor(e *Environment) []*eventing.Subscription {
-	channels := getConfiguredSourceChannels(t.Sources)
+	channels := t.getConfiguredSourceChannels()
 	subs := make([]*eventing.Subscription, 0)
 	for _, ch := range channels {
 		subs = append(subs, t.getSubscriptionFor(e, ch))
@@ -174,19 +188,19 @@ func (*knativeTrait) getSubscriptionFor(e *Environment, channel string) *eventin
 	}
 }
 
-func (t *knativeTrait) getConfigurationSerialized(e *Environment) string {
-	env := t.getConfiguration(e)
+func (t *knativeTrait) getConfigurationSerialized(e *Environment) (string, error) {
+	env, err := t.getConfiguration(e)
 	res, err := json.Marshal(env)
 	if err != nil {
-		logrus.Warning("Unable to serialize Knative configuration", err)
-		return ""
+		return "", errors.Wrap(err, "unable to serialize Knative configuration")
 	}
-	return string(res)
+	return string(res), nil
 }
 
-func (t *knativeTrait) getConfiguration(e *Environment) knativeutil.CamelEnvironment {
-	sourceChannels := getConfiguredSourceChannels(t.Sources)
+func (t *knativeTrait) getConfiguration(e *Environment) (knativeutil.CamelEnvironment, error) {
 	env := knativeutil.NewCamelEnvironment()
+	// Sources
+	sourceChannels := t.getConfiguredSourceChannels()
 	for _, ch := range sourceChannels {
 		svc := knativeutil.CamelServiceDefinition{
 			Name:        ch,
@@ -200,6 +214,29 @@ func (t *knativeTrait) getConfiguration(e *Environment) knativeutil.CamelEnviron
 		}
 		env.Services = append(env.Services, svc)
 	}
+	// Sinks
+	sinkChannels := t.getConfiguredSinkChannels()
+	for _, ch := range sinkChannels {
+		channel, err := t.retrieveChannel(e.Integration.Namespace, ch)
+		if err != nil {
+			return env, err
+		}
+		hostname := channel.Status.Address.Hostname
+		if hostname == "" {
+			return env, errors.New("cannot find address of channel " + ch)
+		}
+		svc := knativeutil.CamelServiceDefinition{
+			Name:        ch,
+			Host:        hostname,
+			Port:        80,
+			Protocol:    knativeutil.CamelProtocolHTTP,
+			ServiceType: knativeutil.CamelServiceTypeChannel,
+			Metadata: map[string]string{
+				knativeutil.CamelMetaServicePath: "/",
+			},
+		}
+		env.Services = append(env.Services, svc)
+	}
 	// Adding default endpoint
 	defSvc := knativeutil.CamelServiceDefinition{
 		Name:        "default",
@@ -212,12 +249,12 @@ func (t *knativeTrait) getConfiguration(e *Environment) knativeutil.CamelEnviron
 		},
 	}
 	env.Services = append(env.Services, defSvc)
-	return env
+	return env, nil
 }
 
-func getConfiguredSourceChannels(sources string) []string {
+func (t *knativeTrait) getConfiguredSourceChannels() []string {
 	channels := make([]string, 0)
-	for _, ch := range strings.Split(sources, ",") {
+	for _, ch := range strings.Split(t.Sources, ",") {
 		cht := strings.Trim(ch, " \t\"")
 		if cht != "" {
 			channels = append(channels, cht)
@@ -226,7 +263,7 @@ func getConfiguredSourceChannels(sources string) []string {
 	return channels
 }
 
-func getSourceChannels(e *Environment) []string {
+func (*knativeTrait) getSourceChannels(e *Environment) []string {
 	channels := make([]string, 0)
 
 	metadata.Each(e.Integration.Spec.Sources, func(_ int, meta metadata.IntegrationMetadata) bool {
@@ -236,3 +273,42 @@ func getSourceChannels(e *Environment) []string {
 
 	return channels
 }
+
+func (t *knativeTrait) getConfiguredSinkChannels() []string {
+	channels := make([]string, 0)
+	for _, ch := range strings.Split(t.Sinks, ",") {
+		cht := strings.Trim(ch, " \t\"")
+		if cht != "" {
+			channels = append(channels, cht)
+		}
+	}
+	return channels
+}
+
+func (*knativeTrait) getSinkChannels(e *Environment) []string {
+	channels := make([]string, 0)
+
+	metadata.Each(e.Integration.Spec.Sources, func(_ int, meta metadata.IntegrationMetadata) bool {
+		channels = append(channels, knativeutil.ExtractChannelNames(meta.ToURIs)...)
+		return true
+	})
+
+	return channels
+}
+
+func (*knativeTrait) retrieveChannel(namespace string, name string) (*eventing.Channel, error) {
+	channel := eventing.Channel{
+		TypeMeta: metav1.TypeMeta{
+			Kind:       "Channel",
+			APIVersion: eventing.SchemeGroupVersion.String(),
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: namespace,
+			Name:      name,
+		},
+	}
+	if err := sdk.Get(&channel); err != nil {
+		return nil, errors.Wrap(err, "could not retrieve channel " + name + " in namespace " + namespace)
+	}
+	return &channel, nil
+}
diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
index eb3a1fd..c1b92cd 100644
--- a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
+++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
@@ -52,7 +52,6 @@ import static org.apache.camel.util.ObjectHelper.ifNotEmpty;
     scheme = "knative",
     syntax = "knative:type/target",
     title = "Knative",
-    producerOnly = true,
     label = "cloud,eventing")
 public class KnativeEndpoint extends DefaultEndpoint implements DelegateEndpoint {
     @UriPath(description = "The Knative type")
@@ -118,6 +117,9 @@ public class KnativeEndpoint extends DefaultEndpoint implements DelegateEndpoint
                 headers.putIfAbsent("CE-EventTime", eventTime);
                 headers.putIfAbsent("CE-Source", getEndpointUri());
                 headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);
+
+                // Always remove host so it's always computed from the URL and not inherited from the exchange
+                headers.remove("Host");
             },
             endpoint.createProducer()
         );