You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2023/11/09 21:00:12 UTC

(camel) branch main updated: CAMEL-19905: Add streaming option to camel-platform-http-vertx

This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new eefdd528b80 CAMEL-19905: Add streaming option to camel-platform-http-vertx
eefdd528b80 is described below

commit eefdd528b80ec28da81a3b4401704b3dfbdf5d00
Author: James Netherton <ja...@gmail.com>
AuthorDate: Thu Nov 9 13:39:12 2023 +0000

    CAMEL-19905: Add streaming option to camel-platform-http-vertx
---
 .../camel/catalog/components/platform-http.json    |  13 +-
 .../src/main/docs/platform-http-vertx.adoc         |  12 +
 .../platform/http/vertx/AsyncInputStream.java      | 243 +++++++++++++++++++
 .../http/vertx/DefaultHttpRequestBodyHandler.java  |  50 ++++
 .../http/vertx/HttpRequestBodyHandler.java         |  50 ++++
 .../vertx/StreamingHttpRequestBodyHandler.java     |  93 ++++++++
 .../http/vertx/VertxPlatformHttpConsumer.java      | 142 ++++++-----
 .../http/vertx/VertxPlatformHttpSupport.java       |  89 +++++--
 .../http/vertx/VertxPlatformHttpEngineTest.java    |   2 +-
 ...VertxPlatformHttpLargeMessageStreamingTest.java |  94 ++++++++
 .../http/vertx/VertxPlatformHttpProxyTest.java     |  10 +-
 .../http/vertx/VertxPlatformHttpStreamingTest.java | 264 +++++++++++++++++++++
 .../http/PlatformHttpEndpointConfigurer.java       |   6 +
 .../http/PlatformHttpEndpointUriFactory.java       |   3 +-
 .../component/platform/http/platform-http.json     |  13 +-
 .../platform/http/PlatformHttpEndpoint.java        |  11 +
 .../dsl/PlatformHttpEndpointBuilderFactory.java    |  31 +++
 17 files changed, 1009 insertions(+), 117 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/platform-http.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/platform-http.json
