You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2019/10/25 09:14:02 UTC

[camel-k-runtime] branch master updated: Wrong CloudEvent Headers used #173

This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git


The following commit(s) were added to refs/heads/master by this push:
     new f3453ff   Wrong CloudEvent Headers used #173
     new 34c792b  Merge pull request #174 from lburgazzoli/github-173
f3453ff is described below

commit f3453ffc757f592789026cf24804f9a64c5bc74a
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Fri Oct 25 01:37:08 2019 +0200

     Wrong CloudEvent Headers used #173
---
 .../camel/component/knative/spi/CloudEvent.java    |  15 ++
 .../camel/component/knative/spi/CloudEvents.java   |  50 ++--
 .../component/knative/http/KnativeHttpTest.java    | 261 +++++++++++++--------
 .../camel/component/knative/KnativeEndpoint.java   |   3 +-
 .../knative/ce/AbstractCloudEventProcessor.java    |  36 ++-
 .../component/knative/ce/CloudEventProcessors.java |  12 +-
 6 files changed, 224 insertions(+), 153 deletions(-)

diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvent.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvent.java
index df17494..bc92f38 100644
--- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvent.java
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvent.java
@@ -20,7 +20,22 @@ import java.util.Collection;
 import java.util.Objects;
 import java.util.Optional;
 
+import org.apache.camel.Exchange;
+
 public interface CloudEvent {
+    String CAMEL_CLOUD_EVENT_ID = "CamelCloudEventID";
+    String CAMEL_CLOUD_EVENT_SOURCE = "CamelCloudEventSource";
+    String CAMEL_CLOUD_EVENT_VERSION = "CamelCloudEventVersion";
+    String CAMEL_CLOUD_EVENT_TYPE = "CamelCloudEventType";
+    String CAMEL_CLOUD_EVENT_TYPE_VERSION = "CamelCloudEventTypeVersion";
+    String CAMEL_CLOUD_EVENT_DATA_CONTENT_TYPE = "CamelCloudEventDataContentType";
+    String CAMEL_CLOUD_EVENT_DATA_CONTENT_ENCODING = "CamelCloudEventDataContentEncoding";
+    String CAMEL_CLOUD_EVENT_SCHEMA_URL = "CamelCloudEventSchemaURL";
+    String CAMEL_CLOUD_EVENT_SUBJECT = "CamelCloudEventSubject";
+    String CAMEL_CLOUD_EVENT_TIME = "CamelCloudEventTime";
+    String CAMEL_CLOUD_EVENT_EXTENSIONS = "CamelCloudEventExtensions";
+    String CAMEL_CLOUD_EVENT_CONTENT_TYPE = Exchange.CONTENT_TYPE;
+
     /**
      * The CloudEvent spec version.
      */
diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java
index 7d7e7a9..dd8ab96 100644
--- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java
@@ -27,15 +27,15 @@ public enum CloudEvents implements CloudEvent {
     V01(new CloudEventImpl(
         "0.1",
         Arrays.asList(
-            Attribute.simple("type", "CE-EventType", "eventType"),
-            Attribute.simple("type.version", "CE-EventTypeVersion", "eventTypeVersion"),
-            Attribute.simple("version", "CE-CloudEventsVersion", "cloudEventsVersion"),
-            Attribute.simple("source", "CE-Source", "source"),
-            Attribute.simple("id", "CE-EventID", "eventID"),
-            Attribute.simple("time", "CE-EventTime", "eventTime"),
-            Attribute.simple("schema.url", "CE-SchemaURL", "schemaURL"),
-            Attribute.simple("content.type", "ContentType", "contentType"),
-            Attribute.simple("extensions", "CE-Extensions", "extensions")
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "CE-EventType", "eventType"),
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_TYPE_VERSION, "CE-EventTypeVersion", "eventTypeVersion"),
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, "CE-CloudEventsVersion", "cloudEventsVersion"),
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "CE-Source", "source"),
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_ID, "CE-EventID", "eventID"),
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_TIME, "CE-EventTime", "eventTime"),
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_SCHEMA_URL, "CE-SchemaURL", "schemaURL"),
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_CONTENT_TYPE, "ContentType", "contentType"),
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_EXTENSIONS, "CE-Extensions", "extensions")
         )
     )),
     //
