You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/07/17 11:19:02 UTC

[camel-k-runtime] branch master updated: camel-knative: don't replace message when the producer completes #399

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git


The following commit(s) were added to refs/heads/master by this push:
     new b72dc22  camel-knative: don't replace message when the producer completes #399
b72dc22 is described below

commit b72dc22bd925e07c598ea512078b3f492b45534f
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Thu Jul 16 11:17:39 2020 +0200

    camel-knative: don't replace message when the producer completes #399
---
 .../knative/http/KnativeHttpConsumer.java          |   5 +
 .../knative/http/KnativeHttpProducer.java          |   9 +-
 .../component/knative/http/KnativeHttpSupport.java |  51 +--
 .../knative/http/KnativeHttpTransport.java         |   8 +-
 .../component/knative/http/KnativeHttpServer.java  |  33 +-
 .../component/knative/http/KnativeHttpTest.java    | 507 +++++++++++----------
 .../knative/http/KnativeHttpTestSupport.java       |   4 +
 .../http/assertions/HttpServerRequestAssert.java   |  66 +++
 .../component/knative/KnativeConfiguration.java    |   6 +-
 9 files changed, 405 insertions(+), 284 deletions(-)

diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
index fc83036..fb60431 100644
--- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
+++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
@@ -287,6 +287,11 @@ public class KnativeHttpConsumer extends DefaultConsumer {
                     }
                 }
             }
+
+            KnativeHttpSupport.remapCloudEventHeaders(configuration.getCloudEvent(), message);
+            if (configuration.isRemoveCloudEventHeadersInReply()) {
+                KnativeHttpSupport.removeCloudEventHeaders(configuration.getCloudEvent(), message);
+            }
         }
 
         return response;
diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java
index 67045d1..ca23300 100644
--- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java
+++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java
@@ -35,7 +35,6 @@ import org.apache.camel.Message;
 import org.apache.camel.component.knative.spi.KnativeEnvironment;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.support.DefaultAsyncProducer;
-import org.apache.camel.support.DefaultMessage;
 import org.apache.camel.support.MessageHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