index bb67bb10512..69c9b292526 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/platform-http.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/platform-http.json
@@ -33,11 +33,12 @@
     "matchOnUriPrefix": { "index": 3, "kind": "parameter", "displayName": "Match On Uri Prefix", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether or not the consumer should try to find a target consumer by matching the URI prefix if no exact match is found." },
     "muteException": { "index": 4, "kind": "parameter", "displayName": "Mute Exception", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "If enabled and an Exchange failed processing on the consumer side the response's body won't contain the exception's stack trace." },
     "produces": { "index": 5, "kind": "parameter", "displayName": "Produces", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The content type this endpoint produces, such as application\/xml or application\/json." },
-    "bridgeErrorHandler": { "index": 6, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming  [...]
-    "exceptionHandler": { "index": 7, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By def [...]
-    "exchangePattern": { "index": 8, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
-    "fileNameExtWhitelist": { "index": 9, "kind": "parameter", "displayName": "File Name Ext Whitelist", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "A comma or whitespace separated list of file extensions. Uploads having these extensions will be stored locally. Null value or asterisk () will allow all files." },
-    "headerFilterStrategy": { "index": 10, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HeaderFilterStrategy to filter headers to and from Camel message." },
-    "platformHttpEngine": { "index": 11, "kind": "parameter", "displayName": "Platform Http Engine", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.platform.http.spi.PlatformHttpEngine", "deprecated": false, "autowired": false, "secret": false, "description": "An HTTP Server engine implementation to serve the requests of this endpoint." }
+    "useStreaming": { "index": 6, "kind": "parameter", "displayName": "Use Streaming", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to use streaming for large requests and responses" },
+    "bridgeErrorHandler": { "index": 7, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming  [...]
+    "exceptionHandler": { "index": 8, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By def [...]
+    "exchangePattern": { "index": 9, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
+    "fileNameExtWhitelist": { "index": 10, "kind": "parameter", "displayName": "File Name Ext Whitelist", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "A comma or whitespace separated list of file extensions. Uploads having these extensions will be stored locally. Null value or asterisk () will allow all files." },
+    "headerFilterStrategy": { "index": 11, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HeaderFilterStrategy to filter headers to and from Camel message." },
+    "platformHttpEngine": { "index": 12, "kind": "parameter", "displayName": "Platform Http Engine", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.platform.http.spi.PlatformHttpEngine", "deprecated": false, "autowired": false, "secret": false, "description": "An HTTP Server engine implementation to serve the requests of this endpoint." }
   }
 }
diff --git a/components/camel-platform-http-vertx/src/main/docs/platform-http-vertx.adoc b/components/camel-platform-http-vertx/src/main/docs/platform-http-vertx.adoc
index 118225964ff..176d5bef976 100644
--- a/components/camel-platform-http-vertx/src/main/docs/platform-http-vertx.adoc
+++ b/components/camel-platform-http-vertx/src/main/docs/platform-http-vertx.adoc
@@ -96,3 +96,15 @@ Camel `HttpMessage` as shown in the custom `Processor` below :
 });
 ----
 
+== Handling large request / response payloads
+
+When large request / response payloads are expected, there is a `useStreaming` option, which can be enabled to improve performance.
+When `useStreaming` is `true`, the consumer takes advantage of xref:manual::stream-caching.adoc[stream caching]. When disk spooling is also enabled, you can avoid having to store the entire request body payload in memory.
+
+[source,java]
+----
+// Handle a large request body and stream it to a file
+from("platform-http:/upload?httpMethodRestrict=POST&useStreaming=true")
+    .log("Processing large request body...")
+    .to("file:/uploads?fileName=uploaded.txt")
+----
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/AsyncInputStream.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/AsyncInputStream.java
new file mode 100644
index 00000000000..813dbbe6a85
--- /dev/null
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/AsyncInputStream.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.platform.http.vertx;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Context;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.streams.ReadStream;
+import io.vertx.core.streams.impl.InboundBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ReadStream} that can process an {@link InputStream} in an asynchronous way, so that the content can be pumped
+ * to the {@link io.vertx.core.streams.WriteStream} of an {@link io.vertx.core.http.HttpServerResponse}.
+ */
+public class AsyncInputStream implements ReadStream<Buffer> {
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncInputStream.class);
+    private static final int DEFAULT_BUFFER_SIZE = 4096;
+
+    private final ReadableByteChannel channel;
+    private final Vertx vertx;
+    private final Context context;
+    private final InboundBuffer<Buffer> queue;
+    private long readPos;
+    private boolean closed;
+    private boolean readInProgress;
+    private Handler<Buffer> dataHandler;
+    private Handler<Void> endHandler;
+    private Handler<Throwable> exceptionHandler;
+
+    public AsyncInputStream(Vertx vertx, Context context, InputStream inputStream) {
+        this.vertx = vertx;
+        this.context = context;
+        this.channel = Channels.newChannel(inputStream);
+        this.queue = new InboundBuffer<>(context, 0);
+        queue.handler(buffer -> {
+            if (buffer.length() > 0) {
+                handleData(buffer);
+            } else {
+                handleEnd();
+            }
+        });
+        queue.drainHandler(v -> doRead());
+    }
+
+    @Override
+    public synchronized AsyncInputStream endHandler(Handler<Void> endHandler) {
+        checkStreamClosed();
+        this.endHandler = endHandler;
+        return this;
+    }
+
+    @Override
+    public synchronized AsyncInputStream exceptionHandler(Handler<Throwable> exceptionHandler) {
+        checkStreamClosed();
+        this.exceptionHandler = exceptionHandler;
+        return this;
+    }
+
+    @Override
+    public synchronized AsyncInputStream handler(Handler<Buffer> handler) {
+        checkStreamClosed();
+        this.dataHandler = handler;
+        if (this.dataHandler != null && !this.closed) {
+            this.doRead();
+        } else {
+            queue.clear();
+        }
+        return this;
+    }
+
+    @Override
+    public synchronized AsyncInputStream pause() {
+        checkStreamClosed();
+        queue.pause();
+        return this;
+    }
+
+    @Override
+    public synchronized AsyncInputStream resume() {
+        checkStreamClosed();
+        queue.resume();
+        return this;
+    }
+
+    /**
+     * Forwards a demand for {@code amount} more items to the internal queue.
+     * <p>
+     * Synchronized like {@link #pause()} and {@link #resume()} so that the {@code closed} flag, which is written
+     * under the same monitor when the stream is closed, is read consistently.
+     *
+     * @param  amount                the demand passed on to the internal queue
+     * @return                       this stream, to allow call chaining
+     * @throws IllegalStateException if the stream has already been closed
+     */
+    @Override
+    public synchronized AsyncInputStream fetch(long amount) {
+        checkStreamClosed();
+        queue.fetch(amount);
+        return this;
+    }
+
+    public void close(Handler<AsyncResult<Void>> handler) {
+        closeInternal(handler);
+    }
+
+    private void checkStreamClosed() {
+        if (this.closed) {
+            throw new IllegalStateException("Stream closed");
+        }
+    }
+
+    private void checkContext() {
+        Context contextToCheck = vertx.getOrCreateContext();
+        if (!contextToCheck.equals(context)) {
+            throw new IllegalStateException(
+                    "AsyncInputStream must only be used in the context that created it, expected: " + this.context
+                                            + " actual " + contextToCheck);
+        }
+    }
+
+    private synchronized void closeInternal(Handler<AsyncResult<Void>> handler) {
+        closed = true;
+        doClose(handler);
+    }
+
+    private void doClose(Handler<AsyncResult<Void>> handler) {
+        try {
+            channel.close();
+            if (handler != null) {
+                this.vertx.runOnContext(v -> handler.handle(Future.succeededFuture()));
+            }
+        } catch (IOException e) {
+            if (handler != null) {
+                this.vertx.runOnContext(v -> handler.handle(Future.failedFuture(e)));
+            }
+        }
+    }
+
+    private void doRead() {
+        checkStreamClosed();
+        doRead(ByteBuffer.allocate(DEFAULT_BUFFER_SIZE));
+    }
+
+    private synchronized void doRead(ByteBuffer buffer) {
+        if (!readInProgress) {
+            readInProgress = true;
+            Buffer buff = Buffer.buffer(DEFAULT_BUFFER_SIZE);
+            doRead(buff, 0, buffer, readPos, result -> {
+                if (result.succeeded()) {
+                    readInProgress = false;
+                    Buffer updatedBuffer = result.result();
+                    readPos += updatedBuffer.length();
+                    if (queue.write(updatedBuffer) && updatedBuffer.length() > 0) {
+                        doRead(buffer);
+                    }
+                } else {
+                    handleException(result.cause());
+                }
+            });
+        }
+    }
+
+    private void doRead(Buffer writeBuff, int offset, ByteBuffer buffer, long position, Handler<AsyncResult<Buffer>> handler) {
+        vertx.executeBlocking(() -> channel.read(buffer))
+                .onComplete(result -> {
+                    if (result.succeeded()) {
+                        Integer bytesRead = result.result();
+                        if (bytesRead == -1) {
+                            // EOF
+                            context.runOnContext((v) -> {
+                                buffer.flip();
+                                writeBuff.setBytes(offset, buffer);
+                                buffer.compact();
+                                handler.handle(Future.succeededFuture(writeBuff));
+                            });
+                        } else if (buffer.hasRemaining()) {
+                            // Read from the next offset
+                            context.runOnContext((v) -> {
+                                doRead(writeBuff, offset, buffer, position + bytesRead, handler);
+                            });
+                        } else {
+                            // All data is written
+                            context.runOnContext((v) -> {
+                                buffer.flip();
+                                writeBuff.setBytes(offset, buffer);
+                                buffer.compact();
+                                handler.handle(Future.succeededFuture(writeBuff));
+                            });
+                        }
+                    } else {
+                        context.runOnContext((v) -> handler.handle(Future.failedFuture(result.cause())));
+                    }
+                });
+    }
+
+    private void handleData(Buffer buffer) {
+        Handler<Buffer> handler;
+        synchronized (this) {
+            handler = this.dataHandler;
+        }
+        if (handler != null) {
+            checkContext();
+            handler.handle(buffer);
+        }
+    }
+
+    /**
+     * Invoked when the empty end-of-stream buffer is delivered by the queue. Clears the data handler and notifies
+     * the registered end handler, if any.
+     * <p>
+     * Only the handler hand-over happens under the monitor; the end handler itself is called after the lock has
+     * been released (mirroring {@code handleData}), so user code never runs while this stream's lock is held.
+     *
+     * @throws IllegalStateException if an end handler is set and the current Vert.x context is not the one this
+     *                               stream was created with
+     */
+    private void handleEnd() {
+        Handler<Void> endHandler;
+        synchronized (this) {
+            dataHandler = null;
+            endHandler = this.endHandler;
+        }
+        if (endHandler != null) {
+            checkContext();
+            endHandler.handle(null);
+        }
+    }
+
+    private void handleException(Throwable t) {
+        if (exceptionHandler != null && t instanceof Exception) {
+            exceptionHandler.handle(t);
+        } else {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Unhandled error while processing stream", t);
+            }
+        }
+    }
+}
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/DefaultHttpRequestBodyHandler.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/DefaultHttpRequestBodyHandler.java
new file mode 100644
index 00000000000..4f187da7f0a
--- /dev/null
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/DefaultHttpRequestBodyHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.platform.http.vertx;
+
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.ext.web.RequestBody;
+import io.vertx.ext.web.Route;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.camel.Message;
+
+import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.isFormUrlEncoded;
+import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.isMultiPartFormData;
+
+/**
+ * Default {@link HttpRequestBodyHandler} that reads the entire HTTP request body into memory.
+ */
+class DefaultHttpRequestBodyHandler extends HttpRequestBodyHandler {
+    DefaultHttpRequestBodyHandler(Handler<RoutingContext> delegate) {
+        super(delegate);
+    }
+
+    @Override
+    void configureRoute(Route route) {
+        route.handler(delegate);
+    }
+
+    @Override
+    Future<Void> handle(RoutingContext routingContext, Message message) {
+        if (!isMultiPartFormData(routingContext) && !isFormUrlEncoded(routingContext)) {
+            final RequestBody requestBody = routingContext.body();
+            message.setBody(requestBody.buffer());
+        }
+        return Future.succeededFuture();
+    }
+}
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/HttpRequestBodyHandler.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/HttpRequestBodyHandler.java
new file mode 100644
index 00000000000..e537612a518
--- /dev/null
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/HttpRequestBodyHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.platform.http.vertx;
+
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.ext.web.Route;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.camel.Message;
+
+/**
+ * Abstraction to handle HTTP request body processing.
+ */
+abstract class HttpRequestBodyHandler {
+    Handler<RoutingContext> delegate;
+
+    HttpRequestBodyHandler(Handler<RoutingContext> delegate) {
+        this.delegate = delegate;
+    }
+
+    /**
+     * Performs any required configuration on a {@link Route}.
+     *
+     * @param route The route to configure
+     */
+    abstract void configureRoute(Route route);
+
+    /**
+     * Processes the incoming HTTP request body.
+     *
+     * @param  routingContext The {@link RoutingContext} for the HTTP request being processed
+     * @param  message        The {@link Message} associated with the HTTP request being processed
+     * @return                {@link Future} to determine when the HTTP request body has been fully processed
+     */
+    abstract Future<Void> handle(RoutingContext routingContext, Message message);
+}
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/StreamingHttpRequestBodyHandler.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/StreamingHttpRequestBodyHandler.java
new file mode 100644
index 00000000000..88531a6a07e
--- /dev/null
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/StreamingHttpRequestBodyHandler.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.platform.http.vertx;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.ext.web.Route;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.camel.Message;
+import org.apache.camel.converter.stream.CachedOutputStream;
+
+import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.isFormUrlEncoded;
+import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.isMultiPartFormData;
+
+/**
+ * A {@link HttpRequestBodyHandler} that can handle large request bodies via {@link CachedOutputStream}.
+ */
+class StreamingHttpRequestBodyHandler extends HttpRequestBodyHandler {
+    StreamingHttpRequestBodyHandler(Handler<RoutingContext> delegate) {
+        super(delegate);
+    }
+
+    @Override
+    void configureRoute(Route route) {
+        // No configuration necessary for streaming
+    }
+
+    /**
+     * Processes the incoming HTTP request body for a streaming enabled endpoint.
+     * <ul>
+     * <li>{@code multipart/form-data} requests are rejected with a failed future.</li>
+     * <li>Form URL encoded requests are passed to the delegate body handler; the returned future completes when the
+     * request has ended.</li>
+     * <li>Any other request has each body chunk written to a {@link CachedOutputStream}. When the request ends the
+     * stream is closed and its {@link java.io.InputStream} becomes the message body.</li>
+     * </ul>
+     *
+     * @param  routingContext The {@link RoutingContext} for the HTTP request being processed
+     * @param  message        The {@link Message} associated with the HTTP request being processed
+     * @return                {@link Future} that succeeds when the body has been fully processed, or fails with the
+     *                        first error encountered while writing or closing the stream
+     */
+    @Override
+    Future<Void> handle(RoutingContext routingContext, Message message) {
+        // Reject multipart requests if streaming enabled as we can't be sure when Vert.x has
+        // fully written the attachments to disk after invoking the default body handler.
+        if (isMultiPartFormData(routingContext)) {
+            return Future.failedFuture(
+                    new IllegalStateException("Cannot process multipart/form-data requests when useStreaming=true"));
+        }
+
+        Promise<Void> promise = Promise.promise();
+        HttpServerRequest request = routingContext.request();
+        if (isFormUrlEncoded(routingContext)) {
+            // Delegate body handling to the default body handler
+            delegate.handle(routingContext);
+            request.endHandler(promise::complete);
+        } else {
+            // Process each body 'chunk' and write it to CachedOutputStream
+            CachedOutputStream stream = new CachedOutputStream(message.getExchange(), true);
+            AtomicReference<Exception> failureCause = new AtomicReference<>();
+            request.handler(buffer -> {
+                // Once a write has failed the cached content is incomplete, so skip the remaining chunks.
+                // This also keeps the first failure (the root cause) from being overwritten by later ones.
+                if (failureCause.get() != null) {
+                    return;
+                }
+                try {
+                    stream.write(buffer.getBytes());
+                } catch (IOException e) {
+                    failureCause.set(e);
+                }
+            });
+            // After the body is read, close the CachedOutputStream and get an InputStream to use as the message body
+            request.endHandler(event -> {
+                try {
+                    stream.close();
+
+                    Exception failure = failureCause.get();
+                    if (failure == null) {
+                        message.setBody(stream.getInputStream());
+                        promise.complete();
+                    } else {
+                        promise.fail(failure);
+                    }
+                } catch (IOException e) {
+                    promise.fail(e);
+                }
+            });
+        }
+
+        return promise.future();
+    }
+}
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
index d948d5c833a..915c1926110 100644
--- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
@@ -26,15 +26,13 @@ import java.util.regex.Pattern;
 
 import jakarta.activation.DataHandler;
 
-import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
 import io.vertx.core.Handler;
 import io.vertx.core.MultiMap;
 import io.vertx.core.Vertx;
-import io.vertx.core.buffer.Buffer;
 import io.vertx.core.http.HttpMethod;
 import io.vertx.ext.auth.User;
 import io.vertx.ext.web.FileUpload;
-import io.vertx.ext.web.RequestBody;
 import io.vertx.ext.web.Route;
 import io.vertx.ext.web.RoutingContext;
 import io.vertx.ext.web.impl.RouteImpl;
@@ -56,6 +54,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.appendHeader;
+import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.isFormUrlEncoded;
+import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.isMultiPartFormData;
 import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.populateCamelHeaders;
 import static org.apache.camel.component.platform.http.vertx.VertxPlatformHttpSupport.writeResponse;
 
@@ -74,6 +74,7 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen
     private String path;
     private Route route;
     private VertxPlatformHttpRouter router;
+    private HttpRequestBodyHandler httpRequestBodyHandler;
 
     public VertxPlatformHttpConsumer(PlatformHttpEndpoint endpoint,
                                      Processor processor,
@@ -97,6 +98,11 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen
         methods = Method.parseList(getEndpoint().getHttpMethodRestrict());
         path = configureEndpointPath(getEndpoint());
         router = VertxPlatformHttpRouter.lookup(getEndpoint().getCamelContext());
+        if (!getEndpoint().isHttpProxy() && getEndpoint().isUseStreaming()) {
+            httpRequestBodyHandler = new StreamingHttpRequestBodyHandler(router.bodyHandler());
+        } else {
+            httpRequestBodyHandler = new DefaultHttpRequestBodyHandler(router.bodyHandler());
+        }
     }
 
     @Override
@@ -126,7 +132,7 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen
             }
         }
 
-        newRoute.handler(router.bodyHandler());
+        httpRequestBodyHandler.configureRoute(newRoute);
         for (Handler<RoutingContext> handler : handlers) {
             newRoute.handler(handler);
         }
@@ -161,7 +167,8 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen
         }
 
         final Vertx vertx = ctx.vertx();
-        final Exchange exchange = toExchange(ctx);
+        final Exchange exchange = createExchange(false);
+        exchange.setPattern(ExchangePattern.InOut);
 
         //
         // We do not know if any of the processing logic of the route is synchronous or not so we
@@ -180,45 +187,51 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen
         //             .to("rest:get:?bridgeEndpoint=true");
         //
 
-        if (getEndpoint().isHttpProxy()) {
-            handleProxy(ctx, exchange);
-        }
+        // Note: any logic that needs to interrogate HTTP headers not provided by RoutingContext.parsedHeaders, should
+        // be done inside of the following onComplete block, to ensure that the HTTP request is fully processed.
+        processHttpRequest(exchange, ctx).onComplete(result -> {
+            if (result.failed()) {
+                handleFailure(exchange, ctx, result.cause());
+                return;
+            }
 
-        vertx.executeBlocking(() -> processRequest(exchange), false)
-                .onComplete(result -> processResult(ctx, result, exchange));
-    }
+            if (getEndpoint().isHttpProxy()) {
+                handleProxy(ctx, exchange);
+            }
 
-    private void processResult(
-            RoutingContext ctx, AsyncResult<Object> result, Exchange exchange) {
-        Throwable failure = null;
-        try {
-            if (result.succeeded()) {
-                try {
-                    writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy(), muteExceptions);
-                } catch (Exception e) {
-                    failure = e;
+            populateMultiFormData(ctx, exchange.getIn(), getEndpoint().getHeaderFilterStrategy());
+
+            vertx.executeBlocking(() -> processExchange(exchange), false).onComplete(processExchangeResult -> {
+                if (processExchangeResult.succeeded()) {
+                    writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy(), muteExceptions)
+                            .onComplete(writeResponseResult -> {
+                                if (writeResponseResult.succeeded()) {
+                                    handleExchangeComplete(exchange);
+                                } else {
+                                    handleFailure(exchange, ctx, writeResponseResult.cause());
+                                }
+                            });
+                } else {
+                    handleFailure(exchange, ctx, processExchangeResult.cause());
                 }
-            } else {
-                failure = result.cause();
-            }
+            });
+        });
+    }
 
-            if (failure != null) {
-                handleFailure(ctx, failure);
-            }
-        } finally {
-            doneUoW(exchange);
-            releaseExchange(exchange, false);
-        }
+    /**
+     * Finishes the lifecycle of the exchange by completing its unit of work and then releasing the exchange.
+     *
+     * @param exchange the exchange whose processing has ended
+     */
+    private void handleExchangeComplete(Exchange exchange) {
+        doneUoW(exchange);
+        releaseExchange(exchange, false);
     }
 
-    private void handleFailure(RoutingContext ctx, Throwable failure) {
+    /**
+     * Reports the failure to the exception handler of this consumer, fails the routing context with it and
+     * finally completes the exchange.
+     *
+     * @param exchange the exchange that was being processed when the failure occurred
+     * @param ctx      the routing context of the HTTP request that is failed
+     * @param failure  the cause of the failure
+     */
+    private void handleFailure(Exchange exchange, RoutingContext ctx, Throwable failure) {
         getExceptionHandler().handleException(
                 "Failed handling platform-http endpoint " + getEndpoint().getPath(),
                 failure);
         ctx.fail(failure);
+        handleExchangeComplete(exchange);
     }
 
-    private Object processRequest(Exchange exchange) throws Exception {
+    private Object processExchange(Exchange exchange) throws Exception {
         createUoW(exchange);
         getProcessor().process(exchange);
         return null;
@@ -236,10 +249,7 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen
         exchange.getMessage().removeHeader("Proxy-Connection");
     }
 
-    protected Exchange toExchange(RoutingContext ctx) {
-        final Exchange exchange = createExchange(false);
-        exchange.setPattern(ExchangePattern.InOut);
-
+    protected Future<Void> processHttpRequest(Exchange exchange, RoutingContext ctx) {
         // reuse existing http message if pooled
         Message in = exchange.getIn();
         if (in instanceof HttpMessage hm) {
@@ -248,7 +258,6 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen
             in = new HttpMessage(exchange, ctx.request(), ctx.response());
             exchange.setMessage(in);
         }
-        populateCamelMessage(ctx, exchange, in);
 
         final String charset = ctx.parsedHeaders().contentType().parameter("charset");
         if (charset != null) {
@@ -261,53 +270,38 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen
             in.setHeader(VertxPlatformHttpConstants.AUTHENTICATED_USER, user);
         }
 
-        return exchange;
+        return populateCamelMessage(ctx, exchange, in);
     }
 
-    protected void populateCamelMessage(RoutingContext ctx, Exchange exchange, Message result) {
+    /**
+     * Populates the Camel message headers from the HTTP request and then delegates handling of the request
+     * body to the configured {@link HttpRequestBodyHandler}.
+     *
+     * @param  ctx      the routing context of the current HTTP request
+     * @param  exchange the exchange handed to populateCamelHeaders together with the header filter strategy
+     * @param  message  the message that receives the headers and is passed to the request body handler
+     * @return          the future returned by the request body handler
+     */
+    protected Future<Void> populateCamelMessage(RoutingContext ctx, Exchange exchange, Message message) {
         final HeaderFilterStrategy headerFilterStrategy = getEndpoint().getHeaderFilterStrategy();
-        populateCamelHeaders(ctx, result.getHeaders(), exchange, headerFilterStrategy);
-        final String mimeType = ctx.parsedHeaders().contentType().value();
-        final boolean isMultipartFormData = "multipart/form-data".equals(mimeType);
-
-        if ("application/x-www-form-urlencoded".equals(mimeType) || isMultipartFormData) {
-            populateMultiFormData(ctx, exchange, result, headerFilterStrategy, isMultipartFormData);
-        } else {
-            populateDefaultMessage(ctx, result);
-        }
-    }
-
-    private static void populateDefaultMessage(RoutingContext ctx, Message result) {
-        final RequestBody requestBody = ctx.body();
-        final Buffer body = requestBody.buffer();
-        if (body != null) {
-            result.setBody(body);
-        } else {
-            result.setBody(null);
-        }
+        populateCamelHeaders(ctx, message.getHeaders(), exchange, headerFilterStrategy);
+        return httpRequestBodyHandler.handle(ctx, message);
     }
 
+    /**
+     * Copies submitted form attributes onto the Camel message. Nothing happens unless the request content
+     * type is form URL encoded or multipart form data.
+     * <p>
+     * Every attribute value for which the header filter strategy's applyFilterToExternalHeaders returns
+     * false is appended to the message headers and to a map; that map becomes the message body when it is
+     * not empty. When the header filter strategy is null, no attribute is copied at all. For multipart
+     * requests the file uploads of the routing context are additionally passed to populateAttachments.
+     *
+     * @param ctx                  the routing context of the current HTTP request
+     * @param message              the message that receives headers, body and attachments
+     * @param headerFilterStrategy the strategy consulted for each form attribute, may be null
+     */
     private void populateMultiFormData(
-            RoutingContext ctx, Exchange exchange, Message result, HeaderFilterStrategy headerFilterStrategy,
-            boolean isMultipartFormData) {
-        final MultiMap formData = ctx.request().formAttributes();
-        final Map<String, Object> body = new HashMap<>();
-        for (String key : formData.names()) {
-            for (String value : formData.getAll(key)) {
-                if (headerFilterStrategy != null
-                        && !headerFilterStrategy.applyFilterToExternalHeaders(key, value, exchange)) {
-                    appendHeader(result.getHeaders(), key, value);
-                    appendHeader(body, key, value);
+            RoutingContext ctx, Message message, HeaderFilterStrategy headerFilterStrategy) {
+        final boolean isMultipartFormData = isMultiPartFormData(ctx);
+        if (isFormUrlEncoded(ctx) || isMultipartFormData) {
+            final MultiMap formData = ctx.request().formAttributes();
+            final Map<String, Object> body = new HashMap<>();
+            for (String key : formData.names()) {
+                for (String value : formData.getAll(key)) {
+                    if (headerFilterStrategy != null
+                            && !headerFilterStrategy.applyFilterToExternalHeaders(key, value, message.getExchange())) {
+                        appendHeader(message.getHeaders(), key, value);
+                        appendHeader(body, key, value);
+                    }
                 }
             }
-        }
 
-        if (!body.isEmpty()) {
-            result.setBody(body);
-        }
+            if (!body.isEmpty()) {
+                message.setBody(body);
+            }
 
-        if (isMultipartFormData) {
-            populateAttachments(ctx.fileUploads(), result);
+            if (isMultipartFormData) {
+                populateAttachments(ctx.fileUploads(), message);
+            }
         }
     }
 
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
index 73e3e22641c..19b14a5fc47 100644
--- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
@@ -27,11 +27,17 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import io.netty.handler.codec.http.HttpHeaderValues;
+import io.vertx.core.Context;
+import io.vertx.core.Future;
 import io.vertx.core.MultiMap;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.http.HttpServerRequest;
 import io.vertx.core.http.HttpServerResponse;
 import io.vertx.core.net.SocketAddress;
+import io.vertx.core.streams.Pump;
 import io.vertx.ext.web.RoutingContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -189,57 +195,72 @@ public final class VertxPlatformHttpSupport {
         return codeToUse;
     }
 
-    static void writeResponse(
-            RoutingContext ctx, Exchange camelExchange, HeaderFilterStrategy headerFilterStrategy, boolean muteExceptions)
-            throws Exception {
+    /**
+     * Writes the Camel message of the exchange as the HTTP response and reports the outcome through a future.
+     * <p>
+     * The body produced by toHttpResponse is dispatched on its type: null ends the response without a
+     * payload; String and Buffer bodies are passed straight to ctx.end; an InputStream is written by
+     * writeResponseAs; any other type goes through writeResponseAsFallback. For the null, String and Buffer
+     * cases the promise is completed right after ctx.end is called, without inspecting the result of that
+     * call. A conversion or I/O exception thrown by the fallback fails the promise.
+     *
+     * @param  ctx                  the routing context whose response is written
+     * @param  camelExchange        the exchange providing the response message
+     * @param  headerFilterStrategy the strategy passed on to toHttpResponse
+     * @param  muteExceptions       flag passed on to toHttpResponse
+     * @return                      a future representing the outcome of writing the response
+     */
+    static Future<Void> writeResponse(
+            RoutingContext ctx, Exchange camelExchange, HeaderFilterStrategy headerFilterStrategy, boolean muteExceptions) {
         final Object body = toHttpResponse(ctx.response(), camelExchange.getMessage(), headerFilterStrategy, muteExceptions);
-        final HttpServerResponse response = ctx.response();
+        final Promise<Void> promise = Promise.promise();
 
         if (body == null) {
             LOGGER.trace("No payload to send as reply for exchange: {}", camelExchange);
-            response.end();
+            ctx.end();
+            promise.complete();
         } else if (body instanceof String) {
-            response.end((String) body);
+            ctx.end((String) body);
+            promise.complete();
         } else if (body instanceof InputStream) {
-            writeResponseAs(response, (InputStream) body);
+            writeResponseAs(promise, ctx, (InputStream) body);
         } else if (body instanceof Buffer) {
-            response.end((Buffer) body);
+            ctx.end((Buffer) body);
+            promise.complete();
         } else {
-            writeResponseAsFallback(camelExchange, body, response);
+            try {
+                writeResponseAsFallback(promise, camelExchange, body, ctx);
+            } catch (IOException | NoTypeConversionAvailableException e) {
+                promise.fail(e);
+            }
         }
+
+        return promise.future();
     }
 
-    private static void writeResponseAsFallback(Exchange camelExchange, Object body, HttpServerResponse response)
+    /**
+     * Writes a response body of an arbitrary type by converting it with the type converter of the Camel
+     * context: first an optional conversion to ByteBuffer is tried, otherwise a mandatory conversion to
+     * InputStream is performed. The converted value is handed to the matching writeResponseAs overload.
+     *
+     * @param  promise                            the promise passed on to the writeResponseAs overload
+     * @param  camelExchange                      the exchange used for the type conversion
+     * @param  body                               the response body to convert and write
+     * @param  ctx                                the routing context whose response is written
+     * @throws NoTypeConversionAvailableException declared for the mandatory conversion to InputStream
+     * @throws IOException                        declared by this method's signature
+     */
+    private static void writeResponseAsFallback(Promise<Void> promise, Exchange camelExchange, Object body, RoutingContext ctx)
             throws NoTypeConversionAvailableException, IOException {
         final TypeConverter tc = camelExchange.getContext().getTypeConverter();
         // Try to convert to ByteBuffer for performance reason
         final ByteBuffer bb = tc.tryConvertTo(ByteBuffer.class, camelExchange, body);
         if (bb != null) {
-            writeResponseAs(response, bb);
+            writeResponseAs(promise, ctx, bb);
         } else {
             // Otherwise fallback to most generic InputStream conversion
             final InputStream is = tc.mandatoryConvertTo(InputStream.class, camelExchange, body);
-            writeResponseAs(response, is);
+            writeResponseAs(promise, ctx, is);
         }
     }
 
-    private static void writeResponseAs(HttpServerResponse response, ByteBuffer bb) {
+    /**
+     * Copies the ByteBuffer into a Vert.x Buffer, ends the response with it and completes the promise.
+     * The promise is completed right after ctx.end is called, without inspecting the result of that call.
+     *
+     * @param promise the promise completed once the response has been ended
+     * @param ctx     the routing context whose response is ended
+     * @param bb      the bytes to send as the response body
+     */
+    private static void writeResponseAs(Promise<Void> promise, RoutingContext ctx, ByteBuffer bb) {
         final Buffer b = Buffer.buffer(bb.capacity());
         b.setBytes(0, bb);
-        response.end(b);
+        ctx.end(b);
+        promise.complete();
     }
 
-    private static void writeResponseAs(HttpServerResponse response, InputStream is) throws IOException {
-        final byte[] bytes = new byte[4096];
-        try (InputStream in = is) {
-            int len;
-            while ((len = in.read(bytes)) >= 0) {
-                final Buffer b = Buffer.buffer(len);
-                b.appendBytes(bytes, 0, len);
-                response.write(b);
-            }
-        }
-        response.end();
+    /**
+     * Pumps the InputStream into the HTTP response asynchronously and settles the promise when done.
+     * <p>
+     * A read error reported by the async stream fails the promise. Once the stream has been fully read the
+     * response is ended and the stream is closed; the promise then reflects whether ending the response
+     * succeeded, so that a failed end is not reported as a successful write.
+     *
+     * @param promise the promise settled with the outcome of writing the response
+     * @param ctx     the routing context providing the response and the Vert.x instance
+     * @param is      the stream to send as the response body
+     */
+    private static void writeResponseAs(Promise<Void> promise, RoutingContext ctx, InputStream is) {
+        HttpServerResponse response = ctx.response();
+        Vertx vertx = ctx.vertx();
+        Context context = vertx.getOrCreateContext();
+
+        // Process the InputStream async to avoid blocking the Vert.x event loop on large responses
+        AsyncInputStream asyncInputStream = new AsyncInputStream(vertx, context, is);
+        asyncInputStream.exceptionHandler(promise::fail);
+        asyncInputStream.endHandler(event -> {
+            response.end().onComplete(endResult -> {
+                // Always close the stream first, then propagate the outcome of ending the response
+                asyncInputStream.close(closeResult -> {
+                    if (endResult.succeeded()) {
+                        promise.complete();
+                    } else {
+                        promise.fail(endResult.cause());
+                    }
+                });
+            });
+        });
+
+        // Pump the InputStream content into the HTTP response WriteStream
+        Pump pump = Pump.pump(asyncInputStream, response);
+        context.runOnContext(event -> pump.start());
     }
 
     static void populateCamelHeaders(
@@ -336,4 +357,22 @@ public final class VertxPlatformHttpSupport {
         value = list;
         return value;
     }
+
+    /**
+     * Tells whether the content type of the request starts with the multipart form data media type.
+     *
+     * @param  ctx the routing context of the current HTTP request
+     * @return     true if the lower-cased content type value starts with HttpHeaderValues.MULTIPART_FORM_DATA
+     */
+    static boolean isMultiPartFormData(RoutingContext ctx) {
+        return isContentTypeMatching(ctx, HttpHeaderValues.MULTIPART_FORM_DATA.toString());
+    }
+
+    /**
+     * Tells whether the content type of the request starts with the form URL encoded media type.
+     *
+     * @param  ctx the routing context of the current HTTP request
+     * @return     true if the lower-cased content type value starts with
+     *             HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED
+     */
+    static boolean isFormUrlEncoded(RoutingContext ctx) {
+        return isContentTypeMatching(ctx, HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED.toString());
+    }
+
+    /**
+     * Tests whether the content type value of the request begins with the expected content type, ignoring
+     * the case of the request value.
+     *
+     * @param  ctx                 the routing context of the current HTTP request
+     * @param  expectedContentType the content type prefix to look for; it is compared as given, so callers
+     *                             pass it in lower case
+     * @return                     true if a non-empty content type is present and starts with the expected one
+     */
+    private static boolean isContentTypeMatching(RoutingContext ctx, String expectedContentType) {
+        String contentType = ctx.parsedHeaders().contentType().value();
+        // Lower-case with Locale.ROOT: with the default locale a Turkish JVM maps 'I' to a dotless 'ı',
+        // which would make "MULTIPART/FORM-DATA" fail to match "multipart/form-data"
+        return org.apache.camel.util.ObjectHelper.isNotEmpty(contentType)
+                && contentType.toLowerCase(java.util.Locale.ROOT).startsWith(expectedContentType);
+    }
 }
diff --git a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java
index 633826ad959..8b5b5642ebf 100644
--- a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java
+++ b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java
@@ -880,7 +880,7 @@ public class VertxPlatformHttpEngineTest {
         return createCamelContext(null);
     }
 
-    private static CamelContext createCamelContext(ServerConfigurationCustomizer customizer) throws Exception {
+    static CamelContext createCamelContext(ServerConfigurationCustomizer customizer) throws Exception {
         int port = AvailablePortFinder.getNextAvailable();
         VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration();
         conf.setBindPort(port);
diff --git a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpLargeMessageStreamingTest.java b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpLargeMessageStreamingTest.java
new file mode 100644
index 00000000000..7c3a118a30e
--- /dev/null
+++ b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpLargeMessageStreamingTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.platform.http.vertx;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Random;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.util.IOHelper;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+
+import static io.restassured.RestAssured.given;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Streams a 4GB request body through a platform-http route with streaming enabled and verifies that the
+ * echoed response has the same length. Only enabled when the "performance-tests" system property is set.
+ */
+@EnabledIfSystemProperty(named = "performance-tests", matches = ".*")
+public class VertxPlatformHttpLargeMessageStreamingTest {
+
+    @Test
+    void testStreamingWithLargeRequestAndResponseBody() throws Exception {
+        final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext();
+        context.getStreamCachingStrategy().setSpoolEnabled(true);
+
+        Path input = createLargeFile();
+        Path output = Files.createTempFile("platform-http-output", ".dat");
+
+        try {
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("platform-http:/streaming?useStreaming=true")
+                            .log("Done echoing back request body as response body");
+                }
+            });
+
+            context.start();
+
+            // Close the request stream, the response stream and the output file once the copy is done
+            try (InputStream requestBody = new FileInputStream(input.toFile());
+                 InputStream response = given()
+                         .body(requestBody)
+                         .post("/streaming")
+                         .then()
+                         .statusCode(200)
+                         .extract()
+                         .asInputStream();
+                 FileOutputStream fos = new FileOutputStream(output.toFile())) {
+                IOHelper.copy(response, fos);
+            }
+
+            assertEquals(input.toFile().length(), output.toFile().length());
+        } finally {
+            context.stop();
+            Files.deleteIfExists(input);
+            Files.deleteIfExists(output);
+        }
+    }
+
+    /**
+     * Creates a temporary file filled with 4GB of random data.
+     *
+     * @return             the path of the generated file
+     * @throws IOException if the file cannot be created or written; the partial file is deleted in that case
+     */
+    private Path createLargeFile() throws IOException {
+        // Create a 4GB file containing random data
+        Path path = Files.createTempFile("platform-http-input", ".dat");
+        try (FileOutputStream fos = new FileOutputStream(path.toFile())) {
+            Random random = new Random();
+            long targetFileSize = 4L * 1024 * 1024 * 1024;
+            long bytesWritten = 0L;
+
+            byte[] data = new byte[1024];
+            while (bytesWritten < targetFileSize) {
+                random.nextBytes(data);
+                fos.write(data);
+                bytesWritten += data.length;
+            }
+        } catch (IOException e) {
+            // Do not leave a partially written multi-gigabyte file behind
+            Files.deleteIfExists(path);
+            throw e;
+        }
+        return path;
+    }
+}
diff --git a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpProxyTest.java b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpProxyTest.java
index df2a47cdc9d..d940aef8ac3 100644
--- a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpProxyTest.java
+++ b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpProxyTest.java
@@ -25,7 +25,8 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.AvailablePortFinder;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
 import static com.github.tomakehurst.wiremock.client.WireMock.get;
@@ -55,15 +56,16 @@ public class VertxPlatformHttpProxyTest {
         }
     }
 
-    @Test
-    void testProxy() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = { false, true })
+    void testProxy(boolean useStreaming) throws Exception {
         final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext();
 
         try {
             context.addRoutes(new RouteBuilder() {
                 @Override
                 public void configure() {
-                    from("platform-http:proxy")
+                    from("platform-http:proxy?useStreaming=" + useStreaming)
                             .toD("${headers." + Exchange.HTTP_URI + "}?bridgeEndpoint=true");
                 }
             });
diff --git a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpStreamingTest.java b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpStreamingTest.java
new file mode 100644
index 00000000000..786047cbda6
--- /dev/null
+++ b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpStreamingTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.platform.http.vertx;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import io.restassured.http.ContentType;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.util.IOHelper;
+import org.junit.jupiter.api.Test;
+
+import static io.restassured.RestAssured.given;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for platform-http endpoints that are configured with useStreaming=true.
+ */
+public class VertxPlatformHttpStreamingTest {
+
+    @Test
+    void testStreamingWithStringRequestAndResponseBody() throws Exception {
+        final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext();
+
+        try {
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("platform-http:/streaming?useStreaming=true")
+                            .transform().simple("Hello ${body}");
+                }
+            });
+
+            context.start();
+
+            String requestBody = "Vert.x Platform HTTP";
+            given()
+                    .body(requestBody)
+                    .post("/streaming")
+                    .then()
+                    .statusCode(200)
+                    .body(is("Hello " + requestBody));
+        } finally {
+            context.stop();
+        }
+    }
+
+    @Test
+    void testStreamingWithFileRequestAndResponseBody() throws Exception {
+        final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext();
+        String content = "Hello World";
+        Path testFile = Files.createTempFile("platform-http-testing", ".txt");
+        Files.writeString(testFile, content);
+
+        try {
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("platform-http:/streaming?useStreaming=true")
+                            .log("Done processing request");
+                }
+            });
+
+            context.start();
+
+            given()
+                    .body(testFile.toFile())
+                    .post("/streaming")
+                    .then()
+                    .statusCode(200)
+                    .body(is(content));
+        } finally {
+            context.stop();
+            Files.deleteIfExists(testFile);
+        }
+    }
+
+    @Test
+    void testStreamingWithFormUrlEncodedBody() throws Exception {
+        final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext();
+        try {
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("platform-http:/streaming?useStreaming=true")
+                            .setBody().simple("foo = ${header.foo}");
+                }
+            });
+
+            context.start();
+
+            given()
+                    .contentType(ContentType.URLENC)
+                    .formParam("foo", "bar")
+                    .post("/streaming")
+                    .then()
+                    .statusCode(200)
+                    .body(is("foo = bar"));
+        } finally {
+            context.stop();
+        }
+    }
+
+    @Test
+    void testStreamingWithMultiPartRequestRejected() throws Exception {
+        String content = "Hello World";
+        Path testFile = Files.createTempFile("platform-http-testing", ".txt");
+        Files.writeString(testFile, content);
+
+        final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext(configuration -> {
+            VertxPlatformHttpServerConfiguration.BodyHandler bodyHandler
+                    = new VertxPlatformHttpServerConfiguration.BodyHandler();
+            // turn on file uploads
+            bodyHandler.setHandleFileUploads(true);
+            bodyHandler.setUploadsDirectory(testFile.toFile().getParent());
+            configuration.setBodyHandler(bodyHandler);
+        });
+
+        try {
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("platform-http:/streaming?useStreaming=true")
+                            .setBody().constant("multipart request should have been rejected");
+                }
+            });
+
+            context.start();
+
+            given()
+                    .multiPart(testFile.toFile())
+                    .post("/streaming")
+                    .then()
+                    .statusCode(500);
+        } finally {
+            context.stop();
+            Files.deleteIfExists(testFile);
+        }
+    }
+
+    @Test
+    void testStreamingWithSpecificEncoding() throws Exception {
+        final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext();
+        Path input = Files.createTempFile("platform-http-input", ".dat");
+        Path output = Files.createTempFile("platform-http-output", ".dat");
+
+        String fileContent = "Content with special character ð";
+        Files.writeString(input, fileContent, StandardCharsets.ISO_8859_1);
+
+        try {
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("platform-http:/streaming?useStreaming=true")
+                            .log("Done echoing back request body as response body");
+                }
+            });
+
+            context.start();
+
+            // Close the request stream, the response stream and the output file once the copy is done
+            try (InputStream requestBody = new FileInputStream(input.toFile());
+                 InputStream response = given()
+                         .body(requestBody)
+                         .post("/streaming")
+                         .then()
+                         .statusCode(200)
+                         .extract()
+                         .body()
+                         .asInputStream();
+                 FileOutputStream fos = new FileOutputStream(output.toFile())) {
+                IOHelper.copy(response, fos);
+            }
+
+            assertEquals(fileContent, Files.readString(output, StandardCharsets.ISO_8859_1));
+        } finally {
+            context.stop();
+            // Remove the temporary files so repeated test runs do not accumulate them
+            Files.deleteIfExists(input);
+            Files.deleteIfExists(output);
+        }
+    }
+
+    @Test
+    void testStreamingWithClosedInputStreamResponse() throws Exception {
+        final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext();
+        try {
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("platform-http:/streaming?useStreaming=true")
+                            .process(new Processor() {
+                                @Override
+                                public void process(Exchange exchange) throws Exception {
+                                    // Simulate an error processing an input stream by closing it ahead of the response being written
+                                    // Verifies the response promise.fail is called correctly
+                                    InputStream stream = getClass().getResourceAsStream("/authentication/auth.properties");
+                                    if (stream != null) {
+                                        stream.close();
+                                    }
+                                    exchange.getMessage().setBody(stream);
+                                }
+                            });
+                }
+            });
+
+            context.start();
+
+            given()
+                    .get("/streaming")
+                    .then()
+                    .statusCode(500);
+        } finally {
+            context.stop();
+        }
+    }
+
+    @Test
+    void testStreamingWithUnconvertableResponseType() throws Exception {
+        final CamelContext context = VertxPlatformHttpEngineTest.createCamelContext();
+        try {
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("platform-http:/streaming?useStreaming=true")
+                            .process(new Processor() {
+                                @Override
+                                public void process(Exchange exchange) throws Exception {
+                                    // Force a type conversion exception and verify the response promise.fail is called correctly
+                                    exchange.getMessage().setBody(new TestBean());
+                                }
+                            });
+                }
+            });
+
+            context.start();
+
+            given()
+                    .get("/streaming")
+                    .then()
+                    .statusCode(500);
+        } finally {
+            context.stop();
+        }
+    }
+
+    // Type without a registered converter, used to force a response conversion failure
+    static final class TestBean {
+    }
+}
diff --git a/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointConfigurer.java b/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointConfigurer.java
index 426e9de52d5..6223fdd295b 100644
--- a/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointConfigurer.java
+++ b/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointConfigurer.java
@@ -41,6 +41,8 @@ public class PlatformHttpEndpointConfigurer extends PropertyConfigurerSupport im
         case "platformhttpengine":
         case "platformHttpEngine": target.setPlatformHttpEngine(property(camelContext, org.apache.camel.component.platform.http.spi.PlatformHttpEngine.class, value)); return true;
         case "produces": target.setProduces(property(camelContext, java.lang.String.class, value)); return true;