@@ -44,13 +44,13 @@ public enum CloudEvents implements CloudEvent {
     V02(new CloudEventImpl(
         "0.2",
         Arrays.asList(
-            Attribute.simple("type", "ce-type", "type"),
-            Attribute.simple("version", "ce-specversion", "specversion"),
-            Attribute.simple("source", "ce-source", "source"),
-            Attribute.simple("id", "ce-id", "id"),
-            Attribute.simple("time", "ce-time", "time"),
-            Attribute.simple("schema.url", "ce-schemaurl", "schemaurl"),
-            Attribute.simple("content.type", "Content-Type", "contenttype")
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "ce-type", "type"),
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, "ce-specversion", "specversion"),
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "ce-source", "source"),
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_ID, "ce-id", "id"),
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_TIME, "ce-time", "time"),
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_SCHEMA_URL, "ce-schemaurl", "schemaurl"),
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_CONTENT_TYPE, "Content-Type", "contenttype")
         )
     )),
     //
@@ -59,15 +59,15 @@ public enum CloudEvents implements CloudEvent {
     V03(new CloudEventImpl(
         "0.3",
         Arrays.asList(
-            Attribute.simple("id", "ce-id", "id"),
-            Attribute.simple("source", "ce-source", "source"),
-            Attribute.simple("version", "ce-specversion", "specversion"),
-            Attribute.simple("type", "ce-type", "type"),
-            Attribute.simple("data.content.encoding", "ce-datacontentencoding", "datacontentencoding"),
-            Attribute.simple("data.content.type", "ce-datacontenttype", "datacontenttype"),
-            Attribute.simple("schema.url", "ce-schemaurl", "schemaurl"),
-            Attribute.simple("subject", "ce-subject", "subject"),
-            Attribute.simple("time", "ce-time", "time")
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_ID, "ce-id", "id"),
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, "ce-source", "source"),
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, "ce-specversion", "specversion"),
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "ce-type", "type"),
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_DATA_CONTENT_ENCODING, "ce-datacontentencoding", "datacontentencoding"),
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_DATA_CONTENT_TYPE, "ce-datacontenttype", "datacontenttype"),
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_SCHEMA_URL, "ce-schemaurl", "schemaurl"),
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_SUBJECT, "ce-subject", "subject"),
+            Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_TIME, "ce-time", "time")
         )
     ));
 
diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
index 6e0eb8d..e9f13df 100644
--- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
+++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
@@ -22,11 +22,15 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.undertow.Undertow;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.HeaderMap;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
@@ -136,12 +140,12 @@ public class KnativeHttpTest {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
-        mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
-        mock.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "knative://endpoint/myEndpoint");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version());
+        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "knative://endpoint/myEndpoint");
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
-        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
-        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("id").id()));
+        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http()));
+        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http()));
         mock.expectedBodiesReceived("test");
         mock.expectedMessageCount(1);
 
@@ -181,12 +185,12 @@ public class KnativeHttpTest {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
-        mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
-        mock.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID");
-        mock.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "/somewhere");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version());
+        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "org.apache.camel.event");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "/somewhere");
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
-        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
+        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
         mock.expectedBodiesReceived("test");
         mock.expectedMessageCount(1);
 
@@ -266,12 +270,12 @@ public class KnativeHttpTest {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
-        mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
-        mock.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID");
-        mock.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "/somewhere");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version());
+        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "org.apache.camel.event");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "/somewhere");
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
-        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
+        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
         mock.expectedBodiesReceived("test");
         mock.expectedMessageCount(1);
 
@@ -279,11 +283,11 @@ public class KnativeHttpTest {
             "direct:source",
             e -> {
                 e.getMessage().setHeader(Exchange.CONTENT_TYPE, "text/plain");
-                e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version());
-                e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
-                e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID");
-                e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
-                e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "/somewhere");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version());
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere");
                 e.getMessage().setBody("test");
             }
         );
