You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2018/11/29 13:28:24 UTC
[camel-k] 02/03: Fix #218: allow to push to Knative
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 1445a6f099268a28ffa10782c98ac87e21450bef
Author: nferraro <ni...@gmail.com>
AuthorDate: Mon Nov 26 17:07:58 2018 +0100
Fix #218: allow to push to Knative
---
pkg/trait/knative.go | 112 +++++++++++++++++----
.../camel/component/knative/KnativeEndpoint.java | 4 +-
2 files changed, 97 insertions(+), 19 deletions(-)
diff --git a/pkg/trait/knative.go b/pkg/trait/knative.go
index e463746..b2cd958 100644
--- a/pkg/trait/knative.go
+++ b/pkg/trait/knative.go
@@ -20,6 +20,8 @@ package trait
import (
"encoding/json"
"fmt"
+ "github.com/operator-framework/operator-sdk/pkg/sdk"
+ "github.com/pkg/errors"
"strings"
"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
@@ -28,7 +30,6 @@ import (
knativeutil "github.com/apache/camel-k/pkg/util/knative"
eventing "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
serving "github.com/knative/serving/pkg/apis/serving/v1alpha1"
- "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@@ -36,6 +37,7 @@ import (
type knativeTrait struct {
BaseTrait `property:",squash"`
Sources string `property:"sources"`
+ Sinks string `property:"sinks"`
}
func newKnativeTrait() *knativeTrait {
@@ -50,9 +52,13 @@ func (t *knativeTrait) appliesTo(e *Environment) bool {
func (t *knativeTrait) autoconfigure(e *Environment) error {
if t.Sources == "" {
- channels := getSourceChannels(e)
+ channels := t.getSourceChannels(e)
t.Sources = strings.Join(channels, ",")
}
+ if t.Sinks == "" {
+ channels := t.getSinkChannels(e)
+ t.Sinks = strings.Join(channels, ",")
+ }
return nil
}
@@ -60,11 +66,15 @@ func (t *knativeTrait) apply(e *Environment) error {
for _, sub := range t.getSubscriptionsFor(e) {
e.Resources.Add(sub)
}
- e.Resources.Add(t.getServiceFor(e))
+ svc, err := t.getServiceFor(e)
+ if err != nil {
+ return err
+ }
+ e.Resources.Add(svc)
return nil
}
-func (t *knativeTrait) getServiceFor(e *Environment) *serving.Service {
+func (t *knativeTrait) getServiceFor(e *Environment) (*serving.Service, error) {
// combine properties of integration with context, integration
// properties have the priority
properties := CombineConfigurationAsMap("property", e.Context, e.Integration)
@@ -102,7 +112,11 @@ func (t *knativeTrait) getServiceFor(e *Environment) *serving.Service {
environment["AB_JOLOKIA_OFF"] = "true"
// Knative integration
- environment["CAMEL_KNATIVE_CONFIGURATION"] = t.getConfigurationSerialized(e)
+ conf, err := t.getConfigurationSerialized(e)
+ if err != nil {
+ return nil, err
+ }
+ environment["CAMEL_KNATIVE_CONFIGURATION"] = conf
labels := map[string]string{
"camel.apache.org/integration": e.Integration.Name,
@@ -135,11 +149,11 @@ func (t *knativeTrait) getServiceFor(e *Environment) *serving.Service {
},
}
- return &svc
+ return &svc, nil
}
func (t *knativeTrait) getSubscriptionsFor(e *Environment) []*eventing.Subscription {
- channels := getConfiguredSourceChannels(t.Sources)
+ channels := t.getConfiguredSourceChannels()
subs := make([]*eventing.Subscription, 0)
for _, ch := range channels {
subs = append(subs, t.getSubscriptionFor(e, ch))
@@ -174,19 +188,19 @@ func (*knativeTrait) getSubscriptionFor(e *Environment, channel string) *eventin
}
}
-func (t *knativeTrait) getConfigurationSerialized(e *Environment) string {
- env := t.getConfiguration(e)
+func (t *knativeTrait) getConfigurationSerialized(e *Environment) (string, error) {
+ env, err := t.getConfiguration(e)
res, err := json.Marshal(env)
if err != nil {
- logrus.Warning("Unable to serialize Knative configuration", err)
- return ""
+ return "", errors.Wrap(err, "unable to serialize Knative configuration")
}
- return string(res)
+ return string(res), nil
}
-func (t *knativeTrait) getConfiguration(e *Environment) knativeutil.CamelEnvironment {
- sourceChannels := getConfiguredSourceChannels(t.Sources)
+func (t *knativeTrait) getConfiguration(e *Environment) (knativeutil.CamelEnvironment, error) {
env := knativeutil.NewCamelEnvironment()
+ // Sources
+ sourceChannels := t.getConfiguredSourceChannels()
for _, ch := range sourceChannels {
svc := knativeutil.CamelServiceDefinition{
Name: ch,
@@ -200,6 +214,29 @@ func (t *knativeTrait) getConfiguration(e *Environment) knativeutil.CamelEnviron
}
env.Services = append(env.Services, svc)
}
+ // Sinks
+ sinkChannels := t.getConfiguredSinkChannels()
+ for _, ch := range sinkChannels {
+ channel, err := t.retrieveChannel(e.Integration.Namespace, ch)
+ if err != nil {
+ return env, err
+ }
+ hostname := channel.Status.Address.Hostname
+ if hostname == "" {
+ return env, errors.New("cannot find address of channel " + ch)
+ }
+ svc := knativeutil.CamelServiceDefinition{
+ Name: ch,
+ Host: hostname,
+ Port: 80,
+ Protocol: knativeutil.CamelProtocolHTTP,
+ ServiceType: knativeutil.CamelServiceTypeChannel,
+ Metadata: map[string]string{
+ knativeutil.CamelMetaServicePath: "/",
+ },
+ }
+ env.Services = append(env.Services, svc)
+ }
// Adding default endpoint
defSvc := knativeutil.CamelServiceDefinition{
Name: "default",
@@ -212,12 +249,12 @@ func (t *knativeTrait) getConfiguration(e *Environment) knativeutil.CamelEnviron
},
}
env.Services = append(env.Services, defSvc)
- return env
+ return env, nil
}
-func getConfiguredSourceChannels(sources string) []string {
+func (t *knativeTrait) getConfiguredSourceChannels() []string {
channels := make([]string, 0)
- for _, ch := range strings.Split(sources, ",") {
+ for _, ch := range strings.Split(t.Sources, ",") {
cht := strings.Trim(ch, " \t\"")
if cht != "" {
channels = append(channels, cht)
@@ -226,7 +263,7 @@ func getConfiguredSourceChannels(sources string) []string {
return channels
}
-func getSourceChannels(e *Environment) []string {
+func (*knativeTrait) getSourceChannels(e *Environment) []string {
channels := make([]string, 0)
metadata.Each(e.Integration.Spec.Sources, func(_ int, meta metadata.IntegrationMetadata) bool {
@@ -236,3 +273,42 @@ func getSourceChannels(e *Environment) []string {
return channels
}
+
+func (t *knativeTrait) getConfiguredSinkChannels() []string {
+ channels := make([]string, 0)
+ for _, ch := range strings.Split(t.Sinks, ",") {
+ cht := strings.Trim(ch, " \t\"")
+ if cht != "" {
+ channels = append(channels, cht)
+ }
+ }
+ return channels
+}
+
+func (*knativeTrait) getSinkChannels(e *Environment) []string {
+ channels := make([]string, 0)
+
+ metadata.Each(e.Integration.Spec.Sources, func(_ int, meta metadata.IntegrationMetadata) bool {
+ channels = append(channels, knativeutil.ExtractChannelNames(meta.ToURIs)...)
+ return true
+ })
+
+ return channels
+}
+
+func (*knativeTrait) retrieveChannel(namespace string, name string) (*eventing.Channel, error) {
+ channel := eventing.Channel{
+ TypeMeta: metav1.TypeMeta{
+ Kind: "Channel",
+ APIVersion: eventing.SchemeGroupVersion.String(),
+ },
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: namespace,
+ Name: name,
+ },
+ }
+ if err := sdk.Get(&channel); err != nil {
+ return nil, errors.Wrap(err, "could not retrieve channel " + name + " in namespace " + namespace)
+ }
+ return &channel, nil
+}
diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
index eb3a1fd..c1b92cd 100644
--- a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
+++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
@@ -52,7 +52,6 @@ import static org.apache.camel.util.ObjectHelper.ifNotEmpty;
scheme = "knative",
syntax = "knative:type/target",
title = "Knative",
- producerOnly = true,
label = "cloud,eventing")
public class KnativeEndpoint extends DefaultEndpoint implements DelegateEndpoint {
@UriPath(description = "The Knative type")
@@ -118,6 +117,9 @@ public class KnativeEndpoint extends DefaultEndpoint implements DelegateEndpoint
headers.putIfAbsent("CE-EventTime", eventTime);
headers.putIfAbsent("CE-Source", getEndpointUri());
headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);
+
+ // Always remove host so it's always computed from the URL and not inherited from the exchange
+ headers.remove("Host");
},
endpoint.createProducer()
);