+        case "usestreaming":
+        case "useStreaming": target.setUseStreaming(property(camelContext, boolean.class, value)); return true;
         default: return false;
         }
     }
@@ -68,6 +70,8 @@ public class PlatformHttpEndpointConfigurer extends PropertyConfigurerSupport im
         case "platformhttpengine":
         case "platformHttpEngine": return org.apache.camel.component.platform.http.spi.PlatformHttpEngine.class;
         case "produces": return java.lang.String.class;
+        case "usestreaming":
+        case "useStreaming": return boolean.class;
         default: return null;
         }
     }
@@ -96,6 +100,8 @@ public class PlatformHttpEndpointConfigurer extends PropertyConfigurerSupport im
         case "platformhttpengine":
         case "platformHttpEngine": return target.getPlatformHttpEngine();
         case "produces": return target.getProduces();
+        case "usestreaming":
+        case "useStreaming": return target.isUseStreaming();
         default: return null;
         }
     }
diff --git a/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointUriFactory.java b/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointUriFactory.java
index f680a4e90a8..78a658d5a8a 100644
--- a/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointUriFactory.java
+++ b/components/camel-platform-http/src/generated/java/org/apache/camel/component/platform/http/PlatformHttpEndpointUriFactory.java
@@ -21,7 +21,7 @@ public class PlatformHttpEndpointUriFactory extends org.apache.camel.support.com
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Set<String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(12);
+        Set<String> props = new HashSet<>(13);
         props.add("bridgeErrorHandler");
         props.add("consumes");
         props.add("exceptionHandler");
