You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2019/02/04 07:41:22 UTC

[camel-k] branch master updated: camel-knative: support for cloud events specs v0.2 #376

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c3e250  camel-knative: support for cloud events specs v0.2 #376
3c3e250 is described below

commit 3c3e2502808c8b93f728c1d3ae5e94b8ad110741
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Fri Feb 1 17:03:06 2019 +0100

    camel-knative: support for cloud events specs v0.2 #376
---
 .../camel/component/knative/KnativeComponent.java  |  30 +-
 .../component/knative/KnativeConfiguration.java    |  22 ++
 .../camel/component/knative/KnativeEndpoint.java   |  87 +----
 .../camel/component/knative/KnativeSupport.java    |  13 -
 .../knative/ce/CloudEventsProcessors.java          |  65 ++++
 .../org/apache/camel/component/knative/ce/V01.java | 102 ++++++
 .../org/apache/camel/component/knative/ce/V02.java | 102 ++++++
 .../component/knative/CloudEventsV01Test.java      | 349 +++++++++++++++++++++
 .../component/knative/CloudEventsV02Test.java      | 349 +++++++++++++++++++++
 9 files changed, 1019 insertions(+), 100 deletions(-)

diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
index 264f644..60b7835 100644
--- a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
+++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java
@@ -30,8 +30,6 @@ public class KnativeComponent extends DefaultComponent {
     private final KnativeConfiguration configuration;
     private String environmentPath;
 
-    private boolean jsonSerializationEnabled;
-
     public KnativeComponent() {
         this(null);
     }
@@ -74,6 +72,22 @@ public class KnativeComponent extends DefaultComponent {
         configuration.setEnvironment(environment);
     }
 
+    public boolean isJsonSerializationEnabled() {
+        return configuration.isJsonSerializationEnabled();
+    }
+
+    public void setJsonSerializationEnabled(boolean jsonSerializationEnabled) {
+        configuration.setJsonSerializationEnabled(jsonSerializationEnabled);
+    }
+
+    public String getCloudEventsSpecVersion() {
+        return configuration.getCloudEventsSpecVersion();
+    }
+
+    public void setCloudEventsSpecVersion(String cloudEventSpecVersion) {
+        configuration.setCloudEventsSpecVersion(cloudEventSpecVersion);
+    }
+
     // ************************
     //
     //
@@ -105,11 +119,11 @@ public class KnativeComponent extends DefaultComponent {
             String envConfig = System.getenv(CONFIGURATION_ENV_VARIABLE);
             if (environmentPath != null) {
                 conf.setEnvironment(
-                        KnativeEnvironment.mandatoryLoadFromResource(getCamelContext(), this.environmentPath)
+                    KnativeEnvironment.mandatoryLoadFromResource(getCamelContext(), this.environmentPath)
                 );
             } else if (envConfig != null) {
                 conf.setEnvironment(
-                        KnativeEnvironment.mandatoryLoadFromSerializedString(getCamelContext(), envConfig)
+                    KnativeEnvironment.mandatoryLoadFromSerializedString(getCamelContext(), envConfig)
                 );
             } else {
                 throw new IllegalStateException("Cannot load Knative configuration from file or env variable");
@@ -118,12 +132,4 @@ public class KnativeComponent extends DefaultComponent {
 
         return conf;
     }
-
-    public boolean isJsonSerializationEnabled() {
-        return jsonSerializationEnabled;
-    }
-
-    public void setJsonSerializationEnabled(boolean jsonSerializationEnabled) {
-        this.jsonSerializationEnabled = jsonSerializationEnabled;
-    }
 }
diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
index cc3fa65..967f7dd 100644
--- a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
+++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
@@ -25,6 +25,12 @@ public class KnativeConfiguration implements Cloneable {
     @Metadata(required = "true")
     private KnativeEnvironment environment;
 
+    @UriParam(defaultValue = "false")
+    private boolean jsonSerializationEnabled;
+
+    @UriParam(defaultValue = "0.1", enums = "0.1,0.2")
+    private String cloudEventsSpecVersion = "0.1";
+
     public KnativeConfiguration() {
     }
 
@@ -45,6 +51,22 @@ public class KnativeConfiguration implements Cloneable {
         this.environment = environment;
     }
 
+    public boolean isJsonSerializationEnabled() {
+        return jsonSerializationEnabled;
+    }
+
+    public void setJsonSerializationEnabled(boolean jsonSerializationEnabled) {
+        this.jsonSerializationEnabled = jsonSerializationEnabled;
+    }
+
+    public String getCloudEventsSpecVersion() {
+        return cloudEventsSpecVersion;
+    }
+
+    public void setCloudEventsSpecVersion(String cloudEventsSpecVersion) {
+        this.cloudEventsSpecVersion = cloudEventsSpecVersion;
+    }
+
     // ************************
     //
     // Cloneable
diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
index 89c7f3b..cceb218 100644
--- a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
+++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
@@ -16,15 +16,17 @@
  */
 package org.apache.camel.component.knative;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.Consumer;
 import org.apache.camel.DelegateEndpoint;
 import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.cloud.ServiceDefinition;
+import org.apache.camel.component.knative.ce.CloudEventsProcessors;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.processor.Pipeline;
 import org.apache.camel.spi.Metadata;
@@ -35,16 +37,6 @@ import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.StringHelper;
 import org.apache.camel.util.URISupport;
-import org.apache.commons.lang3.StringUtils;
-
-import java.io.InputStream;
-import java.time.ZoneId;
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.camel.util.ObjectHelper.ifNotEmpty;
 
 
 @UriEndpoint(
@@ -107,73 +99,18 @@ public class KnativeEndpoint extends DefaultEndpoint implements DelegateEndpoint
 
     @Override
     public Producer createProducer() throws Exception {
-        return new KnativeProducer(
-            this,
-            exchange -> {
-                final String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE);
-                final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE);
-                final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault());
-                final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created);
-                final Map<String, Object> headers = exchange.getIn().getHeaders();
-
-                headers.putIfAbsent("CE-CloudEventsVersion", "0.1");
-                headers.putIfAbsent("CE-EventType", eventType);
-                headers.putIfAbsent("CE-EventID", exchange.getExchangeId());
-                headers.putIfAbsent("CE-EventTime", eventTime);
-                headers.putIfAbsent("CE-Source", getEndpointUri());
-                headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);
-
-                // Always remove host so it's always computed from the URL and not inherited from the exchange
-                headers.remove("Host");
-            },
-            new KnativeConversionProcessor(getComponent().isJsonSerializationEnabled()),
-            endpoint.createProducer()
-        );
+        final String version = configuration.getCloudEventsSpecVersion();
+        final Processor ceProcessor = CloudEventsProcessors.forSpecversion(version).producerProcessor(this);
+        final Processor ceConverter = new KnativeConversionProcessor(configuration.isJsonSerializationEnabled());
+
+        return new KnativeProducer(this, ceProcessor, ceConverter, endpoint.createProducer());
     }
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        final Processor pipeline = Pipeline.newInstance(
-            getCamelContext(),
-            exchange -> {
-                if (!KnativeSupport.hasStructuredContent(exchange)) {
-                    //
-                    // The event is not in the form of Structured Content Mode
-                    // then leave it as it is.
-                    //
-                    // Note that this is true for http binding only.
-                    //
-                    // More info:
-                    //
-                    //   https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#32-structured-content-mode
-                    //
-                    return;
-                }
-
-                try (InputStream is = exchange.getIn().getBody(InputStream.class)) {
-                    final Message message = exchange.getIn();
-                    final Map<String, Object> ce = Knative.MAPPER.readValue(is, Map.class);
-
-                    ifNotEmpty(ce.remove("contentType"), val -> message.setHeader(Exchange.CONTENT_TYPE, val));
-                    ifNotEmpty(ce.remove("data"), val -> message.setBody(val));
-
-                    //
-                    // Map extensions to standard camel headers
-                    //
-                    ifNotEmpty(ce.remove("extensions"), val -> {
-                        if (val instanceof Map) {
-                            ((Map<String, Object>) val).forEach(message::setHeader);
-                        }
-                    });
-
-                    ce.forEach((key, val) -> {
-                        message.setHeader("CE-" + StringUtils.capitalize(key), val);
-                    });
-                }
-            },
-            processor
-        );
-
+        final String version = configuration.getCloudEventsSpecVersion();
+        final Processor ceProcessor = CloudEventsProcessors.forSpecversion(version).consumerProcessor(this);
+        final Processor pipeline = Pipeline.newInstance(getCamelContext(), ceProcessor, processor);
         final Consumer consumer = endpoint.createConsumer(pipeline);
 
         configureConsumer(consumer);
diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeSupport.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeSupport.java
index f84ac46..9c6c049 100644
--- a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeSupport.java
+++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeSupport.java
@@ -16,15 +16,11 @@
  */
 package org.apache.camel.component.knative;
 
-import java.io.UnsupportedEncodingException;
-import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.util.CollectionHelper;
-import org.apache.camel.util.URISupport;
 
 public final class KnativeSupport {
     private KnativeSupport() {
@@ -47,13 +43,4 @@ public final class KnativeSupport {
 
         return answer;
     }
-
-    public static String appendParametersToURI(String uri, String key, Object value, Object... keyVals)
-            throws UnsupportedEncodingException, URISyntaxException {
-
-        return URISupport.appendParametersToURI(
-            uri,
-            CollectionHelper.mapOf(key, value, keyVals)
-        );
-    }
 }
diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventsProcessors.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventsProcessors.java
new file mode 100644
index 0000000..49ebee4d
--- /dev/null
+++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventsProcessors.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.ce;
+
+import java.util.function.Function;
+
+import org.apache.camel.Processor;
+import org.apache.camel.component.knative.KnativeEndpoint;
+
+public enum CloudEventsProcessors { // one constant per supported CloudEvents spec version
+    v01("0.1", V01.PRODUCER, V01.CONSUMER),
+    v02("0.2", V02.PRODUCER, V02.CONSUMER);
+
+    private final String version;
+    private final Function<KnativeEndpoint, Processor> producer;
+    private final Function<KnativeEndpoint, Processor> consumer;
+
+    CloudEventsProcessors(String version, Function<KnativeEndpoint, Processor> producer, Function<KnativeEndpoint, Processor> consumer) {
+        this.version = version;
+        this.producer = producer;
+        this.consumer = consumer;
+    }
+    // Spec version string this constant was declared with ("0.1" or "0.2").
+    public String getVersion() {
+        return version;
+    }
+    // Applies this version's producer factory to the endpoint and returns the processor it yields.
+    public Processor producerProcessor(KnativeEndpoint endpoint) {
+        return this.producer.apply(endpoint);
+    }
+    // Applies this version's consumer factory to the endpoint and returns the processor it yields.
+    public Processor consumerProcessor(KnativeEndpoint endpoint) {
+        return this.consumer.apply(endpoint);
+    }
+
+    // **************************
+    //
+    // Helpers
+    //
+    // **************************
+    // Looks up the constant whose version string equals the argument; the match is exact.
+    public static CloudEventsProcessors forSpecversion(String version) {
+        for (CloudEventsProcessors ce : CloudEventsProcessors.values()) {
+            if (ce.version.equals(version)) {
+                return ce;
+            }
+        }
+        // Reached for any unmatched value, null included, since equals(null) is false above.
+        throw new IllegalArgumentException("Unable to find processors for spec version: " +  version);
+    }
+}
diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V01.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V01.java
new file mode 100644
index 0000000..4505d8e
--- /dev/null
+++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V01.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.ce;
+
+import java.io.InputStream;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Map;
+import java.util.function.Function;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.knative.Knative;
+import org.apache.camel.component.knative.KnativeEndpoint;
+import org.apache.camel.component.knative.KnativeEnvironment;
+import org.apache.camel.component.knative.KnativeSupport;
+import org.apache.commons.lang3.StringUtils;
+
+import static org.apache.camel.util.ObjectHelper.ifNotEmpty;
+
+final class V01 { // processor factories for CloudEvents spec 0.1 (see CloudEventsProcessors.v01)
+    private V01() { // holds constants only; the private constructor blocks instantiation from outside
+    }
+    // Producer side: fills in the 0.1 "CE-*" headers. Service and uri are read once, when the factory is applied.
+    public static final Function<KnativeEndpoint, Processor> PRODUCER = (KnativeEndpoint endpoint) -> {
+        KnativeEnvironment.KnativeServiceDefinition service = endpoint.getService();
+        String uri = endpoint.getEndpointUri();
+
+        return exchange -> {
+            final String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE);
+            final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE);
+            final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault());
+            final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created);
+            final Map<String, Object> headers = exchange.getIn().getHeaders();
+            // putIfAbsent: a header already present on the message is kept and not overwritten.
+            headers.putIfAbsent("CE-CloudEventsVersion", "0.1");
+            headers.putIfAbsent("CE-EventType", eventType);
+            headers.putIfAbsent("CE-EventID", exchange.getExchangeId());
+            headers.putIfAbsent("CE-EventTime", eventTime);
+            headers.putIfAbsent("CE-Source", uri);
+            headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);
+
+            // Always remove host so it's always computed from the URL and not inherited from the exchange
+            headers.remove("Host");
+        };
+    };
+    // Consumer side: unpacks a structured-mode JSON envelope into message headers and body.
+    public static final Function<KnativeEndpoint, Processor> CONSUMER = (KnativeEndpoint endpoint) -> {
+        return exchange -> {
+            if (!KnativeSupport.hasStructuredContent(exchange)) {
+                //
+                // The message is not in Structured Content Mode, so it is left
+                // untouched: nothing is parsed and no header is added.
+                // NOTE(review): the check itself lives in KnativeSupport, not shown here.
+                // Note that this is true for http binding only.
+                //
+                // More info:
+                //
+                //   https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#32-structured-content-mode
+                //
+                return;
+            }
+
+            try (InputStream is = exchange.getIn().getBody(InputStream.class)) {
+                final Message message = exchange.getIn();
+                final Map<String, Object> ce = Knative.MAPPER.readValue(is, Map.class);
+                // "contentType" and "data" are removed from the map first, so the generic loop below skips them.
+                ifNotEmpty(ce.remove("contentType"), val -> message.setHeader(Exchange.CONTENT_TYPE, val));
+                ifNotEmpty(ce.remove("data"), val -> message.setBody(val));
+
+                //
+                // Entries of the "extensions" map become message headers under their own names
+                //
+                ifNotEmpty(ce.remove("extensions"), val -> {
+                    if (val instanceof Map) {
+                        ((Map<String, Object>) val).forEach(message::setHeader);
+                    }
+                });
+                // Every remaining attribute is exposed as a header named "CE-" + capitalized key.
+                ce.forEach((key, val) -> {
+                    message.setHeader("CE-" + StringUtils.capitalize(key), val);
+                });
+            }
+        };
+    };
+}
diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V02.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V02.java
new file mode 100644
index 0000000..321c502
--- /dev/null
+++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V02.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.ce;
+
+import java.io.InputStream;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Map;
+import java.util.function.Function;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.knative.Knative;
+import org.apache.camel.component.knative.KnativeEndpoint;
+import org.apache.camel.component.knative.KnativeEnvironment;
+import org.apache.camel.component.knative.KnativeSupport;
+import org.apache.commons.lang3.StringUtils;
+
+import static org.apache.camel.util.ObjectHelper.ifNotEmpty;
+
+final class V02 { // processor factories for CloudEvents spec 0.2 (see CloudEventsProcessors.v02)
+    private V02() { // holds constants only; the private constructor blocks instantiation from outside
+    }
+    // Producer side: fills in the 0.2 "ce-*" headers. Service and uri are read once, when the factory is applied.
+    public static final Function<KnativeEndpoint, Processor> PRODUCER = (KnativeEndpoint endpoint) -> {
+        KnativeEnvironment.KnativeServiceDefinition service = endpoint.getService();
+        String uri = endpoint.getEndpointUri();
+
+        return exchange -> {
+            final String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE);
+            final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE);
+            final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault());
+            final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created);
+            final Map<String, Object> headers = exchange.getIn().getHeaders();
+            // putIfAbsent: a header already present on the message is kept and not overwritten.
+            headers.putIfAbsent("ce-specversion", "0.2");
+            headers.putIfAbsent("ce-type", eventType);
+            headers.putIfAbsent("ce-id", exchange.getExchangeId());
+            headers.putIfAbsent("ce-time", eventTime);
+            headers.putIfAbsent("ce-source", uri);
+            headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);
+
+            // Always remove host so it's always computed from the URL and not inherited from the exchange
+            headers.remove("Host");
+        };
+    };
+    // Consumer side: unpacks a structured-mode JSON envelope into message headers and body.
+    public static final Function<KnativeEndpoint, Processor> CONSUMER = (KnativeEndpoint endpoint) -> {
+        return exchange -> {
+            if (!KnativeSupport.hasStructuredContent(exchange)) {
+                //
+                // The message is not in Structured Content Mode, so it is left
+                // untouched: nothing is parsed and no header is added.
+                // NOTE(review): the check itself lives in KnativeSupport, not shown here.
+                // Note that this is true for http binding only.
+                //
+                // More info:
+                //
+                //   https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#32-structured-content-mode
+                //
+                return;
+            }
+
+            try (InputStream is = exchange.getIn().getBody(InputStream.class)) {
+                final Message message = exchange.getIn();
+                final Map<String, Object> ce = Knative.MAPPER.readValue(is, Map.class);
+                // Spec 0.2 names the attribute "contenttype"; the 0.1 spelling is still accepted, applied first.
+                ifNotEmpty(ce.remove("contentType"), val -> message.setHeader(Exchange.CONTENT_TYPE, val));
+                ifNotEmpty(ce.remove("contenttype"), val -> message.setHeader(Exchange.CONTENT_TYPE, val));
+                ifNotEmpty(ce.remove("data"), val -> message.setBody(val));
+
+                // Entries of an "extensions" map, when the envelope carries one,
+                // become message headers under their own names
+                ifNotEmpty(ce.remove("extensions"), val -> {
+                    if (val instanceof Map) {
+                        ((Map<String, Object>) val).forEach(message::setHeader);
+                    }
+                });
+                // Every remaining attribute is exposed as a header named "ce-" + lower-cased key.
+                ce.forEach((key, val) -> {
+                    message.setHeader("ce-" + StringUtils.lowerCase(key), val);
+                });
+            }
+        };
+    };
+}
diff --git a/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java b/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java
new file mode 100644
index 0000000..f5a9ecc
--- /dev/null
+++ b/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative;
+
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.cloud.ServiceDefinition;
+import org.apache.camel.component.knative.ce.CloudEventsProcessors;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.util.CollectionHelper.mapOf;
+
+public class CloudEventsV01Test {
+
+    private CamelContext context;
+
+    // **************************
+    //
+    // Setup
+    //
+    // **************************
+
+    @BeforeEach
+    public void before() {
+        this.context = new DefaultCamelContext();
+    }
+
+    @AfterEach
+    public void after() throws Exception {
+        if (this.context != null) {
+            this.context.stop();
+        }
+    }
+
+    // **************************
+    //
+    // Tests
+    //
+    // **************************
+
+    @Test
+    void testInvokeEndpoint() throws Exception {
+        final int port = AvailablePortFinder.getNextAvailable();
+
+        KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
+            new KnativeEnvironment.KnativeServiceDefinition(
+                Knative.Type.endpoint,
+                Knative.Protocol.http,
+                "myEndpoint",
+                "localhost",
+                port,
+                mapOf(
+                    ServiceDefinition.SERVICE_META_PATH, "/a/path",
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain"
+                ))
+        ));
+
+        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+        component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion());
+        component.setEnvironment(env);
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:source")
+                    .to("knative:endpoint/myEndpoint");
+
+                fromF("netty4-http:http://localhost:%d/a/path", port)
+                    .to("mock:ce");
+            }
+        });
+
+        context.start();
+
+        MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
+        mock.expectedMessageCount(1);
+        mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
+        mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
+        mock.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint");
+        mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
+        mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
+        mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventID"));
+        mock.expectedBodiesReceived("test");
+
+        context.createProducerTemplate().send(
+            "direct:source",
+            e -> {
+                e.getIn().setBody("test");
+            }
+        );
+
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    void testConsumeStructuredContent() throws Exception {
+        final int port = AvailablePortFinder.getNextAvailable();
+
+        KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
+            new KnativeEnvironment.KnativeServiceDefinition(
+                Knative.Type.endpoint,
+                Knative.Protocol.http,
+                "myEndpoint",
+                "localhost",
+                port,
+                mapOf(
+                    ServiceDefinition.SERVICE_META_PATH, "/a/path",
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain"
+                ))
+        ));
+
+        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+        component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion());
+        component.setEnvironment(env);
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("knative:endpoint/myEndpoint")
+                    .to("mock:ce");
+
+                from("direct:source")
+                    .toF("netty4-http:http://localhost:%d/a/path", port);
+            }
+        });
+
+        context.start();
+
+        MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
+        mock.expectedMessageCount(1);
+        mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
+        mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
+        mock.expectedHeaderReceived("CE-EventID", "myEventID");
+        mock.expectedHeaderReceived("CE-Source", "/somewhere");
+        mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE);
+        mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
+        mock.expectedBodiesReceived("test");
+
+        context.createProducerTemplate().send(
+            "direct:source",
+            e -> {
+                e.getIn().setHeader(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE);
+                e.getIn().setBody(new ObjectMapper().writeValueAsString(mapOf(
+                    "cloudEventsVersion", CloudEventsProcessors.v01.getVersion(),
+                    "eventType", "org.apache.camel.event",
+                    "eventID", "myEventID",
+                    "eventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()),
+                    "source", "/somewhere",
+                    "data", "test"
+                )));
+            }
+        );
+
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    void testConsumeContent() throws Exception {
+        final int port = AvailablePortFinder.getNextAvailable();
+        // Scenario: an event sent over http4 with CE-* headers and a text/plain body should reach the knative consumer route.
+        KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
+            new KnativeEnvironment.KnativeServiceDefinition(
+                Knative.Type.endpoint,
+                Knative.Protocol.http,
+                "myEndpoint",
+                "localhost",
+                port,
+                mapOf(
+                    ServiceDefinition.SERVICE_META_PATH, "/a/path",
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain"
+                ))
+        ));
+        // Configure the knative component with the v01 spec version and the environment declared above.
+        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+        component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion());
+        component.setEnvironment(env);
+        // Consumer route feeds mock:ce; direct:source is an http4 client calling the same port and path.
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("knative:endpoint/myEndpoint")
+                    .to("mock:ce");
+
+                from("direct:source")
+                    .toF("http4://localhost:%d/a/path", port);
+            }
+        });
+
+        context.start();
+        // Expectations are registered before sending: the CE-* headers set below must be present on the exchange at mock:ce.
+        MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
+        mock.expectedMessageCount(1);
+        mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
+        mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
+        mock.expectedHeaderReceived("CE-EventID", "myEventID");
+        mock.expectedHeaderReceived("CE-Source", "/somewhere");
+        mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
+        mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime")); // only presence is checked, the value is generated at send time
+        mock.expectedBodiesReceived("test");
+        // Event attributes are sent as CE-* headers and the payload as the text/plain body.
+        context.createProducerTemplate().send(
+            "direct:source",
+            e -> {
+                e.getIn().setHeader(Exchange.CONTENT_TYPE, "text/plain");
+                e.getIn().setHeader("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
+                e.getIn().setHeader("CE-EventType", "org.apache.camel.event");
+                e.getIn().setHeader("CE-EventID", "myEventID");
+                e.getIn().setHeader("CE-EventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getIn().setHeader("CE-Source", "/somewhere");
+                e.getIn().setBody("test");
+            }
+        );
+
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    void testConsumeContentWithFilter() throws Exception {
+        final int port = AvailablePortFinder.getNextAvailable();
+        // Scenario: two endpoint services, ep1 and ep2, share one port; each declares a filter on the CE-Source header (CE1 / CE2).
+        KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
+            new KnativeEnvironment.KnativeServiceDefinition(
+                Knative.Type.endpoint,
+                Knative.Protocol.http,
+                "ep1",
+                "localhost",
+                port,
+                mapOf(
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain",
+                    Knative.FILTER_HEADER_NAME, "CE-Source",
+                    Knative.FILTER_HEADER_VALUE, "CE1"
+                )),
+            new KnativeEnvironment.KnativeServiceDefinition(
+                Knative.Type.endpoint,
+                Knative.Protocol.http,
+                "ep2",
+                "localhost",
+                port, // same port as ep1
+                mapOf(
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain",
+                    Knative.FILTER_HEADER_NAME, "CE-Source",
+                    Knative.FILTER_HEADER_VALUE, "CE2"
+                ))
+        ));
+        // Configure the knative component with the v01 spec version and the environment declared above.
+        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+        component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion());
+        component.setEnvironment(env);
+        // Each consumer route converts the body to String, logs the exchange and forwards it to its own mock.
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("knative:endpoint/ep1")
+                    .convertBodyTo(String.class)
+                    .to("log:ce1?showAll=true&multiline=true")
+                    .to("mock:ce1");
+                from("knative:endpoint/ep2")
+                    .convertBodyTo(String.class)
+                    .to("log:ce2?showAll=true&multiline=true")
+                    .to("mock:ce2");
+                // Client route: constant body "test", POST, and a query string whose filter value comes from the FilterVal header.
+                from("direct:source")
+                    .setBody()
+                        .constant("test")
+                    .setHeader(Exchange.HTTP_METHOD)
+                        .constant("POST")
+                    .setHeader(Exchange.HTTP_QUERY)
+                        .simple("filter.headerName=CE-Source&filter.headerValue=${header.FilterVal}")
+                    .toD("http4://localhost:" + port);
+            }
+        });
+
+        context.start();
+        // mock:ce1 should see exactly one message: the event sent with ID myEventID1 and source CE1.
+        MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
+        mock1.expectedMessageCount(1);
+        mock1.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
+        mock1.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
+        mock1.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
+        mock1.expectedHeaderReceived("CE-EventID", "myEventID1");
+        mock1.expectedHeaderReceived("CE-Source", "CE1");
+        mock1.expectedBodiesReceived("test");
+        // mock:ce2 should see exactly one message: the event sent with ID myEventID2 and source CE2.
+        MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
+        mock2.expectedMessageCount(1);
+        mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
+        mock2.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
+        mock2.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
+        mock2.expectedHeaderReceived("CE-EventID", "myEventID2");
+        mock2.expectedHeaderReceived("CE-Source", "CE2");
+        mock2.expectedBodiesReceived("test");
+        // Send one event per filter value; the body is set by the direct:source route, not here.
+        context.createProducerTemplate().send(
+            "direct:source",
+            e -> {
+                e.getIn().setHeader("FilterVal", "CE1");
+                e.getIn().setHeader("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
+                e.getIn().setHeader("CE-EventType", "org.apache.camel.event");
+                e.getIn().setHeader("CE-EventID", "myEventID1");
+                e.getIn().setHeader("CE-EventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getIn().setHeader("CE-Source", "CE1");
+            }
+        );
+        context.createProducerTemplate().send(
+            "direct:source",
+            e -> {
+                e.getIn().setHeader("FilterVal", "CE2");
+                e.getIn().setHeader("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
+                e.getIn().setHeader("CE-EventType", "org.apache.camel.event");
+                e.getIn().setHeader("CE-EventID", "myEventID2");
+                e.getIn().setHeader("CE-EventTime", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getIn().setHeader("CE-Source", "CE2");
+            }
+        );
+
+        mock1.assertIsSatisfied();
+        mock2.assertIsSatisfied();
+    }
+}
diff --git a/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java b/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java
new file mode 100644
index 0000000..3c32915
--- /dev/null
+++ b/runtime/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative;
+
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.cloud.ServiceDefinition;
+import org.apache.camel.component.knative.ce.CloudEventsProcessors;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.util.CollectionHelper.mapOf;
+
+public class CloudEventsV02Test {
+    // Exercises the knative component with its CloudEvents spec version set to CloudEventsProcessors.v02.
+    private CamelContext context;
+
+    // **************************
+    //
+    // Setup: a fresh DefaultCamelContext before each test, stopped after each test
+    //
+    // **************************
+
+    @BeforeEach
+    public void before() {
+        this.context = new DefaultCamelContext();
+    }
+
+    @AfterEach
+    public void after() throws Exception {
+        if (this.context != null) {
+            this.context.stop();
+        }
+    }
+
+    // **************************
+    //
+    // Tests
+    //
+    // **************************
+    
+    @Test
+    void testInvokeEndpoint() throws Exception {
+        final int port = AvailablePortFinder.getNextAvailable();
+        // Scenario (producer side): a body sent to knative:endpoint/myEndpoint should arrive at a netty4-http server carrying ce-* headers.
+        KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
+            new KnativeEnvironment.KnativeServiceDefinition(
+                Knative.Type.endpoint,
+                Knative.Protocol.http,
+                "myEndpoint",
+                "localhost",
+                port,
+                mapOf(
+                    ServiceDefinition.SERVICE_META_PATH, "/a/path",
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain"
+                ))
+        ));
+        // Configure the knative component with the v02 spec version and the environment declared above.
+        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+        component.setCloudEventsSpecVersion(CloudEventsProcessors.v02.getVersion());
+        component.setEnvironment(env);
+        // direct:source produces to the knative endpoint; netty4-http listens on the service's port and path and feeds mock:ce.
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:source")
+                    .to("knative:endpoint/myEndpoint");
+
+                fromF("netty4-http:http://localhost:%d/a/path", port)
+                    .to("mock:ce");
+            }
+        });
+
+        context.start();
+        // The test sets no ce-* headers itself, so these must come from the producer side; ce-time and ce-id only need to be present.
+        MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
+        mock.expectedMessageCount(1);
+        mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
+        mock.expectedHeaderReceived("ce-type", "org.apache.camel.event");
+        mock.expectedHeaderReceived("ce-source", "knative://endpoint/myEndpoint");
+        mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
+        mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
+        mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-id"));
+        mock.expectedBodiesReceived("test");
+        // Only a body is sent.
+        context.createProducerTemplate().send(
+            "direct:source",
+            e -> {
+                e.getIn().setBody("test");
+            }
+        );
+
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    void testConsumeStructuredContent() throws Exception {
+        final int port = AvailablePortFinder.getNextAvailable();
+        // Scenario (consumer side): the whole event is sent as one JSON document with content type Knative.MIME_STRUCTURED_CONTENT_MODE.
+        KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
+            new KnativeEnvironment.KnativeServiceDefinition(
+                Knative.Type.endpoint,
+                Knative.Protocol.http,
+                "myEndpoint",
+                "localhost",
+                port,
+                mapOf(
+                    ServiceDefinition.SERVICE_META_PATH, "/a/path",
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain"
+                ))
+        ));
+        // Configure the knative component with the v02 spec version and the environment declared above.
+        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+        component.setCloudEventsSpecVersion(CloudEventsProcessors.v02.getVersion());
+        component.setEnvironment(env);
+        // Consumer route feeds mock:ce; direct:source is a netty4-http client calling the same port and path.
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("knative:endpoint/myEndpoint")
+                    .to("mock:ce");
+
+                from("direct:source")
+                    .toF("netty4-http:http://localhost:%d/a/path", port);
+            }
+        });
+
+        context.start();
+        // The JSON attributes are expected as ce-* headers and the "data" attribute as the message body.
+        MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
+        mock.expectedMessageCount(1);
+        mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
+        mock.expectedHeaderReceived("ce-type", "org.apache.camel.event");
+        mock.expectedHeaderReceived("ce-id", "myEventID");
+        mock.expectedHeaderReceived("ce-source", "/somewhere");
+        mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE);
+        mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
+        mock.expectedBodiesReceived("test");
+        // Serialize the event attributes (specversion, type, id, time, source) plus "data" into the JSON request body.
+        context.createProducerTemplate().send(
+            "direct:source",
+            e -> {
+                e.getIn().setHeader(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE);
+                e.getIn().setBody(new ObjectMapper().writeValueAsString(mapOf(
+                    "specversion", CloudEventsProcessors.v02.getVersion(),
+                    "type", "org.apache.camel.event",
+                    "id", "myEventID",
+                    "time", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()),
+                    "source", "/somewhere",
+                    "data", "test"
+                )));
+            }
+        );
+
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    void testConsumeContent() throws Exception {
+        final int port = AvailablePortFinder.getNextAvailable();
+        // Scenario (consumer side): an event sent over http4 with ce-* headers and a text/plain body should reach the knative consumer route.
+        KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
+            new KnativeEnvironment.KnativeServiceDefinition(
+                Knative.Type.endpoint,
+                Knative.Protocol.http,
+                "myEndpoint",
+                "localhost",
+                port,
+                mapOf(
+                    ServiceDefinition.SERVICE_META_PATH, "/a/path",
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain"
+                ))
+        ));
+        // Configure the knative component with the v02 spec version and the environment declared above.
+        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+        component.setCloudEventsSpecVersion(CloudEventsProcessors.v02.getVersion());
+        component.setEnvironment(env);
+        // Consumer route feeds mock:ce; direct:source is an http4 client calling the same port and path.
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("knative:endpoint/myEndpoint")
+                    .to("mock:ce");
+
+                from("direct:source")
+                    .toF("http4://localhost:%d/a/path", port);
+            }
+        });
+
+        context.start();
+        // Expectations are registered before sending: the ce-* headers set below must be present on the exchange at mock:ce.
+        MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
+        mock.expectedMessageCount(1);
+        mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
+        mock.expectedHeaderReceived("ce-type", "org.apache.camel.event");
+        mock.expectedHeaderReceived("ce-id", "myEventID");
+        mock.expectedHeaderReceived("ce-source", "/somewhere");
+        mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
+        mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
+        mock.expectedBodiesReceived("test");
+        // Event attributes are sent as ce-* headers and the payload as the text/plain body.
+        context.createProducerTemplate().send(
+            "direct:source",
+            e -> {
+                e.getIn().setHeader(Exchange.CONTENT_TYPE, "text/plain");
+                e.getIn().setHeader("ce-specversion", CloudEventsProcessors.v02.getVersion());
+                e.getIn().setHeader("ce-type", "org.apache.camel.event");
+                e.getIn().setHeader("ce-id", "myEventID");
+                e.getIn().setHeader("ce-time", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getIn().setHeader("ce-source", "/somewhere");
+                e.getIn().setBody("test");
+            }
+        );
+
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    void testConsumeContentWithFilter() throws Exception {
+        final int port = AvailablePortFinder.getNextAvailable();
+        // Scenario: two endpoint services, ep1 and ep2, share one port; each declares a filter on the ce-source header (CE1 / CE2).
+        KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
+            new KnativeEnvironment.KnativeServiceDefinition(
+                Knative.Type.endpoint,
+                Knative.Protocol.http,
+                "ep1",
+                "localhost",
+                port,
+                mapOf(
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain",
+                    Knative.FILTER_HEADER_NAME, "ce-source",
+                    Knative.FILTER_HEADER_VALUE, "CE1"
+                )),
+            new KnativeEnvironment.KnativeServiceDefinition(
+                Knative.Type.endpoint,
+                Knative.Protocol.http,
+                "ep2",
+                "localhost",
+                port, // same port as ep1
+                mapOf(
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain",
+                    Knative.FILTER_HEADER_NAME, "ce-source",
+                    Knative.FILTER_HEADER_VALUE, "CE2"
+                ))
+        ));
+        // Configure the knative component with the v02 spec version and the environment declared above.
+        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+        component.setCloudEventsSpecVersion(CloudEventsProcessors.v02.getVersion());
+        component.setEnvironment(env);
+        // Each consumer route converts the body to String, logs the exchange and forwards it to its own mock.
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("knative:endpoint/ep1")
+                    .convertBodyTo(String.class)
+                    .to("log:ce1?showAll=true&multiline=true")
+                    .to("mock:ce1");
+                from("knative:endpoint/ep2")
+                    .convertBodyTo(String.class)
+                    .to("log:ce2?showAll=true&multiline=true")
+                    .to("mock:ce2");
+                // Client route: constant body "test", POST, and a query string whose filter value comes from the FilterVal header.
+                from("direct:source")
+                    .setBody()
+                        .constant("test")
+                    .setHeader(Exchange.HTTP_METHOD)
+                        .constant("POST")
+                    .setHeader(Exchange.HTTP_QUERY)
+                        .simple("filter.headerName=ce-source&filter.headerValue=${header.FilterVal}")
+                    .toD("http4://localhost:" + port);
+            }
+        });
+
+        context.start();
+        // mock:ce1 should see exactly one message: the event sent with ID myEventID1 and source CE1.
+        MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
+        mock1.expectedMessageCount(1);
+        mock1.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
+        mock1.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
+        mock1.expectedHeaderReceived("ce-type", "org.apache.camel.event");
+        mock1.expectedHeaderReceived("ce-id", "myEventID1");
+        mock1.expectedHeaderReceived("ce-source", "CE1");
+        mock1.expectedBodiesReceived("test");
+        // mock:ce2 should see exactly one message: the event sent with ID myEventID2 and source CE2.
+        MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
+        mock2.expectedMessageCount(1);
+        mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
+        mock2.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
+        mock2.expectedHeaderReceived("ce-type", "org.apache.camel.event");
+        mock2.expectedHeaderReceived("ce-id", "myEventID2");
+        mock2.expectedHeaderReceived("ce-source", "CE2");
+        mock2.expectedBodiesReceived("test");
+        // Send one event per filter value; the body is set by the direct:source route, not here.
+        context.createProducerTemplate().send(
+            "direct:source",
+            e -> {
+                e.getIn().setHeader("FilterVal", "CE1");
+                e.getIn().setHeader("ce-specversion", CloudEventsProcessors.v02.getVersion());
+                e.getIn().setHeader("ce-type", "org.apache.camel.event");
+                e.getIn().setHeader("ce-id", "myEventID1");
+                e.getIn().setHeader("ce-time", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getIn().setHeader("ce-source", "CE1");
+            }
+        );
+        context.createProducerTemplate().send(
+            "direct:source",
+            e -> {
+                e.getIn().setHeader("FilterVal", "CE2");
+                e.getIn().setHeader("ce-specversion", CloudEventsProcessors.v02.getVersion());
+                e.getIn().setHeader("ce-type", "org.apache.camel.event");
+                e.getIn().setHeader("ce-id", "myEventID2");
+                e.getIn().setHeader("ce-time", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getIn().setHeader("ce-source", "CE2");
+            }
+        );
+
+        mock1.assertIsSatisfied();
+        mock2.assertIsSatisfied();
+    }
+}