You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2019/10/17 13:35:13 UTC

[camel-k-runtime] branch master updated: Can't bridge two Knative channels with new component #163

This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git


The following commit(s) were added to refs/heads/master by this push:
     new 1adf649  Can't bridge two Knative channels with new component #163
     new 66ac3bb  Merge pull request #166 from lburgazzoli/github-163
1adf649 is described below

commit 1adf6499a95d8b573d244de451005db4f3de6027
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Thu Oct 17 13:08:04 2019 +0200

    Can't bridge two Knative channels with new component #163
---
 .../knative/http/KnativeHttpConsumer.java          |  32 +--
 .../component/knative/http/KnativeHttpTest.java    | 266 ++++++++++++---------
 .../knative/http/KnativeHttpTestSupport.java       |  36 +++
 3 files changed, 210 insertions(+), 124 deletions(-)

diff --git a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
index cb99b0d..0091dcb 100644
--- a/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
+++ b/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import java.util.function.Predicate;
 
 import io.vertx.core.buffer.Buffer;
-import io.vertx.core.http.HttpHeaders;
 import io.vertx.core.http.HttpMethod;
 import io.vertx.core.http.HttpServerRequest;
 import io.vertx.core.http.HttpServerResponse;
@@ -99,21 +98,24 @@ public class KnativeHttpConsumer extends DefaultConsumer implements KnativeHttp.
                     getAsyncProcessor().process(exchange, doneSync -> {
                         try {
                             HttpServerResponse response = toHttpResponse(request, exchange.getMessage());
-                            Buffer body = computeResponseBody(exchange.getMessage());
+                            Buffer body = null;
 
-                            // set the content type in the response.
-                            String contentType = MessageHelper.getContentType(exchange.getMessage());
-                            if (contentType != null) {
-                                // set content-type
-                                response.putHeader(Exchange.CONTENT_TYPE, contentType);
+                            if (request.response().getStatusCode() != 204) {
+                                body = computeResponseBody(exchange.getMessage());
+
+                                // set the content type in the response.
+                                String contentType = MessageHelper.getContentType(exchange.getMessage());
+                                if (contentType != null) {
+                                    // set content-type
+                                    response.putHeader(Exchange.CONTENT_TYPE, contentType);
+                                }
                             }
 
-                            if (body == null) {
-                                request.response().setStatusCode(204);
-                                request.response().putHeader(HttpHeaders.CONTENT_TYPE, "text/plain");
-                                request.response().end("No response available");
-                            } else {
+                            if (body != null) {
                                 request.response().end(body);
+                            } else {
+                                request.response().setStatusCode(204);
+                                request.response().end();
                             }
                         } catch (Exception e) {
                             getExceptionHandler().handleException(e);
@@ -214,9 +216,9 @@ public class KnativeHttpConsumer extends DefaultConsumer implements KnativeHttp.
             ExchangeHelper.setFailureHandled(message.getExchange());
         }
 
-        return Buffer.buffer(
-            message.getExchange().getContext().getTypeConverter().mandatoryConvertTo(byte[].class, body)
-        );
+        return body != null
+            ? Buffer.buffer(message.getExchange().getContext().getTypeConverter().mandatoryConvertTo(byte[].class, body))
+            : null;
     }
 
 }
diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
index cd572ec..c89e6a3 100644
--- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
+++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
@@ -22,6 +22,7 @@ import java.util.Objects;
 import java.util.stream.Stream;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import io.undertow.Undertow;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
@@ -31,7 +32,6 @@ import org.apache.camel.component.knative.KnativeComponent;
 import org.apache.camel.component.knative.spi.CloudEvent;
 import org.apache.camel.component.knative.spi.CloudEvents;
 import org.apache.camel.component.knative.spi.Knative;
-import org.apache.camel.component.knative.spi.KnativeEnvironment;
 import org.apache.camel.component.knative.spi.KnativeSupport;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.http.common.HttpOperationFailedException;
@@ -44,6 +44,10 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import static org.apache.camel.component.knative.http.KnativeHttpTestSupport.configureKnativeComponent;
+import static org.apache.camel.component.knative.spi.KnativeEnvironment.channel;
+import static org.apache.camel.component.knative.spi.KnativeEnvironment.endpoint;
+import static org.apache.camel.component.knative.spi.KnativeEnvironment.event;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class KnativeHttpTest {
@@ -99,8 +103,10 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @MethodSource("provideCloudEventsImplementations")
     void testInvokeEndpoint(CloudEvent ce) throws Exception {
-        KnativeEnvironment env = KnativeEnvironment.on(
-            KnativeEnvironment.endpoint(
+        configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
                 Knative.EndpointKind.sink,
                 "myEndpoint",
                 "localhost",
@@ -112,11 +118,6 @@ public class KnativeHttpTest {
                 ))
         );
 
-        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
-        component.setProtocol(Knative.Protocol.http);
-        component.setCloudEventsSpecVersion(ce.version());
-        component.setEnvironment(env);
-
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
@@ -147,8 +148,10 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @MethodSource("provideCloudEventsImplementations")
     void testConsumeStructuredContent(CloudEvent ce) throws Exception {
-        KnativeEnvironment env = KnativeEnvironment.on(
-            KnativeEnvironment.endpoint(
+        configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
                 Knative.EndpointKind.source,
                 "myEndpoint",
                 "localhost",
@@ -160,10 +163,6 @@ public class KnativeHttpTest {
                 ))
         );
 
-        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
-        component.setCloudEventsSpecVersion(ce.version());
-        component.setEnvironment(env);
-
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
@@ -233,8 +232,10 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @MethodSource("provideCloudEventsImplementations")
     void testConsumeContent(CloudEvent ce) throws Exception {
-        KnativeEnvironment env = KnativeEnvironment.on(
-            KnativeEnvironment.endpoint(
+        configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
                 Knative.EndpointKind.source,
                 "myEndpoint",
                 "localhost",
@@ -246,10 +247,6 @@ public class KnativeHttpTest {
                 ))
         );
 
-        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
-        component.setCloudEventsSpecVersion(ce.version());
-        component.setEnvironment(env);
-
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
@@ -292,8 +289,10 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @MethodSource("provideCloudEventsImplementations")
     void testConsumeContentWithFilter(CloudEvent ce) throws Exception {
-        KnativeEnvironment env = KnativeEnvironment.on(
-            KnativeEnvironment.endpoint(
+        configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
                 Knative.EndpointKind.source,
                 "ep1",
                 "localhost",
@@ -303,7 +302,7 @@ public class KnativeHttpTest {
                     Knative.CONTENT_TYPE, "text/plain",
                     Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute("source").id(), "CE1"
                 )),
-            KnativeEnvironment.endpoint(
+            endpoint(
                 Knative.EndpointKind.source,
                 "ep2",
                 "localhost",
@@ -315,10 +314,6 @@ public class KnativeHttpTest {
                 ))
         );
 
-        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
-        component.setCloudEventsSpecVersion(ce.version());
-        component.setEnvironment(env);
-
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
@@ -388,8 +383,10 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @MethodSource("provideCloudEventsImplementations")
     void testConsumeContentWithRegExFilter(CloudEvent ce) throws Exception {
-        KnativeEnvironment env = KnativeEnvironment.on(
-            KnativeEnvironment.endpoint(
+        configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
                 Knative.EndpointKind.source,
                 "ep1",
                 "localhost",
@@ -399,7 +396,7 @@ public class KnativeHttpTest {
                     Knative.CONTENT_TYPE, "text/plain",
                     Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute("source").id(), "CE[01234]"
                 )),
-            KnativeEnvironment.endpoint(
+            endpoint(
                 Knative.EndpointKind.source,
                 "ep2",
                 "localhost",
@@ -411,10 +408,6 @@ public class KnativeHttpTest {
                 ))
         );
 
-        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
-        component.setCloudEventsSpecVersion(ce.version());
-        component.setEnvironment(env);
-
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
@@ -484,18 +477,16 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @MethodSource("provideCloudEventsImplementations")
     void testConsumeEventContent(CloudEvent ce) throws Exception {
-        KnativeEnvironment env = KnativeEnvironment.on(
-            KnativeEnvironment.event(
+        configureKnativeComponent(
+            context,
+            ce,
+            event(
                 Knative.EndpointKind.source,
                 "default",
                 "localhost",
                 port)
         );
 
-        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
-        component.setCloudEventsSpecVersion(ce.version());
-        component.setEnvironment(env);
-
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
@@ -565,8 +556,10 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @MethodSource("provideCloudEventsImplementations")
     void testReply(CloudEvent ce) throws Exception {
-        KnativeEnvironment env = KnativeEnvironment.on(
-            KnativeEnvironment.endpoint(
+        configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
                 Knative.EndpointKind.source,
                 "from",
                 "localhost",
@@ -575,7 +568,7 @@ public class KnativeHttpTest {
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
                 )),
-            KnativeEnvironment.endpoint(
+            endpoint(
                 Knative.EndpointKind.sink,
                 "to",
                 "localhost",
@@ -586,10 +579,6 @@ public class KnativeHttpTest {
                 ))
         );
 
-        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
-        component.setCloudEventsSpecVersion(ce.version());
-        component.setEnvironment(env);
-
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
@@ -617,10 +606,10 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @MethodSource("provideCloudEventsImplementations")
     void testInvokeServiceWithoutHost(CloudEvent ce) throws Exception {
-        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
-        component.setCloudEventsSpecVersion(ce.version());
-        component.setEnvironment(KnativeEnvironment.on(
-            KnativeEnvironment.endpoint(
+        configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
                 Knative.EndpointKind.sink,
                 "test",
                 "",
@@ -630,7 +619,7 @@ public class KnativeHttpTest {
                     Knative.CONTENT_TYPE, "text/plain"
                 )
             )
-        ));
+        );
 
         RouteBuilder.addRoutes(context, b -> {
             b.from("direct:start")
@@ -649,10 +638,10 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @MethodSource("provideCloudEventsImplementations")
     void testInvokeNotExistingEndpoint(CloudEvent ce) throws Exception {
-        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
-        component.setCloudEventsSpecVersion(ce.version());
-        component.setEnvironment(KnativeEnvironment.on(
-            KnativeEnvironment.endpoint(
+        configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
                 Knative.EndpointKind.sink,
                 "test",
                 "localhost",
@@ -662,7 +651,7 @@ public class KnativeHttpTest {
                     Knative.CONTENT_TYPE, "text/plain"
                 )
             )
-        ));
+        );
 
         RouteBuilder.addRoutes(context, b -> {
             b.from("direct:start")
@@ -681,10 +670,10 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @MethodSource("provideCloudEventsImplementations")
     void testRemoveConsumer(CloudEvent ce) throws Exception {
-        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
-        component.setCloudEventsSpecVersion(ce.version());
-        component.setEnvironment(KnativeEnvironment.on(
-            KnativeEnvironment.endpoint(
+        configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
                 Knative.EndpointKind.source,
                 "ep1",
                 "localhost",
@@ -695,7 +684,7 @@ public class KnativeHttpTest {
                     Knative.KNATIVE_FILTER_PREFIX + "h", "h1"
                 )
             ),
-            KnativeEnvironment.endpoint(
+            endpoint(
                 Knative.EndpointKind.source,
                 "ep2",
                 "localhost",
@@ -706,7 +695,7 @@ public class KnativeHttpTest {
                     Knative.KNATIVE_FILTER_PREFIX + "h", "h2"
                 )
             )
-        ));
+        );
 
         RouteBuilder.addRoutes(context, b -> {
             b.from("knative:endpoint/ep1")
@@ -738,10 +727,10 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @MethodSource("provideCloudEventsImplementations")
     void testAddConsumer(CloudEvent ce) throws Exception {
-        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
-        component.setCloudEventsSpecVersion(ce.version());
-        component.setEnvironment(KnativeEnvironment.on(
-            KnativeEnvironment.endpoint(
+        configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
                 Knative.EndpointKind.source,
                 "ep1",
                 "localhost",
@@ -752,7 +741,7 @@ public class KnativeHttpTest {
                     Knative.KNATIVE_FILTER_PREFIX + "h", "h1"
                 )
             ),
-            KnativeEnvironment.endpoint(
+            endpoint(
                 Knative.EndpointKind.source,
                 "ep2",
                 "localhost",
@@ -763,7 +752,7 @@ public class KnativeHttpTest {
                     Knative.KNATIVE_FILTER_PREFIX + "h", "h2"
                 )
             )
-        ));
+        );
 
         RouteBuilder.addRoutes(context, b -> {
             b.from("knative:endpoint/ep1")
@@ -797,10 +786,10 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @MethodSource("provideCloudEventsImplementations")
     void testInvokeEndpointWithError(CloudEvent ce) throws Exception {
-        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
-        component.setCloudEventsSpecVersion(ce.version());
-        component.setEnvironment(KnativeEnvironment.on(
-            KnativeEnvironment.endpoint(
+        configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
                 Knative.EndpointKind.sink,
                 "ep",
                 "localhost",
@@ -810,7 +799,7 @@ public class KnativeHttpTest {
                     Knative.CONTENT_TYPE, "text/plain"
                 )
             )
-        ));
+        );
 
         RouteBuilder.addRoutes(context, b -> {
             b.from("direct:start")
@@ -835,8 +824,10 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @MethodSource("provideCloudEventsImplementations")
     void testEvents(CloudEvent ce) throws Exception {
-        KnativeEnvironment env = KnativeEnvironment.on(
-            KnativeEnvironment.event(
+        configureKnativeComponent(
+            context,
+            ce,
+            event(
                 Knative.EndpointKind.sink,
                 "default",
                 "localhost",
@@ -845,7 +836,7 @@ public class KnativeHttpTest {
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain"
                 )),
-            KnativeEnvironment.event(
+            event(
                 Knative.EndpointKind.source,
                 "default",
                 "localhost",
@@ -856,11 +847,6 @@ public class KnativeHttpTest {
                 ))
         );
 
-        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
-        component.setProtocol(Knative.Protocol.http);
-        component.setCloudEventsSpecVersion(ce.version());
-        component.setEnvironment(env);
-
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
@@ -890,8 +876,10 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @MethodSource("provideCloudEventsImplementations")
     void testEventsWithTypeAndVersion(CloudEvent ce) throws Exception {
-        KnativeEnvironment env = KnativeEnvironment.on(
-            KnativeEnvironment.event(
+        configureKnativeComponent(
+            context,
+            ce,
+            event(
                 Knative.EndpointKind.sink,
                 "default",
                 "localhost",
@@ -902,7 +890,7 @@ public class KnativeHttpTest {
                     Knative.KNATIVE_KIND, "MyObject",
                     Knative.KNATIVE_API_VERSION, "v1"
                 )),
-            KnativeEnvironment.event(
+            event(
                 Knative.EndpointKind.source,
                 "default",
                 "localhost",
@@ -915,11 +903,6 @@ public class KnativeHttpTest {
                 ))
         );
 
-        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
-        component.setProtocol(Knative.Protocol.http);
-        component.setCloudEventsSpecVersion(ce.version());
-        component.setEnvironment(env);
-
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
@@ -950,8 +933,10 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @MethodSource("provideCloudEventsImplementations")
     void testConsumeContentWithTypeAndVersion(CloudEvent ce) throws Exception {
-        KnativeEnvironment env = KnativeEnvironment.on(
-            KnativeEnvironment.endpoint(
+        configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
                 Knative.EndpointKind.source,
                 "myEndpoint",
                 "localhost",
@@ -962,7 +947,7 @@ public class KnativeHttpTest {
                     Knative.KNATIVE_KIND, "MyObject",
                     Knative.KNATIVE_API_VERSION, "v1"
                 )),
-            KnativeEnvironment.endpoint(
+            endpoint(
                 Knative.EndpointKind.source,
                 "myEndpoint",
                 "localhost",
@@ -975,10 +960,6 @@ public class KnativeHttpTest {
                 ))
         );
 
-        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
-        component.setCloudEventsSpecVersion(ce.version());
-        component.setEnvironment(env);
-
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
@@ -1020,8 +1001,10 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @MethodSource("provideCloudEventsImplementations")
     void testWrongMethod(CloudEvent ce) throws Exception {
-        KnativeEnvironment env = KnativeEnvironment.on(
-            KnativeEnvironment.endpoint(
+        configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
                 Knative.EndpointKind.source,
                 "myEndpoint",
                 "localhost",
@@ -1032,10 +1015,6 @@ public class KnativeHttpTest {
                 ))
         );
 
-        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
-        component.setCloudEventsSpecVersion(ce.version());
-        component.setEnvironment(env);
-
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
@@ -1058,8 +1037,10 @@ public class KnativeHttpTest {
     @ParameterizedTest
     @MethodSource("provideCloudEventsImplementations")
     void testNoBody(CloudEvent ce) throws Exception {
-        KnativeEnvironment env = KnativeEnvironment.on(
-            KnativeEnvironment.endpoint(
+        configureKnativeComponent(
+            context,
+            ce,
+            endpoint(
                 Knative.EndpointKind.sink,
                 "myEndpoint",
                 "localhost",
@@ -1070,10 +1051,6 @@ public class KnativeHttpTest {
                 ))
         );
 
-        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
-        component.setCloudEventsSpecVersion(ce.version());
-        component.setEnvironment(env);
-
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
@@ -1089,5 +1066,76 @@ public class KnativeHttpTest {
         assertThat(exchange.getException()).isInstanceOf(IllegalArgumentException.class);
         assertThat(exchange.getException()).hasMessage("body must not be null");
     }
+
+
+
+    @ParameterizedTest
+    @MethodSource("provideCloudEventsImplementations")
+    void testNoContent(CloudEvent ce) throws Exception {
+        final int messagesPort = AvailablePortFinder.getNextAvailable();
+        final int wordsPort = AvailablePortFinder.getNextAvailable();
+
+        configureKnativeComponent(
+            context,
+            ce,
+            channel(
+                Knative.EndpointKind.source,
+                "messages",
+                "localhost",
+                messagesPort,
+                KnativeSupport.mapOf(
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain"
+                )),
+            channel(
+                Knative.EndpointKind.sink,
+                "messages",
+                "localhost",
+                messagesPort,
+                KnativeSupport.mapOf(
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain"
+                )),
+            channel(
+                Knative.EndpointKind.sink,
+                "words",
+                "localhost",
+                wordsPort,
+                KnativeSupport.mapOf(
+                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
+                    Knative.CONTENT_TYPE, "text/plain"
+                ))
+        );
+
+        Undertow server = Undertow.builder()
+            .addHttpListener(wordsPort, "localhost")
+            .setHandler(exchange -> {
+                exchange.setStatusCode(204);
+                exchange.getResponseSender().send("");
+            })
+            .build();
+
+        try {
+            server.start();
+
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("knative:channel/messages")
+                        .transform().simple("transformed ${body}")
+                        .log("${body}")
+                        .to("knative:channel/words");
+                }
+            });
+
+            context.start();
+
+            Exchange exchange = template.request("knative:channel/messages", e -> e.getMessage().setBody("message"));
+            assertThat(exchange.getMessage().getHeaders()).containsEntry(Exchange.HTTP_RESPONSE_CODE, 204);
+            assertThat(exchange.getMessage().getBody()).isNull();
+        } finally {
+            server.stop();
+        }
+    }
 }
 
diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java
new file mode 100644
index 0000000..273d5fb
--- /dev/null
+++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.http;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.knative.KnativeComponent;
+import org.apache.camel.component.knative.spi.CloudEvent;
+import org.apache.camel.component.knative.spi.KnativeEnvironment;
+
+public final class KnativeHttpTestSupport {
+    private KnativeHttpTestSupport() {
+    }
+
+    public static KnativeComponent configureKnativeComponent(CamelContext context, CloudEvent ce, KnativeEnvironment.KnativeServiceDefinition... definitions) {
+        KnativeEnvironment env = KnativeEnvironment.on(definitions);
+        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
+        component.setCloudEventsSpecVersion(ce.version());
+        component.setEnvironment(env);
+
+        return component;
+    }
+}