@@ -305,7 +309,7 @@ public class KnativeHttpTest {
                 KnativeSupport.mapOf(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain",
-                    Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute("source").id(), "CE1"
+                    Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE1"
                 )),
             endpoint(
                 Knative.EndpointKind.source,
@@ -315,7 +319,7 @@ public class KnativeHttpTest {
                 KnativeSupport.mapOf(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain",
-                    Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute("source").id(), "CE2"
+                    Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE2"
                 ))
         );
 
@@ -343,41 +347,41 @@ public class KnativeHttpTest {
         context.start();
 
         MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
-        mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
-        mock1.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
-        mock1.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
-        mock1.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID1");
-        mock1.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE1");
+        mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version());
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "org.apache.camel.event");
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID1");
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "CE1");
         mock1.expectedBodiesReceived("test");
         mock1.expectedMessageCount(1);
 
         MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
-        mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
-        mock2.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
-        mock2.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
-        mock2.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID2");
-        mock2.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE2");
+        mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version());
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "org.apache.camel.event");
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID2");
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "CE2");
         mock2.expectedBodiesReceived("test");
         mock2.expectedMessageCount(1);
 
         context.createProducerTemplate().send(
             "direct:source",
             e -> {
-                e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version());
-                e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
-                e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID1");
-                e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
-                e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE1");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version());
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID1");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE1");
             }
         );
         context.createProducerTemplate().send(
             "direct:source",
             e -> {
-                e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version());
-                e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
-                e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID2");
-                e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
-                e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE2");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version());
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID2");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE2");
             }
         );
 
@@ -399,7 +403,7 @@ public class KnativeHttpTest {
                 KnativeSupport.mapOf(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain",
-                    Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute("source").id(), "CE[01234]"
+                    Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE[01234]"
                 )),
             endpoint(
                 Knative.EndpointKind.source,
@@ -409,7 +413,7 @@ public class KnativeHttpTest {
                 KnativeSupport.mapOf(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain",
-                    Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute("source").id(), "CE[56789]"
+                    Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE[56789]"
                 ))
         );
 
@@ -437,41 +441,41 @@ public class KnativeHttpTest {
         context.start();
 
         MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
-        mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
-        mock1.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
-        mock1.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
-        mock1.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID1");
-        mock1.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE0");
+        mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version());
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "org.apache.camel.event");
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID1");
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "CE0");
         mock1.expectedBodiesReceived("test");
         mock1.expectedMessageCount(1);
 
         MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
-        mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
-        mock2.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
-        mock2.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
-        mock2.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID2");
-        mock2.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE5");
+        mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version());
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "org.apache.camel.event");
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID2");
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "CE5");
         mock2.expectedBodiesReceived("test");
         mock2.expectedMessageCount(1);
 
         context.createProducerTemplate().send(
             "direct:source",
             e -> {
-                e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version());
-                e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
-                e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID1");
-                e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
-                e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE0");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version());
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID1");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE0");
             }
         );
         context.createProducerTemplate().send(
             "direct:source",
             e -> {
-                e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version());
-                e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
-                e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID2");
-                e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
-                e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE5");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version());
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID2");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE5");
             }
         );
 
@@ -516,41 +520,41 @@ public class KnativeHttpTest {
         context.start();
 
         MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
-        mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
-        mock1.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
-        mock1.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "event1");
-        mock1.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID1");
-        mock1.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE1");
+        mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version());
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "event1");
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID1");
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "CE1");
         mock1.expectedBodiesReceived("test");
         mock1.expectedMessageCount(1);
 
         MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
-        mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
-        mock2.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
-        mock2.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "event2");
-        mock2.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID2");
-        mock2.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE2");
+        mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version());
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "event2");
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID2");
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "CE2");
         mock2.expectedBodiesReceived("test");
         mock2.expectedMessageCount(1);
 
         context.createProducerTemplate().send(
             "direct:source",
             e -> {
-                e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version());
-                e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "event1");
-                e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID1");
-                e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
-                e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE1");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version());
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "event1");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID1");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE1");
             }
         );
         context.createProducerTemplate().send(
             "direct:source",
             e -> {
-                e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version());
-                e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "event2");
-                e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID2");
-                e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
-                e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE2");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version());
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "event2");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID2");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE2");
             }
         );
 