@@ -34,6 +34,7 @@ public class PlatformHttpEndpointUriFactory extends org.apache.camel.support.com
         props.add("path");
         props.add("platformHttpEngine");
         props.add("produces");
+        props.add("useStreaming");
         PROPERTY_NAMES = Collections.unmodifiableSet(props);
         SECRET_PROPERTY_NAMES = Collections.emptySet();
         MULTI_VALUE_PREFIXES = Collections.emptySet();
diff --git a/components/camel-platform-http/src/generated/resources/org/apache/camel/component/platform/http/platform-http.json b/components/camel-platform-http/src/generated/resources/org/apache/camel/component/platform/http/platform-http.json
index bb67bb10512..69c9b292526 100644
--- a/components/camel-platform-http/src/generated/resources/org/apache/camel/component/platform/http/platform-http.json
+++ b/components/camel-platform-http/src/generated/resources/org/apache/camel/component/platform/http/platform-http.json
@@ -33,11 +33,12 @@
     "matchOnUriPrefix": { "index": 3, "kind": "parameter", "displayName": "Match On Uri Prefix", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether or not the consumer should try to find a target consumer by matching the URI prefix if no exact match is found." },
     "muteException": { "index": 4, "kind": "parameter", "displayName": "Mute Exception", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "If enabled and an Exchange failed processing on the consumer side the response's body won't contain the exception's stack trace." },
     "produces": { "index": 5, "kind": "parameter", "displayName": "Produces", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The content type this endpoint produces, such as application\/xml or application\/json." },
-    "bridgeErrorHandler": { "index": 6, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming  [...]
-    "exceptionHandler": { "index": 7, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By def [...]
-    "exchangePattern": { "index": 8, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
-    "fileNameExtWhitelist": { "index": 9, "kind": "parameter", "displayName": "File Name Ext Whitelist", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "A comma or whitespace separated list of file extensions. Uploads having these extensions will be stored locally. Null value or asterisk () will allow all files." },
-    "headerFilterStrategy": { "index": 10, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HeaderFilterStrategy to filter headers to and from Camel message." },
-    "platformHttpEngine": { "index": 11, "kind": "parameter", "displayName": "Platform Http Engine", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.platform.http.spi.PlatformHttpEngine", "deprecated": false, "autowired": false, "secret": false, "description": "An HTTP Server engine implementation to serve the requests of this endpoint." }
+    "useStreaming": { "index": 6, "kind": "parameter", "displayName": "Use Streaming", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to use streaming for large requests and responses" },
+    "bridgeErrorHandler": { "index": 7, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming  [...]
+    "exceptionHandler": { "index": 8, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By def [...]
+    "exchangePattern": { "index": 9, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
+    "fileNameExtWhitelist": { "index": 10, "kind": "parameter", "displayName": "File Name Ext Whitelist", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "A comma or whitespace separated list of file extensions. Uploads having these extensions will be stored locally. Null value or asterisk () will allow all files." },
+    "headerFilterStrategy": { "index": 11, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HeaderFilterStrategy to filter headers to and from Camel message." },
+    "platformHttpEngine": { "index": 12, "kind": "parameter", "displayName": "Platform Http Engine", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.platform.http.spi.PlatformHttpEngine", "deprecated": false, "autowired": false, "secret": false, "description": "An HTTP Server engine implementation to serve the requests of this endpoint." }
   }
 }
diff --git a/components/camel-platform-http/src/main/java/org/apache/camel/component/platform/http/PlatformHttpEndpoint.java b/components/camel-platform-http/src/main/java/org/apache/camel/component/platform/http/PlatformHttpEndpoint.java
index 6c3a3e81de1..2b12a69e88c 100644
--- a/components/camel-platform-http/src/main/java/org/apache/camel/component/platform/http/PlatformHttpEndpoint.java
+++ b/components/camel-platform-http/src/main/java/org/apache/camel/component/platform/http/PlatformHttpEndpoint.java
@@ -69,6 +69,9 @@ public class PlatformHttpEndpoint extends DefaultEndpoint implements AsyncEndpoi
     @UriParam(label = "advanced",
               description = "To use a custom HeaderFilterStrategy to filter headers to and from Camel message.")
     private HeaderFilterStrategy headerFilterStrategy = new PlatformHttpHeaderFilterStrategy();
+    @UriParam(label = "consumer", defaultValue = "false",
+              description = "Whether to use streaming for large requests and responses")
+    private boolean useStreaming;
 
     public PlatformHttpEndpoint(String uri, String remaining, Component component) {
         super(uri, component);
@@ -168,6 +171,14 @@ public class PlatformHttpEndpoint extends DefaultEndpoint implements AsyncEndpoi
         this.muteException = muteException;
     }
 
+    public boolean isUseStreaming() {
+        return useStreaming;
+    }
+
+    public void setUseStreaming(boolean useStreaming) {
+        this.useStreaming = useStreaming;
+    }
+
     PlatformHttpEngine getOrCreateEngine() {
         return platformHttpEngine != null
                 ? platformHttpEngine
diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PlatformHttpEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PlatformHttpEndpointBuilderFactory.java
index 8a6dd245fcc..c5db0313347 100644
--- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PlatformHttpEndpointBuilderFactory.java
+++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PlatformHttpEndpointBuilderFactory.java
@@ -159,6 +159,37 @@ public interface PlatformHttpEndpointBuilderFactory {
             doSetProperty("produces", produces);
             return this;
         }
+        /**
+         * Whether to use streaming for large requests and responses.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: consumer
+         * 
+         * @param useStreaming the value to set
+         * @return the dsl builder
+         */
+        default PlatformHttpEndpointBuilder useStreaming(boolean useStreaming) {
+            doSetProperty("useStreaming", useStreaming);
+            return this;
+        }
+        /**
+         * Whether to use streaming for large requests and responses.
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: false
+         * Group: consumer
+         * 
+         * @param useStreaming the value to set
+         * @return the dsl builder
+         */
+        default PlatformHttpEndpointBuilder useStreaming(String useStreaming) {
+            doSetProperty("useStreaming", useStreaming);
+            return this;
+        }
     }
 
     /**