@@ -117,18 +116,20 @@ public class KnativeHttpProducer extends DefaultAsyncProducer {
             .sendBuffer(Buffer.buffer(payload), response -> {
                 if (response.succeeded()) {
                     HttpResponse<Buffer> result = response.result();
+                    Message answer = exchange.getMessage();
 
-                    Message answer = new DefaultMessage(exchange.getContext());
                     answer.setHeader(Exchange.HTTP_RESPONSE_CODE, result.statusCode());
 
                     for (Map.Entry<String, String> entry : result.headers().entries()) {
                         if (!headerFilterStrategy.applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange)) {
-                            KnativeHttpSupport.appendHeader(answer.getHeaders(), entry.getKey(), entry.getValue());
+                            answer.setHeader(entry.getKey(), entry.getValue());
                         }
                     }
 
                     if (result.body() != null) {
                         answer.setBody(result.body().getBytes());
+                    } else {
+                        answer.setBody(null);
                     }
 
                     if (result.statusCode() < 200 || result.statusCode() >= 300) {
@@ -143,8 +144,6 @@ public class KnativeHttpProducer extends DefaultAsyncProducer {
                     }
 
                     answer.setHeader(Exchange.HTTP_RESPONSE_CODE, result.statusCode());
-
-                    exchange.setMessage(answer);
                 } else if (response.failed()) {
                     String exceptionMessage = "HTTP operation failed invoking " + URISupport.sanitizeUri(this.uri.get());
                     if (response.result() != null) {
diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
index c74e0ab..67bb64f 100644
--- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
+++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
@@ -24,14 +24,10 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import io.vertx.core.http.HttpServerRequest;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
 import org.apache.camel.Message;
-import org.apache.camel.Processor;
 import org.apache.camel.component.knative.spi.CloudEvent;
 import org.apache.camel.component.knative.spi.Knative;
 import org.apache.camel.component.knative.spi.KnativeEnvironment;
-import org.apache.camel.support.processor.DelegateAsyncProcessor;
 
 public final class KnativeHttpSupport {
     private KnativeHttpSupport() {
@@ -97,46 +93,25 @@ public final class KnativeHttpSupport {
     /**
      * Removes cloud event headers at the end of the processing.
      */
-    public static Processor withoutCloudEventHeaders(Processor delegate, CloudEvent ce) {
-        return new DelegateAsyncProcessor(delegate) {
-            @Override
-            public boolean process(Exchange exchange, AsyncCallback callback) {
-                return processor.process(exchange, doneSync -> {
-                    final Message message = exchange.getMessage();
-
-                    // remove CloudEvent headers
-                    for (CloudEvent.Attribute attr : ce.attributes()) {
-                        message.removeHeader(attr.http());
-                    }
-
-                    callback.done(doneSync);
-                });
-            }
-        };
+    public static void removeCloudEventHeaders(CloudEvent ce, Message message) {
+        // remove CloudEvent headers
+        for (CloudEvent.Attribute attr : ce.attributes()) {
+            message.removeHeader(attr.http());
+            message.removeHeader(attr.id());
+        }
     }
 
     /**
      * Remap camel headers to cloud event http headers.
      */
-    public static Processor remapCloudEventHeaders(Processor delegate, CloudEvent ce) {
-        return new DelegateAsyncProcessor(delegate) {
-            @Override
-            public boolean process(Exchange exchange, AsyncCallback callback) {
-                return processor.process(exchange, doneSync -> {
-                    final Message message = exchange.getMessage();
-
-                    // remap CloudEvent camel --> http
-                    for (CloudEvent.Attribute attr : ce.attributes()) {
-                        Object value = message.getHeader(attr.id());
-                        if (value != null) {
-                            message.setHeader(attr.http(), value);
-                        }
-                    }
-
-                    callback.done(doneSync);
-                });
+    public static void remapCloudEventHeaders(CloudEvent ce, Message message) {
+        // remap CloudEvent camel --> http
+        for (CloudEvent.Attribute attr : ce.attributes()) {
+            Object value = message.getHeader(attr.id());
+            if (value != null) {
+                message.setHeader(attr.http(), value);
             }
-        };
+        }
     }
 
 }
diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java
index af3f7d4..a9b0613 100644
--- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java
+++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpTransport.java
@@ -96,13 +96,7 @@ public class KnativeHttpTransport extends ServiceSupport implements CamelContext
 
     @Override
     public Consumer createConsumer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeServiceDefinition service, Processor processor) {
-        Processor next = KnativeHttpSupport.remapCloudEventHeaders(processor, config.getCloudEvent());
-
-        if (config.isRemoveCloudEventHeadersInReply()) {
-            next = KnativeHttpSupport.withoutCloudEventHeaders(next, config.getCloudEvent());
-        }
-
-        return new KnativeHttpConsumer(config, endpoint, service, this.router, next);
+        return new KnativeHttpConsumer(config, endpoint, service, this.router, processor);
     }
 
 }
diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpServer.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpServer.java
index cf3a507..eecbfe6 100644
--- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpServer.java
+++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpServer.java
@@ -33,6 +33,7 @@ import io.vertx.ext.web.RoutingContext;
 import io.vertx.ext.web.handler.BodyHandler;
 import org.apache.camel.CamelContext;
 import org.apache.camel.component.platform.http.PlatformHttpConstants;
+import org.apache.camel.k.test.AvailablePortFinder;
 import org.apache.camel.support.service.ServiceSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,13 +45,17 @@ public class KnativeHttpServer extends ServiceSupport {
     private final String host;
     private final int port;
     private final String path;
+    private final BlockingQueue<HttpServerRequest> requests;
+    private final Handler<RoutingContext> handler;
 
     private Vertx vertx;
     private Router router;
     private ExecutorService executor;
     private HttpServer server;
-    private BlockingQueue<HttpServerRequest> requests;
-    private Handler<RoutingContext> handler;
+
+    public KnativeHttpServer(CamelContext context) {
+        this(context, "localhost", AvailablePortFinder.getNextAvailable(), "/", null);
+    }
 
     public KnativeHttpServer(CamelContext context, int port) {
         this(context, "localhost", port, "/", null);
@@ -60,10 +65,22 @@ public class KnativeHttpServer extends ServiceSupport {
         this(context, "localhost", port, "/", handler);
     }
 
+    public KnativeHttpServer(CamelContext context, Handler<RoutingContext> handler) {
+        this(context, "localhost", AvailablePortFinder.getNextAvailable(), "/", handler);
+    }
+
     public KnativeHttpServer(CamelContext context, String host, int port, String path) {
         this(context, host, port, path, null);
     }
 
+    public KnativeHttpServer(CamelContext context, String host, String path) {
+        this(context, host, AvailablePortFinder.getNextAvailable(), path, null);
+    }
+
+    public KnativeHttpServer(CamelContext context, String host, String path, Handler<RoutingContext> handler) {
+        this(context, host, AvailablePortFinder.getNextAvailable(), path, handler);
+    }
+
     public KnativeHttpServer(CamelContext context, String host, int port, String path, Handler<RoutingContext> handler) {
         this.context = context;
         this.host = host;
@@ -78,6 +95,18 @@ public class KnativeHttpServer extends ServiceSupport {
             };
     }
 
+    public String getHost() {
+        return host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public String getPath() {
+        return path;
+    }
+
     public HttpServerRequest poll(int timeout, TimeUnit unit) throws InterruptedException {
         return requests.poll(timeout, unit);
     }
diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
index caaf91b..63e18cb 100644
--- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
+++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
@@ -57,13 +57,13 @@ import static io.restassured.RestAssured.config;
 import static io.restassured.RestAssured.given;
 import static io.restassured.config.EncoderConfig.encoderConfig;
 import static org.apache.camel.component.knative.http.KnativeHttpTestSupport.configureKnativeComponent;
+import static org.apache.camel.component.knative.http.KnativeHttpTestSupport.httpAttribute;
 import static org.apache.camel.component.knative.spi.KnativeEnvironment.channel;
 import static org.apache.camel.component.knative.spi.KnativeEnvironment.endpoint;
 import static org.apache.camel.component.knative.spi.KnativeEnvironment.event;
 import static org.apache.camel.component.knative.spi.KnativeEnvironment.sourceChannel;
 import static org.apache.camel.component.knative.spi.KnativeEnvironment.sourceEndpoint;
 import static org.apache.camel.component.knative.spi.KnativeEnvironment.sourceEvent;
-import static org.apache.camel.util.CollectionHelper.mapOf;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.hamcrest.Matchers.emptyOrNullString;
 import static org.hamcrest.Matchers.is;
@@ -73,6 +73,7 @@ public class KnativeHttpTest {
     private CamelContext context;
     private ProducerTemplate template;
     private int platformHttpPort;
+    private String platformHttpHost;
 
     // **************************
     //
@@ -84,6 +85,7 @@ public class KnativeHttpTest {
     public void before() {
         this.context = new DefaultCamelContext();
         this.template = this.context.createProducerTemplate();
+        this.platformHttpHost = "localhost";
         this.platformHttpPort = AvailablePortFinder.getNextAvailable();
 
         PlatformHttpServiceContextCustomizer httpService = new PlatformHttpServiceContextCustomizer();
@@ -124,8 +126,8 @@ public class KnativeHttpTest {
             ce,
             sourceEndpoint(
                 "myEndpoint",
-                mapOf(
-                    Knative.SERVICE_META_PATH, path,
+                Map.of(
+                    Knative.SERVICE_META_PATH, ObjectHelper.supplyIfEmpty(path, () -> "/"),
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
                 ))
@@ -160,11 +162,11 @@ public class KnativeHttpTest {
         given()
             .body("test")
             .header(Exchange.CONTENT_TYPE, "text/plain")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
         .when()
             .post(targetPath)
         .then()
@@ -206,9 +208,9 @@ public class KnativeHttpTest {
             endpoint(
                 Knative.EndpointKind.sink,
                 "myEndpoint",
-                "localhost",
+                platformHttpHost,
                 platformHttpPort,
-                mapOf(
+                Map.of(
                     Knative.SERVICE_META_PATH, "/a/path",
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
@@ -225,12 +227,12 @@ public class KnativeHttpTest {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version());
-        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event");
-        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "knative://endpoint/myEndpoint");
+        mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version());
+        mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event");
+        mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "knative://endpoint/myEndpoint");
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
-        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http()));
-        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http()));
+        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME)));
+        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID)));
         mock.expectedBodiesReceived("test");
         mock.expectedMessageCount(1);
 
@@ -250,7 +252,7 @@ public class KnativeHttpTest {
                 "myEndpoint",
                 "none",
                 -1,
-                mapOf(
+                Map.of(
                     Knative.SERVICE_META_PATH, "/does/not/exist",
                     Knative.SERVICE_META_URL, String.format("http://localhost:%d/a/path", platformHttpPort),
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
@@ -268,12 +270,12 @@ public class KnativeHttpTest {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version());
-        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event");
-        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "knative://endpoint/myEndpoint");
+        mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version());
+        mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event");
+        mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "knative://endpoint/myEndpoint");
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
-        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http()));
-        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http()));
+        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME)));
+        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID)));
         mock.expectedBodiesReceived("test");
         mock.expectedMessageCount(1);
 