@@ -865,11 +869,11 @@ public class KnativeHttpTest {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
-        mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "myEvent");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version());
+        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "myEvent");
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
-        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
-        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("id").id()));
+        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
+        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id()));
         mock.expectedBodiesReceived("test");
         mock.expectedMessageCount(1);
 
@@ -922,11 +926,11 @@ public class KnativeHttpTest {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
-        mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "myEvent");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version());
+        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "myEvent");
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
-        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
-        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("id").id()));
+        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
+        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id()));
         mock.expectedBodiesReceived("test");
         mock.expectedMessageCount(1);
 
@@ -978,12 +982,12 @@ public class KnativeHttpTest {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
-        mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
-        mock.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID");
-        mock.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "/somewhere");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).id(), ce.version());
+        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).id(), "org.apache.camel.event");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).id(), "myEventID");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).id(), "/somewhere");
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
-        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
+        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).id()));
         mock.expectedBodiesReceived("test");
         mock.expectedMessageCount(1);
 
@@ -991,11 +995,11 @@ public class KnativeHttpTest {
             "direct:source",
             e -> {
                 e.getMessage().setHeader(Exchange.CONTENT_TYPE, "text/plain");
-                e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version());
-                e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
-                e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID");
-                e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
-                e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "/somewhere");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version());
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID");
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getMessage().setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere");
                 e.getMessage().setBody("test");
             }
         );
@@ -1191,5 +1195,58 @@ public class KnativeHttpTest {
             assertThat(result.getMessage().getBody()).isEqualTo(definition.getName());
         }
     }
+
+    @ParameterizedTest
+    @MethodSource("provideCloudEventsImplementations")
+    void testHeaders(CloudEvent ce) throws Exception {
+        configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
+                Knative.EndpointKind.sink,
+                "ep",
+                "localhost",
+                port,
+                KnativeSupport.mapOf(
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain"
+                )
+            )
+        );
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<HttpServerExchange> exchange = new AtomicReference<>();
+
+        Undertow server = Undertow.builder()
+            .addHttpListener(port, "localhost")
+            .setHandler(se -> {
+                exchange.set(se);
+                latch.countDown();
+            })
+            .build();
+
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("direct:start")
+                .to("knative:endpoint/ep");
+        });
+
+        context.start();
+        try {
+            server.start();
+            template.sendBody("direct:start", "");
+
+            latch.await();
+
+            HeaderMap headers = exchange.get().getRequestHeaders();
+
+            assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
+            assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("org.apache.camel.event");
+            assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
+            assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep");
+            assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
+        }  finally {
+            server.stop();
+        }
+    }
 }
 
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
index 087286d..d06adf9 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
@@ -26,6 +26,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.component.knative.ce.CloudEventProcessor;
 import org.apache.camel.component.knative.ce.CloudEventProcessors;
+import org.apache.camel.component.knative.spi.CloudEvent;
 import org.apache.camel.component.knative.spi.Knative;
 import org.apache.camel.component.knative.spi.KnativeEnvironment;
 import org.apache.camel.processor.Pipeline;
@@ -159,7 +160,7 @@ public class KnativeEndpoint extends DefaultEndpoint {
 
         if (service.get().getType() == Knative.Type.event) {
             metadata.put(Knative.KNATIVE_EVENT_TYPE, serviceName);
-            metadata.put(Knative.KNATIVE_FILTER_PREFIX + cloudEvent.cloudEvent().mandatoryAttribute("type").id(), serviceName);
+            metadata.put(Knative.KNATIVE_FILTER_PREFIX + cloudEvent.cloudEvent().mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), serviceName);
         }
 
         return new KnativeEnvironment.KnativeServiceDefinition(
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
index 5e0ca0d..6d1af26 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
@@ -51,21 +51,19 @@ abstract class AbstractCloudEventProcessor implements CloudEventProcessor {
             }
 
             if (!Objects.equals(exchange.getIn().getHeader(Exchange.CONTENT_TYPE), Knative.MIME_STRUCTURED_CONTENT_MODE)) {
-                //
-                // The event is not in the form of Structured Content Mode
-                // then leave it as it is.
-                //
-                // Note that this is true for http binding only.
-                //
-                // More info:
-                //
-                //   https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#32-structured-content-mode
-                //
-                return;
-            }
+                final CloudEvent ce = cloudEvent();
+                final Map<String, Object> headers = exchange.getIn().getHeaders();
 
-            try (InputStream is = exchange.getIn().getBody(InputStream.class)) {
-                decodeStructuredContent(exchange, Knative.MAPPER.readValue(is, Map.class));
+                for (CloudEvent.Attribute attribute: ce.attributes()) {
+                    Object val = headers.remove(attribute.http());
+                    if (val != null) {
+                        headers.put(attribute.id(), val);
+                    }
+                }
+            } else {
+                try (InputStream is = exchange.getIn().getBody(InputStream.class)) {
+                    decodeStructuredContent(exchange, Knative.MAPPER.readValue(is, Map.class));
+                }
             }
         };
     }
@@ -87,11 +85,11 @@ abstract class AbstractCloudEventProcessor implements CloudEventProcessor {
             final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created);
             final Map<String, Object> headers = exchange.getIn().getHeaders();
 
-            headers.putIfAbsent(ce.mandatoryAttribute("id").id(), exchange.getExchangeId());
-            headers.putIfAbsent(ce.mandatoryAttribute("source").id(), endpoint.getEndpointUri());
-            headers.putIfAbsent(ce.mandatoryAttribute("version").id(), ce.version());
-            headers.putIfAbsent(ce.mandatoryAttribute("type").id(), eventType);
-            headers.putIfAbsent(ce.mandatoryAttribute("time").id(), eventTime);
+            headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), exchange.getExchangeId());
+            headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), endpoint.getEndpointUri());
+            headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version());
+            headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), eventType);
+            headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), eventTime);
             headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);
         };
     }
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessors.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessors.java
index a91a64d..2d5f499 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessors.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessors.java
@@ -41,14 +41,14 @@ public enum CloudEventProcessors implements CloudEventProcessor {
             // body
             ifNotEmpty(content.remove("data"), message::setBody);
 
-            ifNotEmpty(content.remove(ce.mandatoryAttribute("content.type").json()), val -> {
+            ifNotEmpty(content.remove(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_CONTENT_TYPE).json()), val -> {
                 message.setHeader(Exchange.CONTENT_TYPE, val);
             });
 
             //
             // Map extensions to standard camel headers
             //
-            ifNotEmpty(content.remove("extensions"), val -> {
+            ifNotEmpty(content.remove(CloudEvent.CAMEL_CLOUD_EVENT_EXTENSIONS), val -> {
                 if (val instanceof Map) {
                     ((Map<String, Object>) val).forEach(message::setHeader);
                 }
@@ -70,7 +70,7 @@ public enum CloudEventProcessors implements CloudEventProcessor {
             // body
             ifNotEmpty(content.remove("data"), message::setBody);
 
-            ifNotEmpty(content.remove(ce.mandatoryAttribute("content.type").json()), val -> {
+            ifNotEmpty(content.remove(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_CONTENT_TYPE).json()), val -> {
                 message.setHeader(Exchange.CONTENT_TYPE, val);
             });
 
@@ -98,11 +98,11 @@ public enum CloudEventProcessors implements CloudEventProcessor {
             // body
             ifNotEmpty(content.remove("data"), message::setBody);
 
-            ifNotEmpty(content.remove(ce.mandatoryAttribute("data.content.type").json()), val -> {
+            ifNotEmpty(content.remove(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_DATA_CONTENT_TYPE).json()), val -> {
                 message.setHeader(Exchange.CONTENT_TYPE, val);
             });
-            ifNotEmpty(content.remove(ce.mandatoryAttribute("data.content.encoding").json()), val -> {
-                message.setBody(val);
+            ifNotEmpty(content.remove(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_DATA_CONTENT_ENCODING).json()), val -> {
+                message.setHeader(Exchange.CONTENT_ENCODING, val);
             });
 
             for (CloudEvent.Attribute attribute: ce.attributes()) {