@@ -291,7 +293,7 @@ public class KnativeHttpTest {
             ce,
             sourceEndpoint(
                 "myEndpoint",
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
                 ))
@@ -318,7 +320,7 @@ public class KnativeHttpTest {
             given()
                 .contentType(Knative.MIME_STRUCTURED_CONTENT_MODE)
                 .body(
-                    mapOf(
+                    Map.of(
                         "cloudEventsVersion", ce.version(),
                         "eventType", "org.apache.camel.event",
                         "eventID", "myEventID",
@@ -337,7 +339,7 @@ public class KnativeHttpTest {
             given()
                 .contentType(Knative.MIME_STRUCTURED_CONTENT_MODE)
                 .body(
-                    mapOf(
+                    Map.of(
                         "specversion", ce.version(),
                         "type", "org.apache.camel.event",
                         "id", "myEventID",
@@ -356,7 +358,7 @@ public class KnativeHttpTest {
             given()
                 .contentType(Knative.MIME_STRUCTURED_CONTENT_MODE)
                 .body(
-                    mapOf(
+                    Map.of(
                         "specversion", ce.version(),
                         "type", "org.apache.camel.event",
                         "id", "myEventID",
@@ -375,7 +377,7 @@ public class KnativeHttpTest {
             given()
                 .contentType(Knative.MIME_STRUCTURED_CONTENT_MODE)
                 .body(
-                    mapOf(
+                    Map.of(
                         "specversion", ce.version(),
                         "type", "org.apache.camel.event",
                         "id", "myEventID",
@@ -405,7 +407,7 @@ public class KnativeHttpTest {
             ce,
             sourceEndpoint(
                 "myEndpoint",
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
                 ))
@@ -431,11 +433,11 @@ public class KnativeHttpTest {
         given()
             .body("test")
             .header(Exchange.CONTENT_TYPE, "text/plain")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
         .when()
             .post()
         .then()
@@ -452,17 +454,17 @@ public class KnativeHttpTest {
             ce,
             sourceEndpoint(
                 "ep1",
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain",
-                    Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE1"
+                    Knative.KNATIVE_FILTER_PREFIX + httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE1"
                 )),
             sourceEndpoint(
                 "ep2",
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain",
-                    Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE2"
+                    Knative.KNATIVE_FILTER_PREFIX + httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE2"
                 ))
         );
 
@@ -500,11 +502,11 @@ public class KnativeHttpTest {
         given()
             .body("test")
             .header(Exchange.CONTENT_TYPE, "text/plain")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID1")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE1")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID1")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE1")
             .when()
             .post()
             .then()
@@ -513,11 +515,11 @@ public class KnativeHttpTest {
         given()
             .body("test")
             .header(Exchange.CONTENT_TYPE, "text/plain")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID2")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE2")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID2")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE2")
         .when()
             .post()
         .then()
@@ -535,17 +537,17 @@ public class KnativeHttpTest {
             ce,
             sourceEndpoint(
                 "ep1",
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain",
-                    Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE[01234]"
+                    Knative.KNATIVE_FILTER_PREFIX + httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE[01234]"
                 )),
             sourceEndpoint(
                 "ep2",
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain",
-                    Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE[56789]"
+                    Knative.KNATIVE_FILTER_PREFIX + httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE[56789]"
                 ))
         );
 
@@ -583,11 +585,11 @@ public class KnativeHttpTest {
         given()
             .body("test")
             .header(Exchange.CONTENT_TYPE, "text/plain")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID1")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE0")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID1")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE0")
         .when()
             .post()
         .then()
@@ -596,11 +598,11 @@ public class KnativeHttpTest {
         given()
             .body("test")
             .header(Exchange.CONTENT_TYPE, "text/plain")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID2")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE5")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID2")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE5")
         .when()
             .post()
         .then()
@@ -653,11 +655,11 @@ public class KnativeHttpTest {
         given()
             .body("test")
             .header(Exchange.CONTENT_TYPE, "text/plain")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "event1")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID1")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE1")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "event1")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID1")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE1")
         .when()
             .post()
         .then()
@@ -666,11 +668,11 @@ public class KnativeHttpTest {
         given()
             .body("test")
             .header(Exchange.CONTENT_TYPE, "text/plain")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "event2")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID2")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "CE2")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "event2")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID2")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE2")
         .when()
             .post()
         .then()
@@ -688,17 +690,17 @@ public class KnativeHttpTest {
             ce,
             sourceEndpoint(
                 "from",
-                mapOf(
-                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                Map.of(
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event.from",
                     Knative.CONTENT_TYPE, "text/plain"
                 )),
             endpoint(
                 Knative.EndpointKind.sink,
                 "to",
-                "localhost",
+                platformHttpHost,
                 platformHttpPort,
-                mapOf(
-                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                Map.of(
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event.to",
                     Knative.CONTENT_TYPE, "text/plain"
                 ))
         );
@@ -707,9 +709,9 @@ public class KnativeHttpTest {
             b.from("knative:endpoint/from")
                 .convertBodyTo(String.class)
                 .setBody()
-                .constant("consumer")
+                    .constant("consumer")
                 .setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE)
-                .constant("custom");
+                    .constant("custom");
             b.from("direct:source")
                 .to("knative://endpoint/to")
                 .log("${body}")
@@ -718,7 +720,7 @@ public class KnativeHttpTest {
 
         MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class);
         mock.expectedBodiesReceived("consumer");
-        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), null);
+        mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event.to");
         mock.expectedMessageCount(1);
 
         context.start();
@@ -735,16 +737,16 @@ public class KnativeHttpTest {
             ce,
             sourceEndpoint(
                 "from",
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
                 )),
             endpoint(
                 Knative.EndpointKind.sink,
                 "to",
-                "localhost",
+                platformHttpHost,
                 platformHttpPort,
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
                 ))
@@ -754,9 +756,9 @@ public class KnativeHttpTest {
             b.from("knative:endpoint/from?replyWithCloudEvent=true")
                 .convertBodyTo(String.class)
                 .setBody()
-                .constant("consumer")
+                    .constant("consumer")
                 .setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE)
-                .constant("custom");
+                    .constant("custom");
             b.from("direct:source")
                 .to("knative://endpoint/to")
                 .log("${body}")
@@ -765,7 +767,7 @@ public class KnativeHttpTest {
 
         MockEndpoint mock = context.getEndpoint("mock:to", MockEndpoint.class);
         mock.expectedBodiesReceived("consumer");
-        mock.expectedHeaderReceived(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "custom");
+        mock.expectedHeaderReceived(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "custom");
         mock.expectedMessageCount(1);
 
         context.start();
@@ -785,7 +787,7 @@ public class KnativeHttpTest {
                 "test",
                 "",
                 platformHttpPort,
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
                 )
@@ -815,9 +817,9 @@ public class KnativeHttpTest {
             endpoint(
                 Knative.EndpointKind.sink,
                 "test",
-                "localhost",
+                platformHttpHost,
                 platformHttpPort,
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
                 )
@@ -846,7 +848,7 @@ public class KnativeHttpTest {
             ce,
             sourceEndpoint(
                 "ep1",
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain",
                     Knative.KNATIVE_FILTER_PREFIX + "h", "h1"
@@ -854,7 +856,7 @@ public class KnativeHttpTest {
             ),
             sourceEndpoint(
                 "ep2",
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain",
                     Knative.KNATIVE_FILTER_PREFIX + "h", "h2"
@@ -897,7 +899,7 @@ public class KnativeHttpTest {
             ce,
             sourceEndpoint(
                 "ep1",
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain",
                     Knative.KNATIVE_FILTER_PREFIX + "h", "h1"
@@ -905,7 +907,7 @@ public class KnativeHttpTest {
             ),
             sourceEndpoint(
                 "ep2",
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain",
                     Knative.KNATIVE_FILTER_PREFIX + "h", "h2"
@@ -951,9 +953,9 @@ public class KnativeHttpTest {
             endpoint(
                 Knative.EndpointKind.sink,
                 "ep",
-                "localhost",
+                platformHttpHost,
                 platformHttpPort,
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
                 )
@@ -989,15 +991,15 @@ public class KnativeHttpTest {
             event(
                 Knative.EndpointKind.sink,
                 "default",
-                "localhost",
+                platformHttpHost,
                 platformHttpPort,
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
                 )),
             sourceEvent(
                 "default",
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
                 ))
@@ -1035,9 +1037,9 @@ public class KnativeHttpTest {
             event(
                 Knative.EndpointKind.sink,
                 "default",
-                "localhost",
+                platformHttpHost,
                 platformHttpPort,
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain",
                     Knative.KNATIVE_KIND, "MyObject",
@@ -1045,7 +1047,7 @@ public class KnativeHttpTest {
                 )),
             sourceEvent(
                 "default",
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain",
                     Knative.KNATIVE_KIND, "MyOtherObject",
@@ -1084,7 +1086,7 @@ public class KnativeHttpTest {
             ce,
             sourceEndpoint(
                 "myEndpoint",
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain",
                     Knative.KNATIVE_KIND, "MyObject",
@@ -1092,7 +1094,7 @@ public class KnativeHttpTest {
                 )),
             sourceEndpoint(
                 "myEndpoint",
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain",
                     Knative.KNATIVE_KIND, "MyObject",
@@ -1120,11 +1122,11 @@ public class KnativeHttpTest {
         given()
             .body("test")
             .header(Exchange.CONTENT_TYPE, "text/plain")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
         .when()
             .post()
         .then()
@@ -1141,7 +1143,7 @@ public class KnativeHttpTest {
             ce,
             sourceEndpoint(
                 "myEndpoint",
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
                 ))
@@ -1172,9 +1174,9 @@ public class KnativeHttpTest {
             endpoint(
                 Knative.EndpointKind.sink,
                 "myEndpoint",
-                "localhost",
+                platformHttpHost,
                 platformHttpPort,
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
                 ))
@@ -1196,8 +1198,7 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @EnumSource(CloudEvents.class)
     void testNoContent(CloudEvent ce) throws Exception {
-        final int wordsPort = AvailablePortFinder.getNextAvailable();
-        final KnativeHttpServer server = new KnativeHttpServer(context, wordsPort, event -> {
+        final KnativeHttpServer server = new KnativeHttpServer(context, event -> {
             event.response().setStatusCode(204);
             event.response().end("");
         });
@@ -1210,25 +1211,25 @@ public class KnativeHttpTest {
                 "messages",
                 null,
                 -1,
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
                 )),
             channel(
                 Knative.EndpointKind.sink,
                 "messages",
-                "localhost",
+                platformHttpHost,
                 platformHttpPort,
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
                 )),
             channel(
                 Knative.EndpointKind.sink,
                 "words",
-                "localhost",
-                wordsPort,
-                mapOf(
+                server.getHost(),
+                server.getPort(),
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
                 ))
@@ -1278,11 +1279,11 @@ public class KnativeHttpTest {
         given()
             .body("test")
             .header(Exchange.CONTENT_TYPE, "text/plain")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
         .when()
             .post()
         .then()
@@ -1315,11 +1316,11 @@ public class KnativeHttpTest {
         given()
             .body("test")
             .header(Exchange.CONTENT_TYPE, "text/plain")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
         .when()
             .post()
         .then()
@@ -1352,11 +1353,11 @@ public class KnativeHttpTest {
         given()
             .body("test")
             .header(Exchange.CONTENT_TYPE, "text/plain")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID")
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
-            .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+            .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
         .when()
             .post()
         .then()
@@ -1373,7 +1374,7 @@ public class KnativeHttpTest {
             .limit(10)
             .mapToObj(i -> sourceEndpoint(
                 "ep-" + i,
-                mapOf(Knative.KNATIVE_FILTER_PREFIX + "MyHeader", "channel-" + i)))
+                Map.of(Knative.KNATIVE_FILTER_PREFIX + "MyHeader", "channel-" + i)))
             .collect(Collectors.toList());
 
         configureKnativeComponent(context, ce, hops);
@@ -1413,7 +1414,7 @@ public class KnativeHttpTest {
     @EnumSource(CloudEvents.class)
     void testHeaders(CloudEvent ce) throws Exception {
         final int port = AvailablePortFinder.getNextAvailable();
-        final KnativeHttpServer server = new KnativeHttpServer(context, port);
+        final KnativeHttpServer server = new KnativeHttpServer(context);
 
         configureKnativeComponent(
             context,
@@ -1421,9 +1422,9 @@ public class KnativeHttpTest {
             endpoint(
                 Knative.EndpointKind.sink,
                 "ep",
-                "localhost",
-                port,
-                mapOf(
+                server.getHost(),
+                server.getPort(),
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
                 )
@@ -1432,19 +1433,31 @@ public class KnativeHttpTest {
 
         RouteBuilder.addRoutes(context, b -> {
             b.from("direct:start")
-                .to("knative:endpoint/ep");
+                .setHeader("CamelDummyHeader").constant("test")
+                .to("knative:endpoint/ep")
+                .to("direct:mock");
+            b.from("direct:mock")
+                .to("mock:ep");
         });
 
         context.start();
+
         try {
+            MockEndpoint mock = context.getEndpoint("mock:ep", MockEndpoint.class);
+            mock.expectedHeaderReceived("CamelDummyHeader", "test");
+            mock.expectedMessageCount(1);
+
             server.start();
+
             template.sendBody("direct:start", "");
 
+            mock.assertIsSatisfied();
+
             HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("org.apache.camel.event");
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep");
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo("org.apache.camel.event");
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull();
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo("knative://endpoint/ep");
             assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
         } finally {
             server.stop();
@@ -1453,12 +1466,54 @@ public class KnativeHttpTest {
 
     @ParameterizedTest
     @EnumSource(CloudEvents.class)
-    void testHeadersOverrideFromEnv(CloudEvent ce) throws Exception {
+    void testHeadersInReply(CloudEvent ce) throws Exception {
         final int port = AvailablePortFinder.getNextAvailable();
-        final KnativeHttpServer server = new KnativeHttpServer(context, port);
-        final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http();
+        final KnativeHttpServer server = new KnativeHttpServer(context);
+
+        configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
+                Knative.EndpointKind.sink,
+                "ep",
+                server.getHost(),
+                server.getPort(),
+                Map.of(
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain"
+                )
+            )
+        );
+
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("direct:start")
+                .setHeader("CamelDummyHeader").constant("test")
+                .to("knative:endpoint/ep");
+        });
+
+        context.start();
+
+        try {
+            MockEndpoint mock = context.getEndpoint("mock:ep", MockEndpoint.class);
+            mock.expectedHeaderReceived("CamelDummyHeader", "test");
+            mock.expectedMessageCount(1);
+
+            server.start();
+
+            Exchange exchange = template.request("direct:start", e -> e.getMessage().setBody(null));
+            assertThat(exchange.getMessage().getHeaders()).containsEntry("CamelDummyHeader", "test");
+        } finally {
+            server.stop();
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(CloudEvents.class)
+    void testHeadersOverrideFromEnv(CloudEvent ce) throws Exception {
+        final KnativeHttpServer server = new KnativeHttpServer(context);
+        final String typeHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE);
         final String typeHeaderVal = UUID.randomUUID().toString();
-        final String sourceHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http();
+        final String sourceHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE);
         final String sourceHeaderVal = UUID.randomUUID().toString();
 
         configureKnativeComponent(
@@ -1467,9 +1522,9 @@ public class KnativeHttpTest {
             endpoint(
                 Knative.EndpointKind.sink,
                 "ep",
-                "localhost",
-                port,
-                mapOf(
+                server.getHost(),
+                server.getPort(),
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain",
                     Knative.KNATIVE_CE_OVERRIDE_PREFIX + typeHeaderKey, typeHeaderVal,
@@ -1489,10 +1544,10 @@ public class KnativeHttpTest {
             template.sendBody("direct:start", "");
 
             HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal);
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal);
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo(typeHeaderVal);
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull();
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo(sourceHeaderVal);
             assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
         } finally {
             server.stop();
@@ -1502,11 +1557,10 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @EnumSource(CloudEvents.class)
     void testHeadersOverrideFromURI(CloudEvent ce) throws Exception {
-        final int port = AvailablePortFinder.getNextAvailable();
-        final KnativeHttpServer server = new KnativeHttpServer(context, port);
-        final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http();
+        final KnativeHttpServer server = new KnativeHttpServer(context);
+        final String typeHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE);
         final String typeHeaderVal = UUID.randomUUID().toString();
-        final String sourceHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http();
+        final String sourceHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE);
         final String sourceHeaderVal = UUID.randomUUID().toString();
 
         configureKnativeComponent(
@@ -1515,9 +1569,9 @@ public class KnativeHttpTest {
             endpoint(
                 Knative.EndpointKind.sink,
                 "ep",
-                "localhost",
-                port,
-                mapOf(
+                server.getHost(),
+                server.getPort(),
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
                 )
@@ -1537,10 +1591,10 @@ public class KnativeHttpTest {
             template.sendBody("direct:start", "");
 
             HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal);
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal);
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo(typeHeaderVal);
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull();
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo(sourceHeaderVal);
             assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
         } finally {
             server.stop();
@@ -1550,11 +1604,10 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @EnumSource(CloudEvents.class)
     void testHeadersOverrideFromConf(CloudEvent ce) throws Exception {
-        final int port = AvailablePortFinder.getNextAvailable();
-        final KnativeHttpServer server = new KnativeHttpServer(context, port);
-        final String typeHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http();
+        final KnativeHttpServer server = new KnativeHttpServer(context);
+        final String typeHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE);
         final String typeHeaderVal = UUID.randomUUID().toString();
-        final String sourceHeaderKey = ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http();
+        final String sourceHeaderKey = httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE);
         final String sourceHeaderVal = UUID.randomUUID().toString();
 
         KnativeComponent component = configureKnativeComponent(
@@ -1563,16 +1616,16 @@ public class KnativeHttpTest {
             endpoint(
                 Knative.EndpointKind.sink,
                 "ep",
-                "localhost",
-                port,
-                mapOf(
+                server.getHost(),
+                server.getPort(),
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
                 )
             )
         );
 
-        component.getConfiguration().setCeOverride(mapOf(
+        component.getConfiguration().setCeOverride(Map.of(
             Knative.KNATIVE_CE_OVERRIDE_PREFIX + typeHeaderKey, typeHeaderVal,
             Knative.KNATIVE_CE_OVERRIDE_PREFIX + sourceHeaderKey, sourceHeaderVal
         ));
@@ -1588,10 +1641,10 @@ public class KnativeHttpTest {
             template.sendBody("direct:start", "");
 
             HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo(typeHeaderVal);
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo(sourceHeaderVal);
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo(typeHeaderVal);
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull();
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo(sourceHeaderVal);
             assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
         } finally {
             server.stop();
@@ -1601,8 +1654,7 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @EnumSource(CloudEvents.class)
     void testHeadersOverrideFromRouteWithCamelHeader(CloudEvent ce) throws Exception {
-        final int port = AvailablePortFinder.getNextAvailable();
-        final KnativeHttpServer server = new KnativeHttpServer(context, port);
+        final KnativeHttpServer server = new KnativeHttpServer(context);
 
         configureKnativeComponent(
             context,
@@ -1610,9 +1662,9 @@ public class KnativeHttpTest {
             endpoint(
                 Knative.EndpointKind.sink,
                 "ep",
-                "localhost",
-                port,
-                mapOf(
+                server.getHost(),
+                server.getPort(),
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
                 )
@@ -1631,10 +1683,10 @@ public class KnativeHttpTest {
             template.sendBody("direct:start", "");
 
             HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("myType");
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep");
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo("myType");
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull();
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo("knative://endpoint/ep");
             assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
         } finally {
             server.stop();
@@ -1644,8 +1696,7 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @EnumSource(CloudEvents.class)
     void testHeadersOverrideFromRouteWithCEHeader(CloudEvent ce) throws Exception {
-        final int port = AvailablePortFinder.getNextAvailable();
-        final KnativeHttpServer server = new KnativeHttpServer(context, port);
+        final KnativeHttpServer server = new KnativeHttpServer(context);
 
         configureKnativeComponent(
             context,
@@ -1653,9 +1704,9 @@ public class KnativeHttpTest {
             endpoint(
                 Knative.EndpointKind.sink,
                 "ep",
-                "localhost",
-                port,
-                mapOf(
+                server.getHost(),
+                server.getPort(),
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
                 )
@@ -1664,7 +1715,7 @@ public class KnativeHttpTest {
 
         RouteBuilder.addRoutes(context, b -> {
             b.from("direct:start")
-                .setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http()).constant("fromCEHeader")
+                .setHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE)).constant("fromCEHeader")
                 .setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).constant("fromCamelHeader")
                 .to("knative:endpoint/ep");
         });
@@ -1675,10 +1726,10 @@ public class KnativeHttpTest {
             template.sendBody("direct:start", "");
 
             HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("fromCEHeader");
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep");
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo("fromCEHeader");
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID))).isNotNull();
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE))).isEqualTo("knative://endpoint/ep");
             assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
         } finally {
             server.stop();
@@ -1688,8 +1739,7 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @EnumSource(CloudEvents.class)
     void testEventBridge(CloudEvent ce) throws Exception {
-        final int port = AvailablePortFinder.getNextAvailable();
-        final KnativeHttpServer server = new KnativeHttpServer(context, port);
+        final KnativeHttpServer server = new KnativeHttpServer(context);
 
         configureKnativeComponent(
             context,
@@ -1697,14 +1747,14 @@ public class KnativeHttpTest {
             event(
                 Knative.EndpointKind.sink,
                 "event.sink",
-                "localhost",
-                port,
-                mapOf(
+                server.getHost(),
+                server.getPort(),
+                Map.of(
                     Knative.CONTENT_TYPE, "text/plain"
                 )),
             sourceEvent(
                 "event.source",
-                mapOf(
+                Map.of(
                     Knative.CONTENT_TYPE, "text/plain"
                 ))
         );
@@ -1722,19 +1772,19 @@ public class KnativeHttpTest {
             given()
                 .body("test")
                 .header(Exchange.CONTENT_TYPE, "text/plain")
-                .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
-                .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "event.source")
-                .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID")
-                .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
-                .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere")
+                .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+                .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "event.source")
+                .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
+                .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+                .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
             .when()
                 .post()
             .then()
                 .statusCode(204);
 
             HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("event.sink");
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo("event.sink");
             assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
         } finally {
             server.stop();
@@ -1745,7 +1795,7 @@ public class KnativeHttpTest {
     @EnumSource(CloudEvents.class)
     void testDynamicEventBridge(CloudEvent ce) throws Exception {
         final int port = AvailablePortFinder.getNextAvailable();
-        final KnativeHttpServer server = new KnativeHttpServer(context, port);
+        final KnativeHttpServer server = new KnativeHttpServer(context);
 
         configureKnativeComponent(
             context,
@@ -1753,14 +1803,14 @@ public class KnativeHttpTest {
             event(
                 Knative.EndpointKind.sink,
                 "default",
-                "localhost",
-                port,
-                mapOf(
+                server.getHost(),
+                server.getPort(),
+                Map.of(
                     Knative.CONTENT_TYPE, "text/plain"
                 )),
             sourceEvent(
                 "event.source",
-                mapOf(
+                Map.of(
                     Knative.CONTENT_TYPE, "text/plain"
                 ))
         );
@@ -1779,19 +1829,19 @@ public class KnativeHttpTest {
             given()
                 .body("test")
                 .header(Exchange.CONTENT_TYPE, "text/plain")
-                .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
-                .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "event.source")
-                .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID")
-                .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
-                .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere")
+                .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+                .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "event.source")
+                .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
+                .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+                .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
             .when()
                 .post()
             .then()
                 .statusCode(204);
 
             HttpServerRequest request = server.poll(30, TimeUnit.SECONDS);
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
-            assertThat(request.getHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("event.sink");
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION))).isEqualTo(ce.version());
+            assertThat(request.getHeader(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE))).isEqualTo("event.sink");
             assertThat(request.getHeader(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
         } finally {
             server.stop();
@@ -1801,8 +1851,7 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @EnumSource(CloudEvents.class)
     void testSlowConsumer(CloudEvent ce) throws Exception {
-        final int port = AvailablePortFinder.getNextAvailable();
-        final KnativeHttpServer server = new KnativeHttpServer(context, port, event -> {
+        final KnativeHttpServer server = new KnativeHttpServer(context, event -> {
             event.vertx().executeBlocking(
                 promise -> {
                     try {
@@ -1825,7 +1874,7 @@ public class KnativeHttpTest {
             ce,
             sourceEndpoint(
                 "start",
-                mapOf(
+                Map.of(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
                 ))
@@ -1837,7 +1886,7 @@ public class KnativeHttpTest {
             RouteBuilder.addRoutes(context, b -> {
                 b.from("knative:endpoint/start")
                     .removeHeaders("Camel*")
-                    .toF("http://localhost:%d", port);
+                    .toF("http://%s:%d", server.getHost(), server.getPort());
             });
 
             context.start();
@@ -1845,11 +1894,11 @@ public class KnativeHttpTest {
             given()
                 .body("test")
                 .header(Exchange.CONTENT_TYPE, "text/plain")
-                .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version())
-                .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), "org.apache.camel.event")
-                .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), "myEventID")
-                .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
-                .header(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), "/somewhere")
+                .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_VERSION), ce.version())
+                .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TYPE), "org.apache.camel.event")
+                .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_ID), "myEventID")
+                .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_TIME), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()))
+                .header(httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "/somewhere")
                 .when()
                     .post()
                 .then()
diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java
index 8bf3702..bbe209d 100644
--- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java
+++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java
@@ -39,4 +39,8 @@ public final class KnativeHttpTestSupport {
 
         return component;
     }
+
+    public static String httpAttribute(CloudEvent ce, String name) {
+        return ce.mandatoryAttribute(name).http(); // the tests use this value as an HTTP header name
+    }
 }
diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/assertions/HttpServerRequestAssert.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/assertions/HttpServerRequestAssert.java
new file mode 100644
index 0000000..19250b4
--- /dev/null
+++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/assertions/HttpServerRequestAssert.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.http.assertions;
+
+import java.util.Objects;
+
+import io.vertx.core.http.HttpServerRequest;
+import org.assertj.core.api.AbstractAssert;
+import org.assertj.core.api.AbstractStringAssert;
+import org.assertj.core.api.AssertionsForClassTypes;
+
+/** AssertJ assertions on the headers of a Vert.x {@link HttpServerRequest}. */
+public class HttpServerRequestAssert extends AbstractAssert<HttpServerRequestAssert, HttpServerRequest> {
+    /** Wraps {@code request}; the self type must be this assert class because AbstractAssert casts {@code this} to it. */
+    public HttpServerRequestAssert(HttpServerRequest request) {
+        super(request, HttpServerRequestAssert.class);
+    }
+
+    /** Entry point: creates assertions for {@code actual}. */
+    public static HttpServerRequestAssert assertThat(HttpServerRequest actual) {
+        return new HttpServerRequestAssert(actual);
+    }
+
+    /** Returns string assertions on the value of header {@code name} (may be null); fails if the request is null. */
+    public AbstractStringAssert<?> header(String name) {
+        isNotNull();
+
+        return AssertionsForClassTypes.assertThat(actual.getHeader(name));
+    }
+
+    /** Verifies that the request is not null and carries a header called {@code name}. */
+    public HttpServerRequestAssert hasHeader(String name) {
+        isNotNull();
+
+        if (Objects.isNull(actual.getHeader(name))) {
+            failWithMessage("Expected header %s not present", name);
+        }
+
+        return this;
+    }
+
+    /** Verifies that header {@code name} is present and that its value equals {@code value}. */
+    public HttpServerRequestAssert hasHeader(String name, String value) {
+        hasHeader(name);
+
+        if (!Objects.equals(actual.getHeader(name), value)) {
+            failWithMessage("Expected header %s to be <%s> but was <%s>", name, value, actual.getHeader(name));
+        }
+
+        return this;
+    }
+}
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
index b38f510..0e2b235 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
@@ -128,7 +128,7 @@ public class KnativeConfiguration implements Cloneable {
      * Set the transport options.
      */
     public void setTransportOptions(Map<String, Object> transportOptions) {
-        this.transportOptions = transportOptions;
+        this.transportOptions = transportOptions == null ? null : new HashMap<>(transportOptions); // mutable copy; null stays null
     }
 
     /**
@@ -150,7 +150,7 @@ public class KnativeConfiguration implements Cloneable {
      * Set the filters.
      */
     public void setFilters(Map<String, Object> filters) {
-        this.filters = filters;
+        this.filters = filters == null ? null : new HashMap<>(filters); // mutable copy; null stays null
     }
 
     public Map<String, Object> getCeOverride() {
@@ -161,7 +161,7 @@ public class KnativeConfiguration implements Cloneable {
      * CloudEvent headers to override
      */
     public void setCeOverride(Map<String, Object> ceOverride) {
-        this.ceOverride = ceOverride;
+        this.ceOverride = ceOverride == null ? null : new HashMap<>(ceOverride); // mutable copy; null stays null
     }
 
     public String getApiVersion() {